diff options
4 files changed, 235 insertions, 81 deletions
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java new file mode 100644 index 0000000000..221959a718 --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java @@ -0,0 +1,36 @@ +package org.eclipse.net4j.util; + +public class WrappedException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + + private WrappedException(Exception exception) + { + super(exception); + } + + public Exception exception() + { + return (Exception)getCause(); + } + + public static RuntimeException wrap(Exception exception) + { + if (exception instanceof RuntimeException) + { + return (RuntimeException)exception; + } + + return new WrappedException(exception); + } + + public static Exception unwrap(Exception exception) + { + if (exception instanceof WrappedException) + { + return ((WrappedException)exception).exception(); + } + + return exception; + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java new file mode 100644 index 0000000000..b4a548f96d --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java @@ -0,0 +1,114 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2007 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.util.concurrent; + +import org.eclipse.net4j.util.WrappedException; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * @author Eike Stepper + */ +public class RWLock extends ReentrantReadWriteLock +{ + private static final long serialVersionUID = 1L; + + private long timeoutMillis; + + public RWLock(long timeoutMillis) + { + this.timeoutMillis = timeoutMillis; + } + + public RWLock(long timeoutMillis, boolean fair) + { + super(fair); + this.timeoutMillis = timeoutMillis; + } + + public <V> V read(Callable<V> callable) + { + return call(callable, readLock(), timeoutMillis); + } + + public void read(Runnable runnable) + { + run(runnable, readLock(), timeoutMillis); + } + + public <V> V write(Callable<V> callable) + { + return call(callable, writeLock(), timeoutMillis); + } + + public void write(Runnable runnable) + { + run(runnable, writeLock(), timeoutMillis); + } + + public static <V> V call(Callable<V> callable, Lock lock, long timeoutMillis) + { + try + { + boolean locked = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS); + if (locked) + { + try + { + return callable.call(); + } + finally + { + lock.unlock(); + } + } + else + { + throw new TimeoutException("Acquisition of lock timed out after " + timeoutMillis + " millis"); + } + } + catch (Exception ex) + { + throw WrappedException.wrap(ex); + } + } + + public static void run(Runnable runnable, Lock lock, long timeoutMillis) + { + try + { + boolean locked = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS); + if (locked) + { + try + { + runnable.run(); + } + finally + { + lock.unlock(); + } + } + else + { + throw new TimeoutException("Acquisition of lock timed out after " + timeoutMillis + " millis"); + } + } + catch (Exception ex) + { + throw WrappedException.wrap(ex); + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java index fe55606a11..d88d6cc6b5 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java @@ -27,8 +27,8 @@ import org.eclipse.net4j.internal.util.event.Event; import org.eclipse.net4j.internal.util.factory.FactoryKey; import org.eclipse.net4j.internal.util.lifecycle.Lifecycle; import org.eclipse.net4j.internal.util.om.trace.ContextTracer; -import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.concurrent.RWLock; import org.eclipse.net4j.util.container.IContainer; import org.eclipse.net4j.util.container.IContainerDelta; import org.eclipse.net4j.util.container.IContainerEvent; @@ -45,6 +45,7 @@ import org.eclipse.internal.net4j.bundle.OM; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -74,6 +75,8 @@ public abstract class Connector extends Lifecycle implements IConnector private List<Channel> channels = new ArrayList(0); + private RWLock channelsLock = new RWLock(2500); + private ConnectorState connectorState = ConnectorState.DISCONNECTED; /** @@ -294,19 +297,20 @@ public abstract class Connector extends Lifecycle implements IConnector public IChannel[] getChannels() { - List<IChannel> result = new ArrayList(channels.size()); - - TRACE_SYNCHRONIZE(); - synchronized (channels) + final List<IChannel> result = new ArrayList(0); + channelsLock.read(new Runnable() { - for (Channel channel : channels) + public void run() { - if (channel != null) + for (Channel channel : channels) { - result.add(channel); + if (channel != null) + { + result.add(channel); + } } } - } + }); return result.toArray(new IChannel[result.size()]); } @@ -395,101 +399,99 @@ public abstract class Connector extends Lifecycle implements IConnector return channel; } - public Channel getChannel(short channelIndex) + public Channel getChannel(final short channelIndex) { - try + return channelsLock.read(new Callable<Channel>() { - TRACE_SYNCHRONIZE(); - synchronized (channels) + public Channel call() throws Exception { return channels.get(channelIndex); } - } - catch (IndexOutOfBoundsException ex) - { - if (TRACER.isEnabled()) - { - TRACER.trace("Invalid channelIndex " + channelIndex); //$NON-NLS-1$ - } - - return null; - } + }); } protected List<Queue<IBuffer>> getChannelBufferQueues() { final List<Queue<IBuffer>> result = new ArrayList(channels.size()); - TRACE_SYNCHRONIZE(); - synchronized (channels) + channelsLock.read(new Runnable() { - for (final Channel channel : channels) + public void run() { - if (channel != null && channel.isActive()) + for (final Channel channel : channels) { - Queue<IBuffer> bufferQueue = channel.getSendQueue(); - result.add(bufferQueue); + if (channel != null && channel.isActive()) + { + Queue<IBuffer> bufferQueue = channel.getSendQueue(); + result.add(bufferQueue); + } } } - } + }); return result; } - protected short findFreeChannelIndex() + private short findFreeChannelIndex() { - TRACE_SYNCHRONIZE(); - synchronized (channels) + return channelsLock.read(new Callable<Short>() { - int size = channels.size(); - for (short i = 0; i < size; i++) + public Short call() throws Exception { - if (channels.get(i) == null) + int size = channels.size(); + for (short i = 0; i < size; i++) { - return i; + if (channels.get(i) == null) + { + return i; + } } - } - return (short)size; - } + return (short)size; + } + }); } - protected void addChannel(Channel channel) + protected void addChannel(final Channel channel) { - TRACE_SYNCHRONIZE(); - synchronized (channels) + channelsLock.write(new Runnable() { - short channelIndex = channel.getChannelIndex(); - while (channelIndex >= channels.size()) + public void run() { - channels.add(null); - } + short channelIndex = channel.getChannelIndex(); + while (channelIndex >= channels.size()) + { + channels.add(null); + } - channels.set(channelIndex, channel); - } + channels.set(channelIndex, channel); + } + }); } - protected boolean removeChannel(Channel channel, boolean actively) + protected boolean removeChannel(final Channel channel, boolean actively) { - TRACE_SYNCHRONIZE(); - synchronized (channels) + return channelsLock.write(new Callable<Boolean>() { - int channelIndex = channel.getChannelIndex(); - if (channelIndex < channels.size() && channels.get(channelIndex) == channel) + public Boolean call() throws Exception { - channel.removeListener(channelListener); - if (TRACER.isEnabled()) + int channelIndex = channel.getChannelIndex(); + if (channelIndex < channels.size() && channels.get(channelIndex) == channel) { - TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$ + channel.removeListener(channelListener); + if (TRACER.isEnabled()) + { + TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$ + } + + channels.set(channelIndex, null); + return true; + } + else + { + return false; } - - channels.set(channelIndex, null); - return true; - } - else - { - return false; } - } + }); } protected IProtocol createProtocol(String type) @@ -560,31 +562,26 @@ public abstract class Connector extends Lifecycle implements IConnector protected void doDeactivate() throws Exception { setState(ConnectorState.DISCONNECTED); - TRACE_SYNCHRONIZE(); - synchronized (channels) + channelsLock.write(new Runnable() { - for (short i = 0; i < channels.size(); i++) + public void run() { - Channel channel = channels.get(i); - if (channel != null) + for (short i = 0; i < channels.size(); i++) { - LifecycleUtil.deactivate(channel); + Channel channel = channels.get(i); + if (channel != null) + { + LifecycleUtil.deactivate(channel); + } } - } - channels.clear(); - } + channels.clear(); + } + }); super.doDeactivate(); } - private static void TRACE_SYNCHRONIZE() - { - Thread currentThread = Thread.currentThread(); - System.out.println("SYNCHRONIZE " + currentThread); // XXX - ReflectUtil.printStackTrace(System.out, currentThread.getStackTrace()); - } - protected abstract void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException; /** diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java index d19c76e9c6..3d86eb8c57 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java @@ -139,6 +139,13 @@ public interface IConnector extends IContainer<IChannel> public ConnectorException disconnect(); + /** + * Returns an array of currently open channels. Note that the resulting array + * does not contain <code>null</code> values. Generally the + * <code>channelIndex</code> of a channel can not be used as an index into + * this array. + * <p> + */ public IChannel[] getChannels(); /** |