summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2007-01-07 16:51:31 (EST)
committerEike Stepper2007-01-07 16:51:31 (EST)
commitb155eedc55cd1af50931375888effcf306ef204f (patch)
treee32e75b575a9aec57b2c65cc77e97ef2e11cf21e
parent9affaca3a22f514927ccf90c21e0db68a52946fe (diff)
downloadcdo-b155eedc55cd1af50931375888effcf306ef204f.zip
cdo-b155eedc55cd1af50931375888effcf306ef204f.tar.gz
cdo-b155eedc55cd1af50931375888effcf306ef204f.tar.bz2
Develop CDO2
-rw-r--r--plugins/org.eclipse.net4j/.options2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java6
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java18
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferUtil.java41
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java34
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/embedded/AbstractEmbeddedConnector.java6
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java17
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java7
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java5
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/Buffer.java2
11 files changed, 117 insertions, 26 deletions
diff --git a/plugins/org.eclipse.net4j/.options b/plugins/org.eclipse.net4j/.options
index d4803f3..2aa7225 100644
--- a/plugins/org.eclipse.net4j/.options
+++ b/plugins/org.eclipse.net4j/.options
@@ -7,7 +7,7 @@ org.eclipse.net4j/debug.registry = true
org.eclipse.net4j/debug.om = true
org.eclipse.net4j/debug.buffer = true
-org.eclipse.net4j/debug.buffer.stream = false
+org.eclipse.net4j/debug.buffer.stream = true
org.eclipse.net4j/debug.channel = true
org.eclipse.net4j/debug.selector = true
org.eclipse.net4j/debug.acceptor = true
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
index acdc13c..66f7323 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/AbstractConnector.java
@@ -357,7 +357,7 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con
{
short channelIndex = findFreeChannelIndex();
ChannelImpl channel = createChannel(channelIndex, protocolID, protocolData);
- registerChannelWithPeer(channelIndex, protocolID);
+ registerChannelWithPeer(channelIndex, protocolID, protocolData);
try
{
@@ -616,8 +616,8 @@ public abstract class AbstractConnector extends AbstractLifecycle implements Con
super.onDeactivate();
}
- protected abstract void registerChannelWithPeer(short channelIndex, String protocolID)
- throws ConnectorException;
+ protected abstract void registerChannelWithPeer(short channelIndex, String protocolID,
+ Object protocolData) throws ConnectorException;
private static int getNextConnectorID()
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
index 7fc9a7e..bf06963 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferImpl.java
@@ -267,6 +267,18 @@ public class BufferImpl implements Buffer
return true;
}
+ public void flip()
+ {
+ if (state != State.PUTTING)
+ {
+ throw new IllegalStateException("state == " + state); //$NON-NLS-1$
+ }
+
+ byteBuffer.flip();
+ byteBuffer.position(HEADER_SIZE);
+ state = State.GETTING;
+ }
+
@Override
public String toString()
{
@@ -280,7 +292,11 @@ public class BufferImpl implements Buffer
try
{
- byteBuffer.flip();
+ if (state != State.GETTING)
+ {
+ byteBuffer.flip();
+ }
+
if (state == State.PUTTING)
{
byteBuffer.position(HEADER_SIZE);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferUtil.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferUtil.java
index 0cbb842..512a437 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferUtil.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/BufferUtil.java
@@ -12,6 +12,11 @@ package org.eclipse.internal.net4j.transport;
import org.eclipse.net4j.transport.BufferProvider;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
@@ -20,6 +25,10 @@ import java.nio.ByteBuffer;
*/
public final class BufferUtil
{
+ private static final byte FALSE = (byte)0;
+
+ private static final byte TRUE = (byte)1;
+
public static final short DEFAULT_BUFFER_CAPACITY = 4096;
public static final String UTF8_CHAR_SET_NAME = "UTF-8"; //$NON-NLS-1$
@@ -81,6 +90,38 @@ public final class BufferUtil
}
}
+ public static void putObject(ByteBuffer byteBuffer, Object object) throws IOException
+ {
+ if (object != null)
+ {
+ byteBuffer.put(TRUE);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream stream = new ObjectOutputStream(baos);
+ stream.writeObject(object);
+
+ byte[] array = baos.toByteArray();
+ putByteArray(byteBuffer, array);
+ }
+ else
+ {
+ byteBuffer.put(FALSE);
+ }
+ }
+
+ public static Object getObject(ByteBuffer byteBuffer) throws IOException, ClassNotFoundException
+ {
+ boolean nonNull = byteBuffer.get() == TRUE;
+ if (nonNull)
+ {
+ byte[] array = getByteArray(byteBuffer);
+ ByteArrayInputStream bais = new ByteArrayInputStream(array);
+ ObjectInputStream stream = new ObjectInputStream(bais);
+ return stream.readObject();
+ }
+
+ return null;
+ }
+
public static void putByteArray(ByteBuffer byteBuffer, byte[] array)
{
byteBuffer.putShort((short)array.length);
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
index ccd4180..369874b 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/ChannelImpl.java
@@ -146,7 +146,7 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
if (TRACER.isEnabled())
{
- TRACER.trace("Handling buffer from client: " + buffer); //$NON-NLS-1$
+ TRACER.format("Handling buffer from client: {0} --> {1}", buffer, this); //$NON-NLS-1$
}
sendQueue.add(buffer);
@@ -164,16 +164,10 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
if (TRACER.isEnabled())
{
- TRACER.trace("Handling buffer from multiplexer: " + buffer); //$NON-NLS-1$
+ TRACER.format("Handling buffer from multiplexer: {0} --> {1}", buffer, this); //$NON-NLS-1$
}
- receiveSerializer.addWork(new Runnable()
- {
- public void run()
- {
- receiveHandler.handleBuffer(buffer);
- }
- });
+ receiveSerializer.addWork(new ReceiverWork(buffer));
}
@Override
@@ -242,6 +236,10 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
*/
private final class ChannelIDImpl implements ChannelID
{
+ public ChannelIDImpl()
+ {
+ }
+
public Connector getConnector()
{
return connector;
@@ -277,4 +275,22 @@ public class ChannelImpl extends AbstractLifecycle implements Channel, BufferPro
return "ChannelID[" + connector + ", channelIndex=" + channelIndex + "]"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
}
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class ReceiverWork implements Runnable
+ {
+ private final Buffer buffer;
+
+ private ReceiverWork(Buffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public void run()
+ {
+ receiveHandler.handleBuffer(buffer);
+ }
+ }
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/embedded/AbstractEmbeddedConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/embedded/AbstractEmbeddedConnector.java
index 351e150..88392f6 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/embedded/AbstractEmbeddedConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/embedded/AbstractEmbeddedConnector.java
@@ -13,7 +13,6 @@ package org.eclipse.internal.net4j.transport.embedded;
import org.eclipse.net4j.transport.Buffer;
import org.eclipse.net4j.transport.Channel;
import org.eclipse.net4j.transport.ConnectorException;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.internal.net4j.transport.AbstractConnector;
import org.eclipse.internal.net4j.transport.ChannelImpl;
@@ -44,12 +43,12 @@ public abstract class AbstractEmbeddedConnector extends AbstractConnector
}
@Override
- protected void registerChannelWithPeer(short channelIndex, String protocolID)
+ protected void registerChannelWithPeer(short channelIndex, String protocolID, Object protocolData)
throws ConnectorException
{
try
{
- ChannelImpl channel = getPeer().createChannel(channelIndex, protocolID, null);
+ ChannelImpl channel = getPeer().createChannel(channelIndex, protocolID, protocolData);
if (channel == null)
{
throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$
@@ -78,6 +77,7 @@ public abstract class AbstractEmbeddedConnector extends AbstractConnector
Queue<Buffer> localQueue = ((ChannelImpl)localChannel).getSendQueue();
Buffer buffer = localQueue.poll();
+ buffer.flip();
peerChannel.handleBufferFromMultiplexer(buffer);
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
index 169bfd1..08ca2f5 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/AbstractTCPConnector.java
@@ -240,12 +240,23 @@ public abstract class AbstractTCPConnector extends AbstractConnector implements
}
@Override
- protected void registerChannelWithPeer(short channelIndex, String protocolID)
+ protected void registerChannelWithPeer(short channelIndex, String protocolID, Object protocolData)
throws ConnectorException
{
- if (!controlChannel.registerChannel(channelIndex, protocolID))
+ try
+ {
+ if (!controlChannel.registerChannel(channelIndex, protocolID, protocolData))
+ {
+ throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$
+ }
+ }
+ catch (ConnectorException ex)
+ {
+ throw ex;
+ }
+ catch (IOException ex)
{
- throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$
+ throw new ConnectorException(ex);
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java
index ebca313..6fe449a 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/transport/tcp/ControlChannelImpl.java
@@ -19,6 +19,7 @@ import org.eclipse.internal.net4j.bundle.Net4j;
import org.eclipse.internal.net4j.transport.BufferUtil;
import org.eclipse.internal.net4j.transport.ChannelImpl;
+import java.io.IOException;
import java.nio.ByteBuffer;
/**
@@ -52,7 +53,7 @@ public final class ControlChannelImpl extends ChannelImpl
setConnector(connector);
}
- public boolean registerChannel(short channelIndex, String protocolID)
+ public boolean registerChannel(short channelIndex, String protocolID, Object protocolData) throws IOException
{
assertValidChannelIndex(channelIndex);
Synchronizer<Boolean> registration = registrations.correlate(channelIndex);
@@ -62,6 +63,7 @@ public final class ControlChannelImpl extends ChannelImpl
byteBuffer.put(OPCODE_REGISTRATION);
byteBuffer.putShort(channelIndex);
BufferUtil.putUTF8(byteBuffer, protocolID);
+ BufferUtil.putObject(byteBuffer, protocolData);
handleBuffer(buffer);
return registration.get(REGISTRATION_TIMEOUT);
@@ -104,8 +106,9 @@ public final class ControlChannelImpl extends ChannelImpl
{
byte[] handlerFactoryUTF8 = BufferUtil.getByteArray(byteBuffer);
String protocolID = BufferUtil.fromUTF8(handlerFactoryUTF8);
+ Object protocolData = BufferUtil.getObject(byteBuffer);
ChannelImpl channel = ((AbstractTCPConnector)getConnector()).createChannel(channelIndex,
- protocolID, null);
+ protocolID, protocolData);
if (channel != null)
{
channel.activate();
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
index 90e65bf..51429f3 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java
@@ -12,6 +12,7 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.stream.BufferInputStream;
import org.eclipse.net4j.stream.BufferOutputStream;
+import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.net4j.util.stream.ExtendedDataInputStream;
import org.eclipse.net4j.util.stream.ExtendedDataOutputStream;
@@ -37,13 +38,13 @@ public abstract class IndicationWithResponse extends SignalReactor
{
if (TRACER.isEnabled())
{
- TRACER.trace("================ Indicating"); //$NON-NLS-1$
+ TRACER.trace("================ Indicating " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
}
indicating(new ExtendedDataInputStream(in));
if (TRACER.isEnabled())
{
- TRACER.trace("================ Responding"); //$NON-NLS-1$
+ TRACER.trace("================ Responding " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
}
responding(new ExtendedDataOutputStream(out));
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
index a9bab9d..d3656bd 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
@@ -13,6 +13,7 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.stream.BufferInputStream;
import org.eclipse.net4j.stream.BufferOutputStream;
import org.eclipse.net4j.transport.Channel;
+import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.net4j.util.stream.ExtendedDataInputStream;
import org.eclipse.net4j.util.stream.ExtendedDataOutputStream;
@@ -39,14 +40,14 @@ public abstract class RequestWithConfirmation<RESULT> extends SignalActor<RESULT
{
if (TRACER.isEnabled())
{
- TRACER.trace("================ Requesting"); //$NON-NLS-1$
+ TRACER.trace("================ Requesting " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
}
requesting(new ExtendedDataOutputStream(out));
out.flush();
if (TRACER.isEnabled())
{
- TRACER.trace("================ Confirming"); //$NON-NLS-1$
+ TRACER.trace("================ Confirming " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$
}
setResult(confirming(new ExtendedDataInputStream(in)));
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/Buffer.java
index 4af1881..a1c81c0 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/Buffer.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/transport/Buffer.java
@@ -92,6 +92,8 @@ public interface Buffer
public boolean write(SocketChannel socketChannel) throws IOException;
+ public void flip();
+
public ByteBuffer getByteBuffer();
public boolean isEOS();