summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2007-07-25 11:16:28 (EDT)
committerEike Stepper2007-07-25 11:16:28 (EDT)
commitd9cb5e1721be7a2e75b565494669f1705c743664 (patch)
tree731e80e0be2d7ba0ea77c7a5d84495547fb9d843
parentf3d274403d4c01895a314ee04da147f0e7d65efa (diff)
downloadcdo-d9cb5e1721be7a2e75b565494669f1705c743664.zip
cdo-d9cb5e1721be7a2e75b565494669f1705c743664.tar.gz
cdo-d9cb5e1721be7a2e75b565494669f1705c743664.tar.bz2
*** empty log message ***
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java36
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java114
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java159
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java7
4 files changed, 235 insertions, 81 deletions
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java
new file mode 100644
index 0000000..221959a
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/WrappedException.java
@@ -0,0 +1,36 @@
+package org.eclipse.net4j.util;
+
+public class WrappedException extends RuntimeException
+{
+ private static final long serialVersionUID = 1L;
+
+ private WrappedException(Exception exception)
+ {
+ super(exception);
+ }
+
+ public Exception exception()
+ {
+ return (Exception)getCause();
+ }
+
+ public static RuntimeException wrap(Exception exception)
+ {
+ if (exception instanceof RuntimeException)
+ {
+ return (RuntimeException)exception;
+ }
+
+ return new WrappedException(exception);
+ }
+
+ public static Exception unwrap(Exception exception)
+ {
+ if (exception instanceof WrappedException)
+ {
+ return ((WrappedException)exception).exception();
+ }
+
+ return exception;
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java
new file mode 100644
index 0000000..b4a548f
--- /dev/null
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java
@@ -0,0 +1,114 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2007 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.util.concurrent;
+
+import org.eclipse.net4j.util.WrappedException;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author Eike Stepper
+ */
+public class RWLock extends ReentrantReadWriteLock
+{
+ private static final long serialVersionUID = 1L;
+
+ private long timeoutMillis;
+
+ public RWLock(long timeoutMillis)
+ {
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ public RWLock(long timeoutMillis, boolean fair)
+ {
+ super(fair);
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ public <V> V read(Callable<V> callable)
+ {
+ return call(callable, readLock(), timeoutMillis);
+ }
+
+ public void read(Runnable runnable)
+ {
+ run(runnable, readLock(), timeoutMillis);
+ }
+
+ public <V> V write(Callable<V> callable)
+ {
+ return call(callable, writeLock(), timeoutMillis);
+ }
+
+ public void write(Runnable runnable)
+ {
+ run(runnable, writeLock(), timeoutMillis);
+ }
+
+ public static <V> V call(Callable<V> callable, Lock lock, long timeoutMillis)
+ {
+ try
+ {
+ boolean locked = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (locked)
+ {
+ try
+ {
+ return callable.call();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ else
+ {
+ throw new TimeoutException("Acquisition of lock timed out after " + timeoutMillis + " millis");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw WrappedException.wrap(ex);
+ }
+ }
+
+ public static void run(Runnable runnable, Lock lock, long timeoutMillis)
+ {
+ try
+ {
+ boolean locked = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (locked)
+ {
+ try
+ {
+ runnable.run();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ else
+ {
+ throw new TimeoutException("Acquisition of lock timed out after " + timeoutMillis + " millis");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw WrappedException.wrap(ex);
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
index fe55606..d88d6cc 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Connector.java
@@ -27,8 +27,8 @@ import org.eclipse.net4j.internal.util.event.Event;
import org.eclipse.net4j.internal.util.factory.FactoryKey;
import org.eclipse.net4j.internal.util.lifecycle.Lifecycle;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
-import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.concurrent.RWLock;
import org.eclipse.net4j.util.container.IContainer;
import org.eclipse.net4j.util.container.IContainerDelta;
import org.eclipse.net4j.util.container.IContainerEvent;
@@ -45,6 +45,7 @@ import org.eclipse.internal.net4j.bundle.OM;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -74,6 +75,8 @@ public abstract class Connector extends Lifecycle implements IConnector
private List<Channel> channels = new ArrayList(0);
+ private RWLock channelsLock = new RWLock(2500);
+
private ConnectorState connectorState = ConnectorState.DISCONNECTED;
/**
@@ -294,19 +297,20 @@ public abstract class Connector extends Lifecycle implements IConnector
public IChannel[] getChannels()
{
- List<IChannel> result = new ArrayList(channels.size());
-
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ final List<IChannel> result = new ArrayList(0);
+ channelsLock.read(new Runnable()
{
- for (Channel channel : channels)
+ public void run()
{
- if (channel != null)
+ for (Channel channel : channels)
{
- result.add(channel);
+ if (channel != null)
+ {
+ result.add(channel);
+ }
}
}
- }
+ });
return result.toArray(new IChannel[result.size()]);
}
@@ -395,101 +399,99 @@ public abstract class Connector extends Lifecycle implements IConnector
return channel;
}
- public Channel getChannel(short channelIndex)
+ public Channel getChannel(final short channelIndex)
{
- try
+ return channelsLock.read(new Callable<Channel>()
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ public Channel call() throws Exception
{
return channels.get(channelIndex);
}
- }
- catch (IndexOutOfBoundsException ex)
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Invalid channelIndex " + channelIndex); //$NON-NLS-1$
- }
-
- return null;
- }
+ });
}
protected List<Queue<IBuffer>> getChannelBufferQueues()
{
final List<Queue<IBuffer>> result = new ArrayList(channels.size());
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ channelsLock.read(new Runnable()
{
- for (final Channel channel : channels)
+ public void run()
{
- if (channel != null && channel.isActive())
+ for (final Channel channel : channels)
{
- Queue<IBuffer> bufferQueue = channel.getSendQueue();
- result.add(bufferQueue);
+ if (channel != null && channel.isActive())
+ {
+ Queue<IBuffer> bufferQueue = channel.getSendQueue();
+ result.add(bufferQueue);
+ }
}
}
- }
+ });
return result;
}
- protected short findFreeChannelIndex()
+ private short findFreeChannelIndex()
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ return channelsLock.read(new Callable<Short>()
{
- int size = channels.size();
- for (short i = 0; i < size; i++)
+ public Short call() throws Exception
{
- if (channels.get(i) == null)
+ int size = channels.size();
+ for (short i = 0; i < size; i++)
{
- return i;
+ if (channels.get(i) == null)
+ {
+ return i;
+ }
}
- }
- return (short)size;
- }
+ return (short)size;
+ }
+ });
}
- protected void addChannel(Channel channel)
+ protected void addChannel(final Channel channel)
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ channelsLock.write(new Runnable()
{
- short channelIndex = channel.getChannelIndex();
- while (channelIndex >= channels.size())
+ public void run()
{
- channels.add(null);
- }
+ short channelIndex = channel.getChannelIndex();
+ while (channelIndex >= channels.size())
+ {
+ channels.add(null);
+ }
- channels.set(channelIndex, channel);
- }
+ channels.set(channelIndex, channel);
+ }
+ });
}
- protected boolean removeChannel(Channel channel, boolean actively)
+ protected boolean removeChannel(final Channel channel, boolean actively)
{
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ return channelsLock.write(new Callable<Boolean>()
{
- int channelIndex = channel.getChannelIndex();
- if (channelIndex < channels.size() && channels.get(channelIndex) == channel)
+ public Boolean call() throws Exception
{
- channel.removeListener(channelListener);
- if (TRACER.isEnabled())
+ int channelIndex = channel.getChannelIndex();
+ if (channelIndex < channels.size() && channels.get(channelIndex) == channel)
{
- TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$
+ channel.removeListener(channelListener);
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Removing channel " + channelIndex); //$NON-NLS-1$
+ }
+
+ channels.set(channelIndex, null);
+ return true;
+ }
+ else
+ {
+ return false;
}
-
- channels.set(channelIndex, null);
- return true;
- }
- else
- {
- return false;
}
- }
+ });
}
protected IProtocol createProtocol(String type)
@@ -560,31 +562,26 @@ public abstract class Connector extends Lifecycle implements IConnector
protected void doDeactivate() throws Exception
{
setState(ConnectorState.DISCONNECTED);
- TRACE_SYNCHRONIZE();
- synchronized (channels)
+ channelsLock.write(new Runnable()
{
- for (short i = 0; i < channels.size(); i++)
+ public void run()
{
- Channel channel = channels.get(i);
- if (channel != null)
+ for (short i = 0; i < channels.size(); i++)
{
- LifecycleUtil.deactivate(channel);
+ Channel channel = channels.get(i);
+ if (channel != null)
+ {
+ LifecycleUtil.deactivate(channel);
+ }
}
- }
- channels.clear();
- }
+ channels.clear();
+ }
+ });
super.doDeactivate();
}
- private static void TRACE_SYNCHRONIZE()
- {
- Thread currentThread = Thread.currentThread();
- System.out.println("SYNCHRONIZE " + currentThread); // XXX
- ReflectUtil.printStackTrace(System.out, currentThread.getStackTrace());
- }
-
protected abstract void registerChannelWithPeer(short channelIndex, IProtocol protocol) throws ConnectorException;
/**
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java
index d19c76e..3d86eb8 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/IConnector.java
@@ -139,6 +139,13 @@ public interface IConnector extends IContainer<IChannel>
public ConnectorException disconnect();
+ /**
+ * Returns an array of currently open channels. Note that the resulting array
+ * does not contain <code>null</code> values. Generally the
+ * <code>channelIndex</code> of a channel can not be used as an index into
+ * this array.
+ * <p>
+ */
public IChannel[] getChannels();
/**