Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java159
1 files changed, 78 insertions, 81 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
index fe55606a11..d88d6cc6b5 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
@@ -27,8 +27,8 @@ import org.eclipse.net4j.internal.util.event.Event;
import org.eclipse.net4j.internal.util.factory.FactoryKey;
import org.eclipse.net4j.internal.util.lifecycle.Lifecycle;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.concurrent.RWLock;
import org.eclipse.net4j.util.container.IContainer;
import org.eclipse.net4j.util.container.IContainerDelta;
import org.eclipse.net4j.util.container.IContainerEvent;
@@ -45,6 +45,7 @@ import org.eclipse.internal.net4j.bundle.OM;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -74,6 +75,8 @@ public abstract class Connector extends Lifecycle implements IConnector
private List<Channel> channels = new ArrayList(0);
+ private RWLock channelsLock = new RWLock(2500);
+
private ConnectorState connectorState = ConnectorState.DISCONNECTED;
/**
@@ -294,19 +297,20 @@ public abstract class Connector extends Lifecycle implements IConnector
public IChannel[] getChannels()
{
- List<IChannel> result = new ArrayList(channels.size());
-
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ final List<IChannel> result = new ArrayList(0);
+ channelsLock.read(new Runnable()
{
- for (Channel channel : channels)
+ public void run()
{
- if (channel != null)
+ for (Channel channel : channels)
{
- result.add(channel);
+ if (channel != null)
+ {
+ result.add(channel);
+ }
}
}
- }
+ });
return result.toArray(new IChannel[result.size()]);
}
@@ -395,101 +399,99 @@ public abstract class Connector extends Lifecycle implements IConnector
return channel;
}
- public Channel getChannel(short channelIndex)
+ public Channel getChannel(final short channelIndex)
{
- try
+ return channelsLock.read(new Callable<Channel>()
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ public Channel call() throws Exception
{
return channels.get(channelIndex);
}
- }
- catch (IndexOutOfBoundsException ex)
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Invalid channelIndex " + channelIndex); //$NON-NLS-1$
- }
-
- return null;
- }
+ });
}
protected List<Queue<IBuffer>> getChannelBufferQueues()
{
final List<Queue<IBuffer>> result = new ArrayList(channels.size());
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ channelsLock.read(new Runnable()
{
- for (final Channel channel : channels)
+ public void run()
{
- if (channel != null && channel.isActive())
+ for (final Channel channel : channels)
{
- Queue<IBuffer> bufferQueue = channel.getSendQueue();
- result.add(bufferQueue);
+ if (channel != null && channel.isActive())
+ {
+ Queue<IBuffer> bufferQueue = channel.getSendQueue();
+ result.add(bufferQueue);
+ }
}
}
- }
+ });
return result;
}
- protected short findFreeChannelIndex()
+ private short findFreeChannelIndex()
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ return channelsLock.read(new Callable<Short>()
{
- int size = channels.size();
- for (short i = 0; i < size; i++)
+ public Short call() throws Exception
{
- if (channels.get(i) == null)
+ int size = channels.size();
+ for (short i = 0; i < size; i++)
{
- return i;
+ if (channels.get(i) == null)
+ {
+ return i;
+ }
}
- }
- return (short)size;
- }
+ return (short)size;
+ }
+ });
}
- protected void addChannel(Channel channel)
+ protected void addChannel(final Channel channel)
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ channelsLock.write(new Runnable()
{
- short channelIndex = channel.getChannelIndex();
- while (channelIndex >= channels.size())
+ public void run()
{
- channels.add(null);
- }
+ short channelIndex = channel.getChannelIndex();
+ while (channelIndex >= channels.size())
+ {
+ channels.add(null);
+ }
- channels.set(channelIndex, channel);
- }
+ channels.set(channelIndex, channel);
+ }
+ });
}
- protected boolean removeChannel(Channel channel, boolean actively)
+ protected boolean removeChannel(final Channel channel, boolean actively)
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ return channelsLock.write(new Callable<Boolean>()
{
- int channelIndex = channel.getChannelIndex();
- if (channelIndex < channels.size() && channels.get(channelIndex) == channel)
+ public Boolean call() throws Exception
{
- channel.removeListener(channelListener);
- if (TRACER.isEnabled())
+ int channelIndex = channel.getChannelIndex();
+ if (channelIndex < channels.size() && channels.get(channelIndex) == channel)
{
- TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$
+ channel.removeListener(channelListener);
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$
+ }
+
+ channels.set(channelIndex, null);
+ return true;
+ }
+ else
+ {
+ return false;
}
-
- channels.set(channelIndex, null);
- return true;
- }
- else
- {
- return false;
}
- }
+ });
}
protected IProtocol createProtocol(String type)
@@ -560,31 +562,26 @@ public abstract class Connector extends Lifecycle implements IConnector
protected void doDeactivate() throws Exception
{
setState(ConnectorState.DISCONNECTED);
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ channelsLock.write(new Runnable()
{
- for (short i = 0; i < channels.size(); i++)
+ public void run()
{
- Channel channel = channels.get(i);
- if (channel != null)
+ for (short i = 0; i < channels.size(); i++)
{
- LifecycleUtil.deactivate(channel);
+ Channel channel = channels.get(i);
+ if (channel != null)
+ {
+ LifecycleUtil.deactivate(channel);
+ }
}
- }
- channels.clear();
- }
+ channels.clear();
+ }
+ });
super.doDeactivate();
}
- private static void TRACE_SYNCHRONIZE()
- {
- Thread currentThread = Thread.currentThread();
- System.out.println("SYNCHRONIZE " + currentThread); // XXX
- ReflectUtil.printStackTrace(System.out, currentThread.getStackTrace());
- }
-
protected abstract void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException;
/**

Back to the top