summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2007-07-25 13:45:34 (EDT)
committerEike Stepper2007-07-25 13:45:34 (EDT)
commitb9a735c1df0480e2f5592ae87856abfe7dc0623b (patch)
tree7f888bbebc32a993467a134cbeafb75d253ab302
parentd9cb5e1721be7a2e75b565494669f1705c743664 (diff)
downloadcdo-b9a735c1df0480e2f5592ae87856abfe7dc0623b.zip
cdo-b9a735c1df0480e2f5592ae87856abfe7dc0623b.tar.gz
cdo-b9a735c1df0480e2f5592ae87856abfe7dc0623b.tar.bz2
*** empty log message ***
-rw-r--r--plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java6
-rw-r--r--plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java6
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java29
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java43
-rw-r--r--plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/tcp/ITCPSelector.java2
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/QueueWorker.java1
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/Worker.java63
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java3
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/LifecycleUtil.java47
9 files changed, 166 insertions, 34 deletions
diff --git a/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java b/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java
index 950fb89..16bfdff 100644
--- a/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java
+++ b/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java
@@ -172,6 +172,12 @@ public class Server extends QueueWorker<MessageImpl> implements IServer
}
@Override
+ protected String getThreadName()
+ {
+ return "jms-server";
+ }
+
+ @Override
protected void work(WorkContext context, MessageImpl message)
{
ServerDestination destination = getServerDestination(message.getJMSDestination());
diff --git a/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java
index 6e2a570..4fb5928 100644
--- a/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java
+++ b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java
@@ -411,6 +411,12 @@ public class SessionImpl extends QueueWorker<MessageConsumerImpl> implements Ses
}
@Override
+ protected String getThreadName()
+ {
+ return "jms-session";
+ }
+
+ @Override
protected void work(WorkContext context, MessageConsumerImpl consumer)
{
consumer.dispatchMessage();
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java
index 4394ba4..fe84616 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java
@@ -11,14 +11,17 @@
package org.eclipse.net4j.internal.tcp;
import org.eclipse.net4j.internal.tcp.bundle.OM;
+import org.eclipse.net4j.internal.util.lifecycle.Worker;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
import org.eclipse.net4j.tcp.ITCPAcceptor;
import org.eclipse.net4j.tcp.ITCPSelector;
import org.eclipse.net4j.tcp.ITCPSelectorListener;
import org.eclipse.net4j.util.StringUtil;
+import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.internal.net4j.Acceptor;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -28,19 +31,26 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.MessageFormat;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @author Eike Stepper
*/
public class TCPAcceptor extends Acceptor implements ITCPAcceptor, ITCPSelectorListener.Passive
{
+ public static final boolean DEFAULT_START_SYNCHRONOUSLY = true;
+
+ public static final long DEFAULT_SYNCHRONOUS_START_TIMEOUT = 2 * Worker.DEFAULT_TIMEOUT;
+
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG, TCPAcceptor.class);
private TCPSelector selector;
private SelectionKey selectionKey;
- private boolean startSynchronously = true;
+ private boolean startSynchronously = DEFAULT_START_SYNCHRONOUSLY;
+
+ private long synchronousStartTimeout = DEFAULT_SYNCHRONOUS_START_TIMEOUT;
private CountDownLatch startLatch;
@@ -99,6 +109,16 @@ public class TCPAcceptor extends Acceptor implements ITCPAcceptor, ITCPSelectorL
return selectionKey;
}
+ public long getSynchronousStartTimeout()
+ {
+ return synchronousStartTimeout;
+ }
+
+ public void setSynchronousStartTimeout(long synchronousStartTimeout)
+ {
+ this.synchronousStartTimeout = synchronousStartTimeout;
+ }
+
public void registered(SelectionKey selectionKey)
{
this.selectionKey = selectionKey;
@@ -206,10 +226,15 @@ public class TCPAcceptor extends Acceptor implements ITCPAcceptor, ITCPSelectorL
startLatch = new CountDownLatch(1);
}
+ // LifecycleUtil.waitForActive(selector, 2000L);
selector.registerAsync(serverSocketChannel, this);
if (startSynchronously)
{
- startLatch.await();
+ if (!startLatch.await(synchronousStartTimeout, TimeUnit.MILLISECONDS))
+ {
+ IOUtil.closeSilent(serverSocketChannel);
+ throw new IOException("Registration with selector timed out after " + synchronousStartTimeout + " millis");
+ }
}
}
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java
index cc5024b..67a1074 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPSelector.java
@@ -40,25 +40,16 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
private Selector selector;
- private Queue<Runnable> pendingOperations = new ConcurrentLinkedQueue();
+ private transient Queue<Runnable> pendingOperations = new ConcurrentLinkedQueue();
- private Thread thread;
+ private transient Thread thread;
+
+ private transient boolean running;
public TCPSelector()
{
}
- public void invokeAsync(final Runnable operation)
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Pending operation " + operation);
- }
-
- pendingOperations.add(operation);
- selector.wakeup();
- }
-
public void registerAsync(final ServerSocketChannel channel, final Passive listener)
{
assertValidListener(listener);
@@ -149,14 +140,8 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
public void run()
{
- while (isActive())
+ while (running && !Thread.interrupted())
{
- if (Thread.interrupted())
- {
- deactivate();
- break;
- }
-
try
{
Runnable operation;
@@ -205,10 +190,11 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
catch (Exception ex)
{
OM.LOG.error(ex);
- deactivate();
break;
}
}
+
+ deactivate();
}
@Override
@@ -275,6 +261,8 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
@Override
protected void doActivate() throws Exception
{
+ super.doActivate();
+ running = true;
selector = Selector.open();
thread = new Thread(this, "selector"); //$NON-NLS-1$
thread.setDaemon(true);
@@ -284,6 +272,7 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
@Override
protected void doDeactivate() throws Exception
{
+ running = false;
selector.wakeup();
Exception exception = null;
@@ -319,6 +308,7 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
selector = null;
}
+ super.doDeactivate();
if (exception != null)
{
throw exception;
@@ -333,6 +323,17 @@ public class TCPSelector extends Lifecycle implements ITCPSelector, Runnable
}
}
+ private void invokeAsync(final Runnable operation)
+ {
+ if (TRACER.isEnabled())
+ {
+ TRACER.trace("Pending operation " + operation);
+ }
+
+ pendingOperations.add(operation);
+ selector.wakeup();
+ }
+
private void doRegister(final ServerSocketChannel channel, final ITCPSelectorListener.Passive listener)
{
if (TRACER.isEnabled())
diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/tcp/ITCPSelector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/tcp/ITCPSelector.java
index 62eb19d..fef9be4 100644
--- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/tcp/ITCPSelector.java
+++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/tcp/ITCPSelector.java
@@ -22,8 +22,6 @@ import java.nio.channels.SocketChannel;
*/
public interface ITCPSelector
{
- public void invokeAsync(Runnable operation);
-
public void registerAsync(ServerSocketChannel channel, Passive listener);
public void registerAsync(SocketChannel channel, Active listener);
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/QueueWorker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/QueueWorker.java
index b788d46..1652318 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/QueueWorker.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/QueueWorker.java
@@ -36,7 +36,6 @@ public abstract class QueueWorker<E> extends Worker
public void setPollMillis(long pollMillis)
{
this.pollMillis = pollMillis;
- setJoinMillis(2 * pollMillis);
}
public boolean addWork(E element)
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/Worker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/Worker.java
index 6b5e780..a9503d3 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/Worker.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/internal/util/lifecycle/Worker.java
@@ -12,14 +12,24 @@ package org.eclipse.net4j.internal.util.lifecycle;
import org.eclipse.net4j.internal.util.bundle.OM;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
/**
* @author Eike Stepper
*/
public abstract class Worker extends Lifecycle
{
+ public static final int DEFAULT_TIMEOUT = 2000;
+
private boolean daemon;
- private long joinMillis;
+ private long activationTimeout = DEFAULT_TIMEOUT;
+
+ private long deactivationTimeout = DEFAULT_TIMEOUT;
+
+ private transient CountDownLatch activationLatch;
private transient WorkerThread workerThread;
@@ -37,22 +47,47 @@ public abstract class Worker extends Lifecycle
this.daemon = daemon;
}
- public long getJoinMillis()
+ public long getActivationTimeout()
+ {
+ return activationTimeout;
+ }
+
+ public void setActivationTimeout(long activationTimeout)
{
- return joinMillis;
+ this.activationTimeout = activationTimeout;
}
- public void setJoinMillis(long joinMillis)
+ public long getDeactivationTimeout()
{
- this.joinMillis = joinMillis;
+ return deactivationTimeout;
+ }
+
+ public void setDeactivationTimeout(long deactivationTimeout)
+ {
+ this.deactivationTimeout = deactivationTimeout;
}
@Override
protected void doActivate() throws Exception
{
super.doActivate();
- workerThread = new WorkerThread();
+ activationLatch = new CountDownLatch(1);
+ String threadName = getThreadName();
+ workerThread = threadName == null ? new WorkerThread() : new WorkerThread(threadName);
workerThread.start();
+ if (!activationLatch.await(activationTimeout, TimeUnit.MILLISECONDS))
+ {
+ try
+ {
+ workerThread.stopRunning();
+ workerThread.interrupt();
+ }
+ catch (RuntimeException ignore)
+ {
+ }
+
+ throw new TimeoutException("Worker thread activation timed out after " + activationTimeout + " millis");
+ }
}
@Override
@@ -62,15 +97,20 @@ public abstract class Worker extends Lifecycle
{
workerThread.stopRunning();
workerThread.interrupt();
- workerThread.join(joinMillis);
+ workerThread.join(deactivationTimeout);
}
- catch (Exception ignore)
+ catch (RuntimeException ignore)
{
}
super.doDeactivate();
}
+ protected String getThreadName()
+ {
+ return null;
+ }
+
protected abstract void work(WorkContext context) throws Exception;
/**
@@ -85,6 +125,12 @@ public abstract class Worker extends Lifecycle
setDaemon(daemon);
}
+ public WorkerThread(String threadName)
+ {
+ super(threadName);
+ setDaemon(daemon);
+ }
+
public void stopRunning()
{
running = false;
@@ -94,6 +140,7 @@ public abstract class Worker extends Lifecycle
public void run()
{
WorkContext context = new WorkContext();
+ activationLatch.countDown();
while (running && !isInterrupted())
{
try
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java
index b4a548f..3f01f52 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/RWLock.java
@@ -106,6 +106,9 @@ public class RWLock extends ReentrantReadWriteLock
throw new TimeoutException("Acquisition of lock timed out after " + timeoutMillis + " millis");
}
}
+ catch (InterruptedException ignore)
+ {
+ }
catch (Exception ex)
{
throw WrappedException.wrap(ex);
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/LifecycleUtil.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/LifecycleUtil.java
index ed0d540..6725ec7 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/LifecycleUtil.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/LifecycleUtil.java
@@ -12,7 +12,9 @@ package org.eclipse.net4j.util.lifecycle;
import org.eclipse.net4j.internal.util.bundle.OM;
import org.eclipse.net4j.internal.util.lifecycle.Lifecycle;
+import org.eclipse.net4j.internal.util.lifecycle.LifecycleEventAdapter;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.WrappedException;
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
@@ -20,6 +22,8 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @author Eike Stepper
@@ -94,6 +98,49 @@ public final class LifecycleUtil
}
}
+ public static boolean waitForActive(Object object, long millis)
+ {
+ try
+ {
+ if (object instanceof ILifecycle)
+ {
+ Lifecycle lifecycle = (Lifecycle)object;
+ if (lifecycle.isActive())
+ {
+ return true;
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ LifecycleEventAdapter adapter = new LifecycleEventAdapter()
+ {
+ @Override
+ protected void onActivated(ILifecycle lifecycle)
+ {
+ latch.countDown();
+ }
+ };
+
+ try
+ {
+ lifecycle.addListener(adapter);
+ latch.await(millis, TimeUnit.MILLISECONDS);
+ }
+ finally
+ {
+ lifecycle.removeListener(adapter);
+ }
+
+ return lifecycle.isActive();
+ }
+
+ return true;
+ }
+ catch (Exception ex)
+ {
+ throw WrappedException.wrap(ex);
+ }
+ }
+
public static Exception deactivate(Object object)
{
return deactivate(object, false);