diff options
Diffstat (limited to 'plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java')
-rw-r--r-- | plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java | 159 |
1 files changed, 78 insertions, 81 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java index fe55606a11..d88d6cc6b5 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java @@ -27,8 +27,8 @@ import org.eclipse.net4j.internal.util.event.Event; import org.eclipse.net4j.internal.util.factory.FactoryKey; import org.eclipse.net4j.internal.util.lifecycle.Lifecycle; import org.eclipse.net4j.internal.util.om.trace.ContextTracer; -import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.concurrent.RWLock; import org.eclipse.net4j.util.container.IContainer; import org.eclipse.net4j.util.container.IContainerDelta; import org.eclipse.net4j.util.container.IContainerEvent; @@ -45,6 +45,7 @@ import org.eclipse.internal.net4j.bundle.OM; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -74,6 +75,8 @@ public abstract class Connector extends Lifecycle implements IConnector private List<Channel> channels = new ArrayList(0); + private RWLock channelsLock = new RWLock(2500); + private ConnectorState connectorState = ConnectorState.DISCONNECTED; /** @@ -294,19 +297,20 @@ public abstract class Connector extends Lifecycle implements IConnector public IChannel[] getChannels() { - List<IChannel> result = new ArrayList(channels.size()); - - TRACE_SYNCHRONIZE(); - synchronized (channels) + final List<IChannel> result = new ArrayList(0); + channelsLock.read(new Runnable() { - for (Channel channel : channels) + public void run() { - if (channel != null) + for (Channel channel : channels) { - result.add(channel); + if (channel != null) + { + result.add(channel); + } } } - } + }); return result.toArray(new IChannel[result.size()]); } @@ -395,101 +399,99 @@ public abstract class Connector extends Lifecycle implements IConnector return channel; } - public Channel getChannel(short channelIndex) + public Channel getChannel(final short channelIndex) { - try + return channelsLock.read(new Callable<Channel>() { - TRACE_SYNCHRONIZE(); - synchronized (channels) + public Channel call() throws Exception { return channels.get(channelIndex); } - } - catch (IndexOutOfBoundsException ex) - { - if (TRACER.isEnabled()) - { - TRACER.trace("Invalid channelIndex " + channelIndex); //$NON-NLS-1$ - } - - return null; - } + }); } protected List<Queue<IBuffer>> getChannelBufferQueues() { final List<Queue<IBuffer>> result = new ArrayList(channels.size()); - TRACE_SYNCHRONIZE(); - synchronized (channels) + channelsLock.read(new Runnable() { - for (final Channel channel : channels) + public void run() { - if (channel != null && channel.isActive()) + for (final Channel channel : channels) { - Queue<IBuffer> bufferQueue = channel.getSendQueue(); - result.add(bufferQueue); + if (channel != null && channel.isActive()) + { + Queue<IBuffer> bufferQueue = channel.getSendQueue(); + result.add(bufferQueue); + } } } - } + }); return result; } - protected short findFreeChannelIndex() + private short findFreeChannelIndex() { - TRACE_SYNCHRONIZE(); - synchronized (channels) + return channelsLock.read(new Callable<Short>() { - int size = channels.size(); - for (short i = 0; i < size; i++) + public Short call() throws Exception { - if (channels.get(i) == null) + int size = channels.size(); + for (short i = 0; i < size; i++) { - return i; + if (channels.get(i) == null) + { + return i; + } } - } - return (short)size; - } + return (short)size; + } + }); } - protected void addChannel(Channel channel) + protected void addChannel(final Channel channel) { - TRACE_SYNCHRONIZE(); - synchronized (channels) + channelsLock.write(new Runnable() { - short channelIndex = channel.getChannelIndex(); - while (channelIndex >= channels.size()) + public void run() { - channels.add(null); - } + short channelIndex = channel.getChannelIndex(); + while (channelIndex >= channels.size()) + { + channels.add(null); + } - channels.set(channelIndex, channel); - } + channels.set(channelIndex, channel); + } + }); } - protected boolean removeChannel(Channel channel, boolean actively) + protected boolean removeChannel(final Channel channel, boolean actively) { - TRACE_SYNCHRONIZE(); - synchronized (channels) + return channelsLock.write(new Callable<Boolean>() { - int channelIndex = channel.getChannelIndex(); - if (channelIndex < channels.size() && channels.get(channelIndex) == channel) + public Boolean call() throws Exception { - channel.removeListener(channelListener); - if (TRACER.isEnabled()) + int channelIndex = channel.getChannelIndex(); + if (channelIndex < channels.size() && channels.get(channelIndex) == channel) { - TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$ + channel.removeListener(channelListener); + if (TRACER.isEnabled()) + { + TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$ + } + + channels.set(channelIndex, null); + return true; + } + else + { + return false; } - - channels.set(channelIndex, null); - return true; - } - else - { - return false; } - } + }); } protected IProtocol createProtocol(String type) @@ -560,31 +562,26 @@ public abstract class Connector extends Lifecycle implements IConnector protected void doDeactivate() throws Exception { setState(ConnectorState.DISCONNECTED); - TRACE_SYNCHRONIZE(); - synchronized (channels) + channelsLock.write(new Runnable() { - for (short i = 0; i < channels.size(); i++) + public void run() { - Channel channel = channels.get(i); - if (channel != null) + for (short i = 0; i < channels.size(); i++) { - LifecycleUtil.deactivate(channel); + Channel channel = channels.get(i); + if (channel != null) + { + LifecycleUtil.deactivate(channel); + } } - } - channels.clear(); - } + channels.clear(); + } + }); super.doDeactivate(); } - private static void TRACE_SYNCHRONIZE() - { - Thread currentThread = Thread.currentThread(); - System.out.println("SYNCHRONIZE " + currentThread); // XXX - ReflectUtil.printStackTrace(System.out, currentThread.getStackTrace()); - } - protected abstract void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException; /** |