Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrbrooks2010-07-24 17:38:31 +0000
committerrbrooks2010-07-24 17:38:31 +0000
commit4081f04850d3eff9eb1bd7241763707bfbb43a45 (patch)
tree258620df94d4b24077ff103a6c4dea9cb78493fb /plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse
parent4ed8bddc791fda93940f238c5135556a07041dc0 (diff)
downloadorg.eclipse.osee-4081f04850d3eff9eb1bd7241763707bfbb43a45.tar.gz
org.eclipse.osee-4081f04850d3eff9eb1bd7241763707bfbb43a45.tar.xz
org.eclipse.osee-4081f04850d3eff9eb1bd7241763707bfbb43a45.zip
applied Eclipse source cleanup
Diffstat (limited to 'plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse')
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/IOteMessageService.java9
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/AbstractMessageListener.java63
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/IMessageDbFactory.java3
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/ISubscriptionListener.java102
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java9
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java8
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/Activator.java100
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/ChannelProcessor.java188
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageReference.java75
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscription.java407
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java537
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/OteClientServiceTracker.java87
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/UpdateDispatcher.java287
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/AbstractSubscriptionState.java70
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ActivateState.java120
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ISubscriptionState.java31
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/InactiveState.java157
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/UnresolvedState.java150
18 files changed, 1188 insertions, 1215 deletions
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/IOteMessageService.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/IOteMessageService.java
index f5647cebc99..583f34655f4 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/IOteMessageService.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/IOteMessageService.java
@@ -19,10 +19,11 @@ import org.eclipse.osee.ote.message.tool.IFileTransferHandle;
/**
* @author Ken J. Aguilar
- *
*/
public interface IOteMessageService {
- IMessageSubscription subscribe(String name);
- IFileTransferHandle startRecording(String fileName, List<RecordCommand.MessageRecordDetails> list) throws FileNotFoundException, IOException;
- void stopRecording() throws Exception;
+ IMessageSubscription subscribe(String name);
+
+ IFileTransferHandle startRecording(String fileName, List<RecordCommand.MessageRecordDetails> list) throws FileNotFoundException, IOException;
+
+ void stopRecording() throws Exception;
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/AbstractMessageListener.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/AbstractMessageListener.java
index aa703f135ea..bfbc1c18769 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/AbstractMessageListener.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/AbstractMessageListener.java
@@ -15,39 +15,38 @@ import org.eclipse.osee.ote.message.listener.IOSEEMessageListener;
/**
* @author Ken J. Aguilar
- *
*/
public abstract class AbstractMessageListener implements ISubscriptionListener, IOSEEMessageListener {
- private final IMessageSubscription subscription;
-
- protected AbstractMessageListener(IMessageSubscription subscription) {
- this.subscription = subscription;
- }
-
- @Override
- public void subscriptionCanceled(IMessageSubscription subscription) {
- if (subscription.isResolved()) {
- subscription.getMessage().removeListener(this);
- }
- }
-
- @Override
- public void subscriptionResolved(IMessageSubscription subscription) {
- if (subscription.isResolved()) {
- subscription.getMessage().addListener(this);
- }
- }
-
- @Override
- public void subscriptionUnresolved(IMessageSubscription subscription) {
- }
-
- @Override
- public void onInitListener() throws MessageSystemException {
- }
-
- public IMessageSubscription getSubscription() {
- return subscription;
- }
+ private final IMessageSubscription subscription;
+
+ protected AbstractMessageListener(IMessageSubscription subscription) {
+ this.subscription = subscription;
+ }
+
+ @Override
+ public void subscriptionCanceled(IMessageSubscription subscription) {
+ if (subscription.isResolved()) {
+ subscription.getMessage().removeListener(this);
+ }
+ }
+
+ @Override
+ public void subscriptionResolved(IMessageSubscription subscription) {
+ if (subscription.isResolved()) {
+ subscription.getMessage().addListener(this);
+ }
+ }
+
+ @Override
+ public void subscriptionUnresolved(IMessageSubscription subscription) {
+ }
+
+ @Override
+ public void onInitListener() throws MessageSystemException {
+ }
+
+ public IMessageSubscription getSubscription() {
+ return subscription;
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/IMessageDbFactory.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/IMessageDbFactory.java
index 9676b4ab17f..cc1c246765b 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/IMessageDbFactory.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/IMessageDbFactory.java
@@ -15,9 +15,8 @@ import org.eclipse.osee.ote.service.IMessageDictionary;
/**
* @author Ken J. Aguilar
- *
*/
public interface IMessageDbFactory {
- AbstractMessageDataBase createMessageDataBase(IMessageDictionary dictionary);
+ AbstractMessageDataBase createMessageDataBase(IMessageDictionary dictionary);
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/ISubscriptionListener.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/ISubscriptionListener.java
index 8ee90e34231..d8af3094813 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/ISubscriptionListener.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/ISubscriptionListener.java
@@ -14,65 +14,57 @@ import org.eclipse.osee.ote.message.Message;
/**
* @author Ken J. Aguilar
- *
*/
public interface ISubscriptionListener {
- /**
- * called when a subscription can no longer be fulfilled by the message
- * service. this occurs when runtime libraries are changed and the message
- * does not exist. Can be called when a message exist in the libraries but
- * is not supported by the test environment. all references and event
- * listeners must be removed
- *
- * @param subscription
- */
- void subscriptionInvalidated(IMessageSubscription subscription);
+ /**
+ * called when a subscription can no longer be fulfilled by the message service. this occurs when runtime libraries
+ * are changed and the message does not exist. Can be called when a message exist in the libraries but is not
+ * supported by the test environment. all references and event listeners must be removed
+ *
+ * @param subscription
+ */
+ void subscriptionInvalidated(IMessageSubscription subscription);
+
+ /**
+ * called upon a successful acquisition of a {@link Message} instance. At this point, no message traffic is being
+ * transmitted but operations on the message are allowed. Subclasses should register any event listeners on
+ * {@link Message} upon this method being invoked.
+ *
+ * @param subscription
+ */
+ void subscriptionResolved(IMessageSubscription subscription);
- /**
- * called upon a successful acquisition of a {@link Message} instance. At
- * this point, no message traffic is being transmitted but operations on the
- * message are allowed. Subclasses should register any event listeners on
- * {@link Message} upon this method being invoked.
- *
- * @param subscription
- */
- void subscriptionResolved(IMessageSubscription subscription);
+ /**
+ * called when the message library has been unloaded. The subscription will still be honored by the system and can be
+ * reactivated upon a reload of a library assuming the library has a definition for the message. Subclasses should
+ * <B>must no longer reference</B> the ({@link Message} related to the subscription. Subclasses do not need to
+ * de-register any event listeners since all listeners will be cleared. The message is still in a valid state during
+ * this method invocation but not after.
+ *
+ * @param subscription
+ */
+ void subscriptionUnresolved(IMessageSubscription subscription);
- /**
- * called when the message library has been unloaded. The subscription will
- * still be honored by the system and can be reactivated upon a reload of a
- * library assuming the library has a definition for the message. Subclasses
- * should <B>must no longer reference</B> the ({@link Message} related to
- * the subscription. Subclasses do not need to de-register any event
- * listeners since all listeners will be cleared. The message is still in a
- * valid state during this method invocation but not after.
- *
- * @param subscription
- */
- void subscriptionUnresolved(IMessageSubscription subscription);
-
- /**
- * called upon successful registration with the test server. Message traffic
- * is now possible
- *
- * @param subscription
- */
- void subscriptionActivated(IMessageSubscription subscription);
+ /**
+ * called upon successful registration with the test server. Message traffic is now possible
+ *
+ * @param subscription
+ */
+ void subscriptionActivated(IMessageSubscription subscription);
- /**
- * called when the {@link IMessageSubscription#cancel()} method is called.
- * Any references and event listeners must be removed.
- *
- * @param subscription
- */
- void subscriptionCanceled(IMessageSubscription subscription);
+ /**
+ * called when the {@link IMessageSubscription#cancel()} method is called. Any references and event listeners must be
+ * removed.
+ *
+ * @param subscription
+ */
+ void subscriptionCanceled(IMessageSubscription subscription);
- /**
- * called when the subscription is resolved but a connected environment does
- * not support this type of message. The message can still be referenced but
- * no updates will be delivered.
- *
- * @param subscription
- */
- void subscriptionNotSupported(IMessageSubscription subscription);
+ /**
+ * called when the subscription is resolved but a connected environment does not support this type of message. The
+ * message can still be referenced but no updates will be delivered.
+ *
+ * @param subscription
+ */
+ void subscriptionNotSupported(IMessageSubscription subscription);
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java
index d655d8bd992..e0e6548f315 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java
@@ -15,7 +15,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
-
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.client.msg.core.internal.MessageReference;
import org.eclipse.osee.ote.message.Message;
@@ -34,9 +33,9 @@ import org.eclipse.osee.ote.service.IMessageDictionary;
public abstract class AbstractMessageDataBase {
private final HashMap<MessageReference, MessageInstance> referenceToMsgMap =
- new HashMap<MessageReference, MessageInstance>();
+ new HashMap<MessageReference, MessageInstance>();
private final ConcurrentHashMap<Integer, MessageInstance> idToMsgMap =
- new ConcurrentHashMap<Integer, MessageInstance>();
+ new ConcurrentHashMap<Integer, MessageInstance>();
private final IMessageDictionary dictionary;
private IMsgToolServiceClient client;
@@ -102,7 +101,7 @@ public abstract class AbstractMessageDataBase {
doInstanceDetach(instance, service);
}
MessageReference reference =
- new MessageReference(instance.getType(), instance.getMode(), instance.getMessage().getClass().getName());
+ new MessageReference(instance.getType(), instance.getMode(), instance.getMessage().getClass().getName());
referenceToMsgMap.remove(reference);
destroyMessage(instance.getMessage());
}
@@ -121,7 +120,7 @@ public abstract class AbstractMessageDataBase {
doInstanceAttach(instance, service);
} catch (Exception e) {
OseeLog.log(AbstractMessageDataBase.class, Level.SEVERE,
- "could not attach instance for " + instance.toString(), e);
+ "could not attach instance for " + instance.toString(), e);
}
}
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java
index 9e7ca3b00a0..77b89059d62 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java
@@ -12,7 +12,6 @@ package org.eclipse.osee.ote.client.msg.core.db;
import java.util.HashSet;
import java.util.Set;
-
import org.eclipse.osee.ote.message.Message;
import org.eclipse.osee.ote.message.commands.SubscribeToMessage;
import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage;
@@ -60,7 +59,7 @@ public class MessageInstance {
public Integer attachToService(IRemoteMessageService service, IMsgToolServiceClient client) throws Exception {
SubscriptionDetails details =
- service.subscribeToMessage(new SubscribeToMessage(msg.getClass().getName(), type, mode, client));
+ service.subscribeToMessage(new SubscribeToMessage(msg.getClass().getName(), type, mode, client));
if (details == null) {
supported = false;
return null;
@@ -75,7 +74,8 @@ public class MessageInstance {
public void detachService(IRemoteMessageService service, IMsgToolServiceClient client) throws Exception {
if (service != null) {
- service.unsubscribeToMessage(new UnSubscribeToMessage(msg.getClass().getName(), mode, type, client.getAddressByType(msg.getClass().getName(), type)));
+ service.unsubscribeToMessage(new UnSubscribeToMessage(msg.getClass().getName(), mode, type,
+ client.getAddressByType(msg.getClass().getName(), type)));
}
availableTypes.clear();
serverSubscriptionKey = null;
@@ -116,6 +116,6 @@ public class MessageInstance {
@Override
public String toString() {
return String.format("Message Instance(type=%s, mode=%s, ref=%d, supported=%b)", type.name(), mode.name(),
- refcount, supported);
+ refcount, supported);
}
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/Activator.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/Activator.java
index 849276db01f..09417b5d788 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/Activator.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/Activator.java
@@ -23,63 +23,65 @@ import org.osgi.framework.BundleContext;
*/
public class Activator extends Plugin {
- // The plug-in ID
- public static final String PLUGIN_ID = "org.eclipse.ote.client.msg";
+ // The plug-in ID
+ public static final String PLUGIN_ID = "org.eclipse.ote.client.msg";
- // The shared instance
- private static Activator plugin;
+ // The shared instance
+ private static Activator plugin;
- private BundleContext context;
+ private BundleContext context;
- private OteClientServiceTracker tracker;
-
- /**
- * The constructor
- */
- public Activator() {
- }
+ private OteClientServiceTracker tracker;
- public void start(BundleContext context) throws Exception {
- super.start(context);
- this.context = context;
- plugin = this;
-
- ExtensionDefinedObjects<IMessageDbFactory> definedObjects = new ExtensionDefinedObjects<IMessageDbFactory>(
- "org.wclipse.see.ote.client.msg.dBFactory", "DatabaseFactory", "className");
- try {
- List<IMessageDbFactory> providers = definedObjects.getObjects();
- if (!providers.isEmpty()) {
- tracker = new OteClientServiceTracker(providers.get(0));
- tracker.open(true);
- } else {
- OseeLog.log(Activator.class, Level.WARNING, "no message db factory found. Message Subscription Service not started");
- }
- } catch (Exception ex) {
- OseeLog.log(Activator.class, Level.SEVERE, "failed to process message database factory extensions", ex);
- }
-
+ /**
+ * The constructor
+ */
+ public Activator() {
+ }
+ @Override
+ public void start(BundleContext context) throws Exception {
+ super.start(context);
+ this.context = context;
+ plugin = this;
- }
+ ExtensionDefinedObjects<IMessageDbFactory> definedObjects =
+ new ExtensionDefinedObjects<IMessageDbFactory>("org.wclipse.see.ote.client.msg.dBFactory", "DatabaseFactory",
+ "className");
+ try {
+ List<IMessageDbFactory> providers = definedObjects.getObjects();
+ if (!providers.isEmpty()) {
+ tracker = new OteClientServiceTracker(providers.get(0));
+ tracker.open(true);
+ } else {
+ OseeLog.log(Activator.class, Level.WARNING,
+ "no message db factory found. Message Subscription Service not started");
+ }
+ } catch (Exception ex) {
+ OseeLog.log(Activator.class, Level.SEVERE, "failed to process message database factory extensions", ex);
+ }
- public void stop(BundleContext context) throws Exception {
- plugin = null;
- tracker.close();
- super.stop(context);
- this.context = null;
- }
+ }
- /**
- * Returns the shared instance
- *
- * @return the shared instance
- */
- public static Activator getDefault() {
- return plugin;
- }
+ @Override
+ public void stop(BundleContext context) throws Exception {
+ plugin = null;
+ tracker.close();
+ super.stop(context);
+ this.context = null;
+ }
- BundleContext getBundleContext() {
- return context;
- }
+ /**
+ * Returns the shared instance
+ *
+ * @return the shared instance
+ */
+ public static Activator getDefault() {
+ return plugin;
+ }
+
+ BundleContext getBundleContext() {
+ return context;
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/ChannelProcessor.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/ChannelProcessor.java
index de38d70271d..2a3d77d5b1b 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/ChannelProcessor.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/ChannelProcessor.java
@@ -17,7 +17,6 @@ import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
-
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.client.msg.core.db.AbstractMessageDataBase;
import org.eclipse.osee.ote.client.msg.core.db.MessageInstance;
@@ -31,108 +30,107 @@ import org.eclipse.osee.ote.message.enums.DataType;
* @author Ken J. Aguilar
*/
final public class ChannelProcessor {
- private final ArrayBlockingQueue<Task> queue;
- private final ExecutorService threadPool;
- private final AbstractMessageDataBase msgDb;
- private final DataType memType;
+ private final ArrayBlockingQueue<Task> queue;
+ private final ExecutorService threadPool;
+ private final AbstractMessageDataBase msgDb;
+ private final DataType memType;
+
+ /**
+ * A task allows each channel to have multiple updates processed concurrently. Each task has its own buffers.
+ *
+ * @author Ken J. Aguilar
+ */
+ private final class Task implements Runnable {
+ private final ByteBuffer buffer;
- /**
- * A task allows each channel to have multiple updates processed
- * concurrently. Each task has its own buffers.
- *
- * @author Ken J. Aguilar
- */
- private final class Task implements Runnable {
- private final ByteBuffer buffer;
+ public Task(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
- public Task(ByteBuffer buffer) {
- this.buffer = buffer;
- }
+ public void prepTask(ReadableByteChannel channel) throws IOException {
+ buffer.clear();
+ // read the data from the channel into the buffer
+ channel.read(buffer);
+ buffer.flip();
+ }
- public void prepTask(ReadableByteChannel channel) throws IOException {
- buffer.clear();
- // read the data from the channel into the buffer
- channel.read(buffer);
- buffer.flip();
- }
+ @Override
+ public void run() {
+ try {
+ final int id = buffer.getInt();
+ final long time = buffer.getLong();
+ final MessageInstance instance = msgDb.findById(id);
+ if (instance != null) {
+ onUpdate(instance, buffer, time);
+ }
+ // return to the queue
+ queue.put(this);
+ } catch (InterruptedException e) {
+ // do nothing
+ } catch (Exception ex) {
+ OseeLog.log(Activator.class, Level.SEVERE, "failed to process message update", ex);
+ }
+ }
+ }
- public void run() {
- try {
- final int id = buffer.getInt();
- final long time = buffer.getLong();
- final MessageInstance instance = msgDb.findById(id);
- if (instance != null) {
- onUpdate(instance, buffer, time);
- }
- // return to the queue
- queue.put(this);
- } catch (InterruptedException e) {
- // do nothing
- } catch (Exception ex) {
- OseeLog.log(Activator.class, Level.SEVERE, "failed to process message update", ex);
- }
- }
- }
+ public ChannelProcessor(int depth, int bufferSize, ExecutorService threadPool, AbstractMessageDataBase msgDb, DataType memType) {
+ this.queue = new ArrayBlockingQueue<Task>(depth);
+ try {
+ // fill the queue with pre-allocated tasks
+ for (int i = 0; i < depth; i++) {
+ queue.put(new Task(ByteBuffer.allocateDirect(bufferSize)));
+ }
+ } catch (InterruptedException ex) {
+ throw new Error("should never happen", ex);
+ }
+ this.threadPool = threadPool;
+ this.msgDb = msgDb;
+ this.memType = memType;
+ }
- public ChannelProcessor(int depth, int bufferSize, ExecutorService threadPool, AbstractMessageDataBase msgDb, DataType memType) {
- this.queue = new ArrayBlockingQueue<Task>(depth);
- try {
- // fill the queue with pre-allocated tasks
- for (int i = 0; i < depth; i++) {
- queue.put(new Task(ByteBuffer.allocateDirect(bufferSize)));
- }
- } catch (InterruptedException ex) {
- throw new Error("should never happen", ex);
- }
- this.threadPool = threadPool;
- this.msgDb = msgDb;
- this.memType = memType;
- }
+ public final void process(final ReadableByteChannel channel) throws InterruptedException, IOException {
+ // get a free task
+ final Task task = queue.take();
- public final void process(final ReadableByteChannel channel) throws InterruptedException, IOException {
- // get a free task
- final Task task = queue.take();
+ // prep the task
+ task.prepTask(channel);
- // prep the task
- task.prepTask(channel);
+ // the task is now ready for execution, submit it to the thread pool
+ threadPool.submit(task);
+ }
- // the task is now ready for execution, submit it to the thread pool
- threadPool.submit(task);
- }
+ /**
+ * called when there is data to be processed from a channel. Can be called by one or more threads for the same data
+ * concurrently to so implementors need to be thread safe
+ *
+ * @param data
+ * @param buffer
+ * @param time
+ */
+ protected void onUpdate(MessageInstance instance, ByteBuffer buffer, long time) {
+ MessageData msgData = instance.getMessage().getActiveDataSource(memType);
+ if (msgData != null) {
+ byte[] data = msgData.getMem().getData();
+ int remaining = buffer.remaining();
+ if (data.length < remaining) {
+ OseeLog.log(Activator.class, Level.WARNING, String.format(
+ "Message [%s] changed it's backing data size from [%d] to [%d].", instance.getMessage().getName(),
+ data.length, remaining));
+ data = new byte[remaining];
+ buffer.get(data, 0, remaining);
+ msgData.setNewBackingBuffer(data);
+ return;
+ }
- /**
- * called when there is data to be processed from a channel. Can be called
- * by one or more threads for the same data concurrently to so implementors
- * need to be thread safe
- *
- * @param data
- * @param buffer
- * @param time
- */
- protected void onUpdate(MessageInstance instance, ByteBuffer buffer, long time) {
- MessageData msgData = instance.getMessage().getActiveDataSource(memType);
- if (msgData != null) {
- byte[] data = msgData.getMem().getData();
- int remaining = buffer.remaining();
- if (data.length < remaining) {
- OseeLog.log(Activator.class, Level.WARNING, String.format("Message [%s] changed it's backing data size from [%d] to [%d].", instance
- .getMessage().getName(),
- data.length, remaining));
- data = new byte[remaining];
- buffer.get(data, 0, remaining);
- msgData.setNewBackingBuffer(data);
- return;
- }
-
- if (remaining < data.length) {
- Arrays.fill(data, remaining, data.length, (byte) 0);
- // msg.getActiveDataSource().setCurrentLength(remaining);
- }
- buffer.get(data, 0, remaining);
- msgData.setCurrentLength(remaining);
- msgData.incrementActivityCount();
- msgData.notifyListeners();
- }
- }
+ if (remaining < data.length) {
+ Arrays.fill(data, remaining, data.length, (byte) 0);
+ // msg.getActiveDataSource().setCurrentLength(remaining);
+ }
+ buffer.get(data, 0, remaining);
+ msgData.setCurrentLength(remaining);
+ msgData.incrementActivityCount();
+ msgData.notifyListeners();
+ }
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageReference.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageReference.java
index 2477ec1ec2d..5f78dfcdfe9 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageReference.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageReference.java
@@ -16,50 +16,49 @@ import org.eclipse.osee.ote.message.tool.MessageMode;
/**
* @author Ken J. Aguilar
- *
*/
public class MessageReference {
- private final DataType type;
- private final MessageMode mode;
- private final String msgClassName;
+ private final DataType type;
+ private final MessageMode mode;
+ private final String msgClassName;
+
+ public MessageReference(DataType type, MessageMode mode, String msgClassName) {
+ this.type = type;
+ this.mode = mode;
+ this.msgClassName = msgClassName;
+ }
+
+ public MessageReference(Message msg) {
+ this.type = msg.getDefaultMessageData().getType();
+ this.mode = MessageMode.READER;
+ this.msgClassName = msg.getMessageName();
+ }
+
+ public String getMsgClass() {
+ return msgClassName;
+ }
- public MessageReference(DataType type, MessageMode mode, String msgClassName) {
- this.type = type;
- this.mode = mode;
- this.msgClassName = msgClassName;
- }
-
- public MessageReference(Message msg) {
- this.type = msg.getDefaultMessageData().getType();
- this.mode = MessageMode.READER;
- this.msgClassName = msg.getMessageName();
- }
+ public DataType getType() {
+ return type;
+ }
- public String getMsgClass() {
- return msgClassName;
- }
+ public MessageMode getMode() {
+ return mode;
+ }
- public DataType getType() {
- return type;
- }
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ MessageReference otherRef = (MessageReference) obj;
+ return msgClassName.equals(otherRef.msgClassName) && type == otherRef.type && mode == otherRef.mode;
+ }
- public MessageMode getMode() {
- return mode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- MessageReference otherRef = (MessageReference) obj;
- return msgClassName.equals(otherRef.msgClassName) && type == otherRef.type && mode == otherRef.mode;
- }
+ @Override
+ public int hashCode() {
+ return msgClassName.hashCode() ^ mode.hashCode() ^ type.hashCode();
+ }
- @Override
- public int hashCode() {
- return msgClassName.hashCode() ^ mode.hashCode() ^ type.hashCode();
- }
-
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscription.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscription.java
index 2a38f9a8802..8af144b6b8c 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscription.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscription.java
@@ -27,212 +27,209 @@ import org.eclipse.osee.ote.message.tool.MessageMode;
/**
* @author Ken J. Aguilar
- *
*/
public class MessageSubscription implements IMessageSubscription {
- private ISubscriptionState currentState = null;
- private final MessageSubscriptionService msgService;
- private final HashSet<ISubscriptionListener> listeners = new HashSet<ISubscriptionListener>();
-
- /**
- * creates a subscription with no reference to a message
- *
- * @param name
- * @param subscriber
- */
- public MessageSubscription(MessageSubscriptionService msgService) {
- this.msgService = msgService;
- }
-
- public synchronized void bind(String name) {
- bind(name, null, MessageMode.READER);
- }
-
- private void bind(String name, DataType type, MessageMode mode) {
- currentState = new UnresolvedState(name, this, type, mode);
- }
-
- @Override
- public synchronized boolean isActive() {
- return currentState != null ? currentState.isActive() : false;
- }
-
- @Override
- public synchronized boolean isResolved() {
- return currentState != null ? currentState.isResolved() : false;
- }
-
- @Override
- public synchronized DataType getMemType() {
- return currentState.getMemType();
- }
-
- @Override
- public synchronized MessageMode getMessageMode() {
- return currentState.getMode();
- }
-
- @Override
- public synchronized String getMessageClassName() {
- return currentState.getMsgClassName();
- }
-
- @Override
- public synchronized Message getMessage() {
- return currentState.getMessage();
- }
-
-
- public synchronized void attachMessageDb(AbstractMessageDataBase msgDb) {
- currentState = currentState.onMessageDbFound(msgDb);
- }
-
- public synchronized void detachMessageDb(AbstractMessageDataBase msgDb) {
- currentState = currentState.onMessageDbClosing(msgDb);
- }
-
- public synchronized void attachService(IRemoteMessageService service) {
- currentState = currentState.onActivated();
- }
-
- public synchronized void detachService(IRemoteMessageService service) {
- currentState = currentState.onDeactivated();
- }
-
- private void doCancel() {
- if (currentState != null) {
- currentState.onCanceled();
- currentState = null;
- }
- }
-
- @Override
- public synchronized Set<DataType> getAvailableTypes() {
- return currentState.getAvailableTypes();
- }
-
- @Override
- public synchronized void changeMessageMode(MessageMode mode) {
- if (mode == getMessageMode()) {
- return;
- }
- String name = getMessageClassName();
- DataType type = getMemType();
- notifyCanceled();
- doCancel();
- bind(name, type, mode);
- progressState();
- }
-
- @Override
- public synchronized void cancel() {
- if (currentState == null) {
- return;
- }
- try {
- notifyCanceled();
- } finally {
- doCancel();
- msgService.removeSubscription(this);
- }
- }
-
- @Override
- public synchronized void changeMemType(DataType type) {
- if (type == getMemType()) {
- return;
- }
- String name = getMessageClassName();
- MessageMode mode = getMessageMode();
- doCancel();
- bind(name, type, mode);
- progressState();
- }
-
- private void progressState() {
- if (msgService.getMsgDatabase() != null) {
- attachMessageDb(msgService.getMsgDatabase());
- if (msgService.getService() != null) {
- attachService(msgService.getService());
- }
- }
- }
-
- @Override
- public void setElementValue(List<Object> path, String value) throws Exception {
- final SetElementValue cmd = new SetElementValue(getMessageClassName(), getMemType(), path, value);
- msgService.getService().setElementValue(cmd);
- }
-
- @Override
- public void send() throws Exception {
- final SetElementValue cmd = new SetElementValue(getMessageClassName(), getMemType(), null, null);
- msgService.getService().setElementValue(cmd);
- }
-
- @Override
- public void zeroize(List<Object> path) throws Exception {
- final ZeroizeElement cmd = new ZeroizeElement(getMessageClassName(), getMemType(), path);
- msgService.getService().zeroizeElement(cmd);
- }
-
- public void notifyCanceled() {
- for (ISubscriptionListener listener : listeners) {
- listener.subscriptionCanceled(MessageSubscription.this);
- }
- }
-
- public void notifyActivated() {
- for (ISubscriptionListener listener : listeners) {
- listener.subscriptionActivated(MessageSubscription.this);
- }
- }
-
-
- public void notifyInvalidated() {
- for (ISubscriptionListener listener : listeners) {
- listener.subscriptionInvalidated(MessageSubscription.this);
- }
- }
-
- public void notifyResolved() {
- for (ISubscriptionListener listener : listeners) {
- listener.subscriptionResolved(MessageSubscription.this);
- }
- }
-
- public void notifyUnresolved() {
- for (ISubscriptionListener listener : listeners) {
- listener.subscriptionUnresolved(MessageSubscription.this);
- }
- }
-
- @Override
- public synchronized boolean addSubscriptionListener(ISubscriptionListener listener) {
- boolean result = listeners.add(listener);
- if (currentState == null) {
- listener.subscriptionCanceled(this);
- } else {
- if (msgService.getMsgDatabase() != null) {
- // a database is available
- if (currentState.isResolved()) {
- listener.subscriptionResolved(this);
- } else {
- listener.subscriptionInvalidated(this);
- }
- if (currentState.isActive()) {
- listener.subscriptionActivated(this);
- }
- }
-
- }
-
- return result;
- }
-
- @Override
- public boolean removeSubscriptionListener(ISubscriptionListener listener) {
- return listeners.remove(listener);
- }
+ private ISubscriptionState currentState = null;
+ private final MessageSubscriptionService msgService;
+ private final HashSet<ISubscriptionListener> listeners = new HashSet<ISubscriptionListener>();
+
+ /**
+ * creates a subscription with no reference to a message
+ *
+ * @param name
+ * @param subscriber
+ */
+ public MessageSubscription(MessageSubscriptionService msgService) {
+ this.msgService = msgService;
+ }
+
+ public synchronized void bind(String name) {
+ bind(name, null, MessageMode.READER);
+ }
+
+ private void bind(String name, DataType type, MessageMode mode) {
+ currentState = new UnresolvedState(name, this, type, mode);
+ }
+
+ @Override
+ public synchronized boolean isActive() {
+ return currentState != null ? currentState.isActive() : false;
+ }
+
+ @Override
+ public synchronized boolean isResolved() {
+ return currentState != null ? currentState.isResolved() : false;
+ }
+
+ @Override
+ public synchronized DataType getMemType() {
+ return currentState.getMemType();
+ }
+
+ @Override
+ public synchronized MessageMode getMessageMode() {
+ return currentState.getMode();
+ }
+
+ @Override
+ public synchronized String getMessageClassName() {
+ return currentState.getMsgClassName();
+ }
+
+ @Override
+ public synchronized Message getMessage() {
+ return currentState.getMessage();
+ }
+
+ public synchronized void attachMessageDb(AbstractMessageDataBase msgDb) {
+ currentState = currentState.onMessageDbFound(msgDb);
+ }
+
+ public synchronized void detachMessageDb(AbstractMessageDataBase msgDb) {
+ currentState = currentState.onMessageDbClosing(msgDb);
+ }
+
+ public synchronized void attachService(IRemoteMessageService service) {
+ currentState = currentState.onActivated();
+ }
+
+ public synchronized void detachService(IRemoteMessageService service) {
+ currentState = currentState.onDeactivated();
+ }
+
+ private void doCancel() {
+ if (currentState != null) {
+ currentState.onCanceled();
+ currentState = null;
+ }
+ }
+
+ @Override
+ public synchronized Set<DataType> getAvailableTypes() {
+ return currentState.getAvailableTypes();
+ }
+
+ @Override
+ public synchronized void changeMessageMode(MessageMode mode) {
+ if (mode == getMessageMode()) {
+ return;
+ }
+ String name = getMessageClassName();
+ DataType type = getMemType();
+ notifyCanceled();
+ doCancel();
+ bind(name, type, mode);
+ progressState();
+ }
+
+ @Override
+ public synchronized void cancel() {
+ if (currentState == null) {
+ return;
+ }
+ try {
+ notifyCanceled();
+ } finally {
+ doCancel();
+ msgService.removeSubscription(this);
+ }
+ }
+
+ @Override
+ public synchronized void changeMemType(DataType type) {
+ if (type == getMemType()) {
+ return;
+ }
+ String name = getMessageClassName();
+ MessageMode mode = getMessageMode();
+ doCancel();
+ bind(name, type, mode);
+ progressState();
+ }
+
+ private void progressState() {
+ if (msgService.getMsgDatabase() != null) {
+ attachMessageDb(msgService.getMsgDatabase());
+ if (msgService.getService() != null) {
+ attachService(msgService.getService());
+ }
+ }
+ }
+
+ @Override
+ public void setElementValue(List<Object> path, String value) throws Exception {
+ final SetElementValue cmd = new SetElementValue(getMessageClassName(), getMemType(), path, value);
+ msgService.getService().setElementValue(cmd);
+ }
+
+ @Override
+ public void send() throws Exception {
+ final SetElementValue cmd = new SetElementValue(getMessageClassName(), getMemType(), null, null);
+ msgService.getService().setElementValue(cmd);
+ }
+
+ @Override
+ public void zeroize(List<Object> path) throws Exception {
+ final ZeroizeElement cmd = new ZeroizeElement(getMessageClassName(), getMemType(), path);
+ msgService.getService().zeroizeElement(cmd);
+ }
+
+ public void notifyCanceled() {
+ for (ISubscriptionListener listener : listeners) {
+ listener.subscriptionCanceled(MessageSubscription.this);
+ }
+ }
+
+ public void notifyActivated() {
+ for (ISubscriptionListener listener : listeners) {
+ listener.subscriptionActivated(MessageSubscription.this);
+ }
+ }
+
+ public void notifyInvalidated() {
+ for (ISubscriptionListener listener : listeners) {
+ listener.subscriptionInvalidated(MessageSubscription.this);
+ }
+ }
+
+ public void notifyResolved() {
+ for (ISubscriptionListener listener : listeners) {
+ listener.subscriptionResolved(MessageSubscription.this);
+ }
+ }
+
+ public void notifyUnresolved() {
+ for (ISubscriptionListener listener : listeners) {
+ listener.subscriptionUnresolved(MessageSubscription.this);
+ }
+ }
+
+ @Override
+ public synchronized boolean addSubscriptionListener(ISubscriptionListener listener) {
+ boolean result = listeners.add(listener);
+ if (currentState == null) {
+ listener.subscriptionCanceled(this);
+ } else {
+ if (msgService.getMsgDatabase() != null) {
+ // a database is available
+ if (currentState.isResolved()) {
+ listener.subscriptionResolved(this);
+ } else {
+ listener.subscriptionInvalidated(this);
+ }
+ if (currentState.isActive()) {
+ listener.subscriptionActivated(this);
+ }
+ }
+
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean removeSubscriptionListener(ISubscriptionListener listener) {
+ return listeners.remove(listener);
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java
index 099ea6f1b65..ef8f6dfb382 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java
@@ -52,271 +52,274 @@ import org.eclipse.osee.ote.service.ITestConnectionListener;
*/
public class MessageSubscriptionService implements IOteMessageService, IMessageDictionaryListener, ITestConnectionListener, IMsgToolServiceClient {
- /** * Static Fields ** */
- private static final int MAX_CONCURRENT_WORKER_THREADS = Runtime.getRuntime().availableProcessors() + 1;
-
- private final InetAddress localAddress;
- private final LinkedList<MessageSubscription> subscriptions = new LinkedList<MessageSubscription>();
- private IMsgToolServiceClient exportedThis = null;
- private AbstractMessageDataBase msgDatabase;
- private UdpFileTransferHandler fileTransferHandler;
-
- private final ExecutorService threadPool =
- Executors.newFixedThreadPool(MAX_CONCURRENT_WORKER_THREADS, new ThreadFactory() {
- private final ThreadGroup group =
- new ThreadGroup(Thread.currentThread().getThreadGroup(), "Msg Watch Workers");
- private int count = 1;
-
- public Thread newThread(Runnable arg0) {
- Thread thread = new Thread(group, arg0, "Msg Watch Wrkr - " + count++);
- thread.setDaemon(false);
- return thread;
- }
-
- });
-
- /**
- * Monitors a set of channels for message updates and dispatches the updates to worker threads
- */
- private UpdateDispatcher dispatcher = null;
- private IRemoteMessageService service;
-
- private final IMessageDbFactory messageDbFactory;
- private final IOteClientService clientService;
-
- public MessageSubscriptionService(IOteClientService service, IMessageDbFactory messageDbFactory) throws IOException {
- this.messageDbFactory = messageDbFactory;
- localAddress = InetAddress.getLocalHost();
- OseeLog.log(Activator.class, Level.INFO,
- "OTE client message service started on: " + localAddress.getHostAddress());
- clientService = service;
- clientService.addDictionaryListener(this);
- clientService.addConnectionListener(this);
- }
-
- public synchronized IMessageSubscription subscribe(String name) {
- MessageSubscription subscription = new MessageSubscription(this);
- subscription.bind(name);
- if (msgDatabase != null) {
- subscription.attachMessageDb(msgDatabase);
- if (service != null) {
- subscription.attachService(service);
- }
- }
- subscriptions.add(subscription);
- return subscription;
- }
-
- /**
- * Shuts down the client message service. All worker threads will be terminated and all IO resources will be closed.
- */
- public void shutdown() {
- OseeLog.log(MessageSubscriptionService.class, Level.INFO, "shutting down subscription service");
- clientService.removeDictionaryListener(this);
- clientService.removeConnectionListener(this);
- shutdownDispatcher();
- threadPool.shutdown();
- try {
- threadPool.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException ex1) {
- OseeLog.log(Activator.class, Level.WARNING, ex1.toString(), ex1);
- }
- }
-
- @Override
- public synchronized void onConnectionLost(IServiceConnector connector) {
- OseeLog.log(Activator.class, Level.INFO, "connection lost: ote client message service halted");
- shutdownDispatcher();
- msgDatabase.detachService(null);
- for (MessageSubscription subscription : subscriptions) {
- subscription.detachService(null);
- }
- exportedThis = null;
- service = null;
- }
-
- @Override
- public synchronized void onPostConnect(ConnectionEvent event) {
- assert msgDatabase != null;
- OseeLog.log(Activator.class, Level.INFO, "connecting OTE client message service");
- if (event.getEnvironment() instanceof ITestEnvironmentMessageSystem) {
- ITestEnvironmentMessageSystem env = (ITestEnvironmentMessageSystem) event.getEnvironment();
- try {
- service = env.getMessageToolServiceProxy();
- if (service == null) {
- throw new Exception("could not get message tool service proxy");
- }
- exportedThis = (IMsgToolServiceClient) event.getConnector().export(this);
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.SEVERE,
- "failed to create exported Message Tool Client", e);
- service = null;
- exportedThis = null;
- return;
- }
-
- try {
- dispatcher = new UpdateDispatcher(service.getMsgUpdateSocketAddress());
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update dispatcher", e);
- service = null;
- exportedThis = null;
- return;
- }
-
- try {
- createProccessors();
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update processors", e);
- service = null;
- exportedThis = null;
- return;
- }
-
- msgDatabase.attachToService(service, exportedThis);
- for (MessageSubscription subscription : subscriptions) {
- subscription.attachService(service);
- }
- dispatcher.start();
- }
- }
-
- private void createProccessors() throws IOException {
- Set<? extends DataType> availableTypes = service.getAvailablePhysicalTypes();
-
- int port = PortUtil.getInstance().getConsecutiveValidPorts(availableTypes.size());
- for (DataType type : availableTypes) {
- final ChannelProcessor handler = new ChannelProcessor(1, type.getToolingBufferSize(), threadPool, msgDatabase, type);
-
- dispatcher.addChannel(localAddress, port, type, handler);
- port++;
- }
- }
-
- private void shutdownDispatcher() {
- if (dispatcher != null && dispatcher.isRunning()) {
- try {
- dispatcher.close();
- } catch (Throwable ex) {
- OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "exception while closing down dispatcher", ex);
- } finally {
- dispatcher = null;
- }
- }
- }
-
- @Override
- public synchronized void onPreDisconnect(ConnectionEvent event) {
- if (service == null) {
- return;
- }
- msgDatabase.detachService(service);
- for (MessageSubscription subscription : subscriptions) {
- subscription.detachService(service);
- }
- try {
- event.getConnector().unexport(this);
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "problems unexporting Message Tool Client", e);
- }
- shutdownDispatcher();
- exportedThis = null;
- service = null;
- }
-
- @Override
- public void changeIsScheduled(String msgName, boolean isScheduled) throws RemoteException {
-
- }
-
- @Override
- public void changeRate(String msgName, double rate) throws RemoteException {
-
- }
-
- @Override
- public InetSocketAddress getAddressByType(String messageName, DataType dataType) throws RemoteException {
- final DatagramChannel channel = dispatcher.getChannel(dataType);
- OseeLog.log(Activator.class, Level.INFO, String.format(
- "callback from remote msg manager: msg=%s, type=%s, ip=%s:%d\n", messageName,
- dataType.name(), localAddress.toString(), channel.socket().getLocalPort()));
-
- return new InetSocketAddress(localAddress, channel.socket().getLocalPort());
- }
-
- @Override
- public UserTestSessionKey getTestSessionKey() throws RemoteException {
- return clientService.getSessionKey();
- }
-
- @Override
- public synchronized void onDictionaryLoaded(IMessageDictionary dictionary) {
- msgDatabase = messageDbFactory.createMessageDataBase(dictionary);
- for (MessageSubscription subscription : subscriptions) {
- subscription.attachMessageDb(msgDatabase);
- }
- }
-
- @Override
- public synchronized void onDictionaryUnloaded(IMessageDictionary dictionary) {
- for (MessageSubscription subscription : subscriptions) {
- subscription.detachMessageDb(msgDatabase);
- }
- msgDatabase = null;
- }
-
- @Override
- public synchronized IFileTransferHandle startRecording(String fileName, List<MessageRecordDetails> list) throws FileNotFoundException, IOException {
- if (service == null) {
- throw new IllegalStateException("can't record: not connected to test server");
- }
- if (fileTransferHandler == null) {
- fileTransferHandler = new UdpFileTransferHandler();
- fileTransferHandler.start();
- }
- int port = PortUtil.getInstance().getValidPort();
- // get the address of the socket the message recorder is going to write
- // data to
- InetSocketAddress recorderOutputAddress = service.getRecorderSocketAddress();
-
- // setup a transfer from a socket to a file
- TransferConfig config =
- new TransferConfig(fileName, recorderOutputAddress,
- new InetSocketAddress(InetAddress.getLocalHost(), port),
- TransferConfig.Direction.SOCKET_TO_FILE, 128000);
- IFileTransferHandle handle = fileTransferHandler.registerTransfer(config);
-
- // send the command to start recording
- RecordCommand cmd =
- new RecordCommand(exportedThis, new InetSocketAddress(InetAddress.getLocalHost(), port), list);
- service.startRecording(cmd);
- OseeLog.log(Activator.class, Level.INFO,
- "recording started with " + list.size() + " entries, recorder output socket="
- + recorderOutputAddress.toString());
- return handle;
- }
-
- @Override
- public synchronized void stopRecording() throws RemoteException, IOException {
- try {
- service.stopRecording();
- } finally {
- if (fileTransferHandler != null && fileTransferHandler.hasActiveTransfers()) {
- fileTransferHandler.stopAllTransfers();
- }
- fileTransferHandler = null;
- }
- }
-
- public AbstractMessageDataBase getMsgDatabase() {
- return msgDatabase;
- }
-
- public IRemoteMessageService getService() {
- return service;
- }
-
- public void removeSubscription(MessageSubscription subscription) {
- subscriptions.remove(subscription);
- }
+ /** * Static Fields ** */
+ private static final int MAX_CONCURRENT_WORKER_THREADS = Runtime.getRuntime().availableProcessors() + 1;
+
+ private final InetAddress localAddress;
+ private final LinkedList<MessageSubscription> subscriptions = new LinkedList<MessageSubscription>();
+ private IMsgToolServiceClient exportedThis = null;
+ private AbstractMessageDataBase msgDatabase;
+ private UdpFileTransferHandler fileTransferHandler;
+
+ private final ExecutorService threadPool = Executors.newFixedThreadPool(MAX_CONCURRENT_WORKER_THREADS,
+ new ThreadFactory() {
+ private final ThreadGroup group =
+ new ThreadGroup(Thread.currentThread().getThreadGroup(), "Msg Watch Workers");
+ private int count = 1;
+
+ @Override
+ public Thread newThread(Runnable arg0) {
+ Thread thread = new Thread(group, arg0, "Msg Watch Wrkr - " + count++);
+ thread.setDaemon(false);
+ return thread;
+ }
+
+ });
+
+ /**
+ * Monitors a set of channels for message updates and dispatches the updates to worker threads
+ */
+ private UpdateDispatcher dispatcher = null;
+ private IRemoteMessageService service;
+
+ private final IMessageDbFactory messageDbFactory;
+ private final IOteClientService clientService;
+
+ public MessageSubscriptionService(IOteClientService service, IMessageDbFactory messageDbFactory) throws IOException {
+ this.messageDbFactory = messageDbFactory;
+ localAddress = InetAddress.getLocalHost();
+ OseeLog.log(Activator.class, Level.INFO,
+ "OTE client message service started on: " + localAddress.getHostAddress());
+ clientService = service;
+ clientService.addDictionaryListener(this);
+ clientService.addConnectionListener(this);
+ }
+
+ @Override
+ public synchronized IMessageSubscription subscribe(String name) {
+ MessageSubscription subscription = new MessageSubscription(this);
+ subscription.bind(name);
+ if (msgDatabase != null) {
+ subscription.attachMessageDb(msgDatabase);
+ if (service != null) {
+ subscription.attachService(service);
+ }
+ }
+ subscriptions.add(subscription);
+ return subscription;
+ }
+
+ /**
+ * Shuts down the client message service. All worker threads will be terminated and all IO resources will be closed.
+ */
+ public void shutdown() {
+ OseeLog.log(MessageSubscriptionService.class, Level.INFO, "shutting down subscription service");
+ clientService.removeDictionaryListener(this);
+ clientService.removeConnectionListener(this);
+ shutdownDispatcher();
+ threadPool.shutdown();
+ try {
+ threadPool.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ex1) {
+ OseeLog.log(Activator.class, Level.WARNING, ex1.toString(), ex1);
+ }
+ }
+
+ @Override
+ public synchronized void onConnectionLost(IServiceConnector connector) {
+ OseeLog.log(Activator.class, Level.INFO, "connection lost: ote client message service halted");
+ shutdownDispatcher();
+ msgDatabase.detachService(null);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.detachService(null);
+ }
+ exportedThis = null;
+ service = null;
+ }
+
+ @Override
+ public synchronized void onPostConnect(ConnectionEvent event) {
+ assert msgDatabase != null;
+ OseeLog.log(Activator.class, Level.INFO, "connecting OTE client message service");
+ if (event.getEnvironment() instanceof ITestEnvironmentMessageSystem) {
+ ITestEnvironmentMessageSystem env = (ITestEnvironmentMessageSystem) event.getEnvironment();
+ try {
+ service = env.getMessageToolServiceProxy();
+ if (service == null) {
+ throw new Exception("could not get message tool service proxy");
+ }
+ exportedThis = (IMsgToolServiceClient) event.getConnector().export(this);
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.SEVERE,
+ "failed to create exported Message Tool Client", e);
+ service = null;
+ exportedThis = null;
+ return;
+ }
+
+ try {
+ dispatcher = new UpdateDispatcher(service.getMsgUpdateSocketAddress());
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update dispatcher", e);
+ service = null;
+ exportedThis = null;
+ return;
+ }
+
+ try {
+ createProccessors();
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update processors", e);
+ service = null;
+ exportedThis = null;
+ return;
+ }
+
+ msgDatabase.attachToService(service, exportedThis);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.attachService(service);
+ }
+ dispatcher.start();
+ }
+ }
+
+ private void createProccessors() throws IOException {
+ Set<? extends DataType> availableTypes = service.getAvailablePhysicalTypes();
+
+ int port = PortUtil.getInstance().getConsecutiveValidPorts(availableTypes.size());
+ for (DataType type : availableTypes) {
+ final ChannelProcessor handler =
+ new ChannelProcessor(1, type.getToolingBufferSize(), threadPool, msgDatabase, type);
+
+ dispatcher.addChannel(localAddress, port, type, handler);
+ port++;
+ }
+ }
+
+ private void shutdownDispatcher() {
+ if (dispatcher != null && dispatcher.isRunning()) {
+ try {
+ dispatcher.close();
+ } catch (Throwable ex) {
+ OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "exception while closing down dispatcher", ex);
+ } finally {
+ dispatcher = null;
+ }
+ }
+ }
+
+ @Override
+ public synchronized void onPreDisconnect(ConnectionEvent event) {
+ if (service == null) {
+ return;
+ }
+ msgDatabase.detachService(service);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.detachService(service);
+ }
+ try {
+ event.getConnector().unexport(this);
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "problems unexporting Message Tool Client", e);
+ }
+ shutdownDispatcher();
+ exportedThis = null;
+ service = null;
+ }
+
+ @Override
+ public void changeIsScheduled(String msgName, boolean isScheduled) throws RemoteException {
+
+ }
+
+ @Override
+ public void changeRate(String msgName, double rate) throws RemoteException {
+
+ }
+
+ @Override
+ public InetSocketAddress getAddressByType(String messageName, DataType dataType) throws RemoteException {
+ final DatagramChannel channel = dispatcher.getChannel(dataType);
+ OseeLog.log(Activator.class, Level.INFO, String.format(
+ "callback from remote msg manager: msg=%s, type=%s, ip=%s:%d\n", messageName, dataType.name(),
+ localAddress.toString(), channel.socket().getLocalPort()));
+
+ return new InetSocketAddress(localAddress, channel.socket().getLocalPort());
+ }
+
+ @Override
+ public UserTestSessionKey getTestSessionKey() throws RemoteException {
+ return clientService.getSessionKey();
+ }
+
+ @Override
+ public synchronized void onDictionaryLoaded(IMessageDictionary dictionary) {
+ msgDatabase = messageDbFactory.createMessageDataBase(dictionary);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.attachMessageDb(msgDatabase);
+ }
+ }
+
+ @Override
+ public synchronized void onDictionaryUnloaded(IMessageDictionary dictionary) {
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.detachMessageDb(msgDatabase);
+ }
+ msgDatabase = null;
+ }
+
+ @Override
+ public synchronized IFileTransferHandle startRecording(String fileName, List<MessageRecordDetails> list) throws FileNotFoundException, IOException {
+ if (service == null) {
+ throw new IllegalStateException("can't record: not connected to test server");
+ }
+ if (fileTransferHandler == null) {
+ fileTransferHandler = new UdpFileTransferHandler();
+ fileTransferHandler.start();
+ }
+ int port = PortUtil.getInstance().getValidPort();
+ // get the address of the socket the message recorder is going to write
+ // data to
+ InetSocketAddress recorderOutputAddress = service.getRecorderSocketAddress();
+
+ // setup a transfer from a socket to a file
+ TransferConfig config =
+ new TransferConfig(fileName, recorderOutputAddress, new InetSocketAddress(InetAddress.getLocalHost(), port),
+ TransferConfig.Direction.SOCKET_TO_FILE, 128000);
+ IFileTransferHandle handle = fileTransferHandler.registerTransfer(config);
+
+ // send the command to start recording
+ RecordCommand cmd =
+ new RecordCommand(exportedThis, new InetSocketAddress(InetAddress.getLocalHost(), port), list);
+ service.startRecording(cmd);
+ OseeLog.log(
+ Activator.class,
+ Level.INFO,
+ "recording started with " + list.size() + " entries, recorder output socket=" + recorderOutputAddress.toString());
+ return handle;
+ }
+
+ @Override
+ public synchronized void stopRecording() throws RemoteException, IOException {
+ try {
+ service.stopRecording();
+ } finally {
+ if (fileTransferHandler != null && fileTransferHandler.hasActiveTransfers()) {
+ fileTransferHandler.stopAllTransfers();
+ }
+ fileTransferHandler = null;
+ }
+ }
+
+ public AbstractMessageDataBase getMsgDatabase() {
+ return msgDatabase;
+ }
+
+ public IRemoteMessageService getService() {
+ return service;
+ }
+
+ public void removeSubscription(MessageSubscription subscription) {
+ subscriptions.remove(subscription);
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/OteClientServiceTracker.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/OteClientServiceTracker.java
index cf7fc50078e..5c0ef3c0f1b 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/OteClientServiceTracker.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/OteClientServiceTracker.java
@@ -20,53 +20,52 @@ import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.util.tracker.ServiceTracker;
-public class OteClientServiceTracker extends ServiceTracker{
+public class OteClientServiceTracker extends ServiceTracker {
- private final IMessageDbFactory factory;
-
- private MessageSubscriptionService messageSubscriptionService;
+ private final IMessageDbFactory factory;
- private ServiceRegistration registration;
-
- OteClientServiceTracker(IMessageDbFactory factory) {
- super(Activator.getDefault().getBundleContext(), IOteClientService.class.getName(), null);
- this.factory = factory;
- }
+ private MessageSubscriptionService messageSubscriptionService;
- @Override
- public Object addingService(ServiceReference reference) {
- IOteClientService service = (IOteClientService) super.addingService(reference);
- try {
- messageSubscriptionService = new MessageSubscriptionService(service, factory);
- registration = context.registerService(IOteMessageService.class.getName(), messageSubscriptionService, null);
- } catch (IOException e) {
- OseeLog.log(OteClientServiceTracker.class, Level.SEVERE, "could not start Message Service", e);
- }
- return service;
- }
+ private ServiceRegistration registration;
- @Override
- public void removedService(ServiceReference reference, Object service) {
- shutdownMessageService();
- super.removedService(reference, service);
- }
-
- private void shutdownMessageService() {
- if (registration != null) {
- registration.unregister();
- registration = null;
- }
- if (messageSubscriptionService != null) {
- messageSubscriptionService.shutdown();
- messageSubscriptionService = null;
- }
- }
+ OteClientServiceTracker(IMessageDbFactory factory) {
+ super(Activator.getDefault().getBundleContext(), IOteClientService.class.getName(), null);
+ this.factory = factory;
+ }
+
+ @Override
+ public Object addingService(ServiceReference reference) {
+ IOteClientService service = (IOteClientService) super.addingService(reference);
+ try {
+ messageSubscriptionService = new MessageSubscriptionService(service, factory);
+ registration = context.registerService(IOteMessageService.class.getName(), messageSubscriptionService, null);
+ } catch (IOException e) {
+ OseeLog.log(OteClientServiceTracker.class, Level.SEVERE, "could not start Message Service", e);
+ }
+ return service;
+ }
+
+ @Override
+ public void removedService(ServiceReference reference, Object service) {
+ shutdownMessageService();
+ super.removedService(reference, service);
+ }
+
+ private void shutdownMessageService() {
+ if (registration != null) {
+ registration.unregister();
+ registration = null;
+ }
+ if (messageSubscriptionService != null) {
+ messageSubscriptionService.shutdown();
+ messageSubscriptionService = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ shutdownMessageService();
+ super.close();
+ }
- @Override
- public void close() {
- shutdownMessageService();
- super.close();
- }
-
-
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/UpdateDispatcher.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/UpdateDispatcher.java
index b79e5ab39c1..d8a7dbcd381 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/UpdateDispatcher.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/UpdateDispatcher.java
@@ -21,160 +21,155 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
-
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.message.enums.DataType;
/**
- * A thread that listens for activity on a set of channels and then
- * dispatches any recieved UDP packets to the appropriate worker threads
+ * A thread that listens for activity on a set of channels and then dispatches any recieved UDP packets to the
+ * appropriate worker threads
*
* @author Ken J. Aguilar
*/
public final class UpdateDispatcher {
- /**
+ /**
*
*/
- private static final int RECEIVE_BUFFER_SIZE = 1024 * 256;
-
- /** * Class Instance Fields ** */
- private final Map<DataType, DatagramChannel> channelMap = new HashMap<DataType, DatagramChannel>();
-
- private final InetSocketAddress remoteAddress;
- private final Object gate = new Object();
- private final Selector channelSelector;
- private volatile boolean running = false;
-
- private final Runnable runner = new Runnable() {
-
- @Override
- public void run() {
-
- running = true;
- try {
- while (running) {
- /* check to see if something has interrupted this thread */
- if (Thread.interrupted()) {
- OseeLog.log(Activator.class, Level.FINE, "Channel Listener Thread interrupted");
- running = false;
- } else {
- /*
- * wait for channel activity on all channels registered
- * with this selector
- */
- final int readyCount = channelSelector.select();
- if (readyCount != 0) {
- /* get the set of readable channels */
- final Set<SelectionKey> readyChannels = channelSelector.selectedKeys();
-
- /* iterate through the set of readable channels */
- final Iterator<SelectionKey> keys = readyChannels.iterator();
- while (keys.hasNext()) {
- final SelectionKey key = keys.next();
- /* make sure the channel is still valid */
- if (key.isValid() && key.isReadable()) {
- ((ChannelProcessor) key.attachment()).process((DatagramChannel) key.channel());
- }
- keys.remove();
- }
- }
- synchronized (gate) {
- /*
- * do this to prevent the current thread from
- * entering the channelSelector.select() method
- * during registration of channels with the
- * selector. Not doing this will cause deadlock
- */
- }
- }
- }
- } catch (InterruptedException ie) {
- /*
- * something has interrupted us, most likely we need to shut
- * down. Catching the exception clears the interrupted flag
- */
- OseeLog.log(Activator.class, Level.INFO, "Channel Listener Interrupted... Shutting down");
- } catch (IOException ioe) {
- OseeLog.log(Activator.class, Level.INFO, "IOException occurred in channel listening thread... shutting down ", ioe);
- } catch (Throwable t) {
- OseeLog.log(Activator.class, Level.SEVERE, "Unusual exception occurred in channel listening thread... shutting down ", t);
- }
- running = false;
- OseeLog.log(Activator.class, Level.INFO, "Channel Listener thread has terminated");
- }
-
- };
-
- private final Thread thread = new Thread(runner, "Message Update Dispatcher Thread");
-
-
- public UpdateDispatcher(InetSocketAddress remoteAddress) throws IOException {
- this.remoteAddress = remoteAddress;
- thread.setDaemon(false);
- channelSelector = Selector.open();
- }
-
- public void start() {
- thread.start();
- }
-
-
- public boolean isRunning() {
- return running;
- }
-
- public DatagramChannel getChannel(DataType type) {
- return channelMap.get(type);
- }
-
- public SelectionKey addChannel(InetAddress localAddress, int port, DataType type, ChannelProcessor processor)
- throws IOException {
- final DatagramChannel channel = DatagramChannel.open();
- channel.configureBlocking(false);
- channel.socket().bind(new InetSocketAddress(localAddress, port));
- if (channel.socket().getReceiveBufferSize() < RECEIVE_BUFFER_SIZE) {
- channel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
- }
- channel.connect(remoteAddress);
- if (channelMap.put(type, channel) != null) {
- OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "A previous channel was replaced");
- }
- synchronized (gate) {
- channelSelector.wakeup();
- return channel.register(channelSelector, SelectionKey.OP_READ, processor);
- }
- }
- /**
- * terminates this thread in a graceful manner and attempts to release
- * resources
- */
- public void close() {
- thread.interrupt();
- try {
- thread.join();
- } catch (InterruptedException ex) {
- OseeLog.log(Activator.class, Level.WARNING, "Interrupted while joining", ex);
- } finally {
- OseeLog.log(Activator.class, Level.INFO, "clearing pool");
- /* release IO resources */
- try {
- channelSelector.close();
- } catch (Exception ex) {
- OseeLog.log(Activator.class, Level.WARNING, "Exception closing selector", ex);
- } finally {
- for (final DataType type : channelMap.keySet()) {
- try {
- final DatagramChannel channel = channelMap.get(type);
- if (channel != null) {
- channel.close();
- }
-
- } catch (Throwable ex) {
- OseeLog.log(Activator.class, Level.WARNING, "could not close channel for " + type, ex);
- }
- }
- channelMap.clear();
- }
- }
- }
+ private static final int RECEIVE_BUFFER_SIZE = 1024 * 256;
+
+ /** * Class Instance Fields ** */
+ private final Map<DataType, DatagramChannel> channelMap = new HashMap<DataType, DatagramChannel>();
+
+ private final InetSocketAddress remoteAddress;
+ private final Object gate = new Object();
+ private final Selector channelSelector;
+ private volatile boolean running = false;
+
+ private final Runnable runner = new Runnable() {
+
+ @Override
+ public void run() {
+
+ running = true;
+ try {
+ while (running) {
+ /* check to see if something has interrupted this thread */
+ if (Thread.interrupted()) {
+ OseeLog.log(Activator.class, Level.FINE, "Channel Listener Thread interrupted");
+ running = false;
+ } else {
+ /*
+ * wait for channel activity on all channels registered with this selector
+ */
+ final int readyCount = channelSelector.select();
+ if (readyCount != 0) {
+ /* get the set of readable channels */
+ final Set<SelectionKey> readyChannels = channelSelector.selectedKeys();
+
+ /* iterate through the set of readable channels */
+ final Iterator<SelectionKey> keys = readyChannels.iterator();
+ while (keys.hasNext()) {
+ final SelectionKey key = keys.next();
+ /* make sure the channel is still valid */
+ if (key.isValid() && key.isReadable()) {
+ ((ChannelProcessor) key.attachment()).process((DatagramChannel) key.channel());
+ }
+ keys.remove();
+ }
+ }
+ synchronized (gate) {
+ /*
+ * do this to prevent the current thread from entering the channelSelector.select() method during
+ * registration of channels with the selector. Not doing this will cause deadlock
+ */
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ /*
+ * something has interrupted us, most likely we need to shut down. Catching the exception clears the
+ * interrupted flag
+ */
+ OseeLog.log(Activator.class, Level.INFO, "Channel Listener Interrupted... Shutting down");
+ } catch (IOException ioe) {
+ OseeLog.log(Activator.class, Level.INFO,
+ "IOException occurred in channel listening thread... shutting down ", ioe);
+ } catch (Throwable t) {
+ OseeLog.log(Activator.class, Level.SEVERE,
+ "Unusual exception occurred in channel listening thread... shutting down ", t);
+ }
+ running = false;
+ OseeLog.log(Activator.class, Level.INFO, "Channel Listener thread has terminated");
+ }
+
+ };
+
+ private final Thread thread = new Thread(runner, "Message Update Dispatcher Thread");
+
+ public UpdateDispatcher(InetSocketAddress remoteAddress) throws IOException {
+ this.remoteAddress = remoteAddress;
+ thread.setDaemon(false);
+ channelSelector = Selector.open();
+ }
+
+ public void start() {
+ thread.start();
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public DatagramChannel getChannel(DataType type) {
+ return channelMap.get(type);
+ }
+
+ public SelectionKey addChannel(InetAddress localAddress, int port, DataType type, ChannelProcessor processor) throws IOException {
+ final DatagramChannel channel = DatagramChannel.open();
+ channel.configureBlocking(false);
+ channel.socket().bind(new InetSocketAddress(localAddress, port));
+ if (channel.socket().getReceiveBufferSize() < RECEIVE_BUFFER_SIZE) {
+ channel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
+ }
+ channel.connect(remoteAddress);
+ if (channelMap.put(type, channel) != null) {
+ OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "A previous channel was replaced");
+ }
+ synchronized (gate) {
+ channelSelector.wakeup();
+ return channel.register(channelSelector, SelectionKey.OP_READ, processor);
+ }
+ }
+
+ /**
+ * terminates this thread in a graceful manner and attempts to release resources
+ */
+ public void close() {
+ thread.interrupt();
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ OseeLog.log(Activator.class, Level.WARNING, "Interrupted while joining", ex);
+ } finally {
+ OseeLog.log(Activator.class, Level.INFO, "clearing pool");
+ /* release IO resources */
+ try {
+ channelSelector.close();
+ } catch (Exception ex) {
+ OseeLog.log(Activator.class, Level.WARNING, "Exception closing selector", ex);
+ } finally {
+ for (final DataType type : channelMap.keySet()) {
+ try {
+ final DatagramChannel channel = channelMap.get(type);
+ if (channel != null) {
+ channel.close();
+ }
+
+ } catch (Throwable ex) {
+ OseeLog.log(Activator.class, Level.WARNING, "could not close channel for " + type, ex);
+ }
+ }
+ channelMap.clear();
+ }
+ }
+ }
} \ No newline at end of file
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/AbstractSubscriptionState.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/AbstractSubscriptionState.java
index d0c6c333dba..45fe82b320a 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/AbstractSubscriptionState.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/AbstractSubscriptionState.java
@@ -16,43 +16,41 @@ import org.eclipse.osee.ote.message.tool.MessageMode;
/**
* @author Ken J. Aguilar
- *
*/
public abstract class AbstractSubscriptionState implements ISubscriptionState {
- private final DataType type;
- private final MessageMode mode;
- private final MessageSubscription subscription;
-
- protected AbstractSubscriptionState(MessageSubscription subscription, DataType type, MessageMode mode) {
- this.subscription = subscription;
- this.type = type;
- this.mode = mode;
- }
-
- protected AbstractSubscriptionState(AbstractSubscriptionState otherState) {
- this.subscription = otherState.getSubscription();
- this.type = otherState.getMemType();
- this.mode = otherState.getMode();
- }
-
-
- @Override
- public DataType getMemType() {
- return type;
- }
-
- @Override
- public MessageMode getMode() {
- return mode;
- }
-
- protected MessageSubscription getSubscription() {
- return subscription;
- }
-
- @Override
- public void onCanceled() {
- }
-
+ private final DataType type;
+ private final MessageMode mode;
+ private final MessageSubscription subscription;
+
+ protected AbstractSubscriptionState(MessageSubscription subscription, DataType type, MessageMode mode) {
+ this.subscription = subscription;
+ this.type = type;
+ this.mode = mode;
+ }
+
+ protected AbstractSubscriptionState(AbstractSubscriptionState otherState) {
+ this.subscription = otherState.getSubscription();
+ this.type = otherState.getMemType();
+ this.mode = otherState.getMode();
+ }
+
+ @Override
+ public DataType getMemType() {
+ return type;
+ }
+
+ @Override
+ public MessageMode getMode() {
+ return mode;
+ }
+
+ protected MessageSubscription getSubscription() {
+ return subscription;
+ }
+
+ @Override
+ public void onCanceled() {
+ }
+
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ActivateState.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ActivateState.java
index 5371748d22e..9a2cbecd910 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ActivateState.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ActivateState.java
@@ -20,79 +20,77 @@ import org.eclipse.osee.ote.message.enums.DataType;
/**
* @author Ken J. Aguilar
- *
*/
public class ActivateState extends AbstractSubscriptionState {
+ private final MessageInstance instance;
+ private final AbstractMessageDataBase msgDb;
+
+ public ActivateState(MessageInstance instance, AbstractMessageDataBase msgDb, AbstractSubscriptionState otherState) {
+ super(otherState);
+ this.instance = instance;
+ this.msgDb = msgDb;
+ }
- private final MessageInstance instance;
- private final AbstractMessageDataBase msgDb;
+ @Override
+ public Message getMessage() {
+ return instance.getMessage();
+ }
- public ActivateState(MessageInstance instance, AbstractMessageDataBase msgDb, AbstractSubscriptionState otherState) {
- super(otherState);
- this.instance = instance;
- this.msgDb = msgDb;
- }
+ @Override
+ public String getMsgClassName() {
+ return instance.getMessage().getClass().getName();
+ }
- @Override
- public Message getMessage() {
- return instance.getMessage();
- }
+ @Override
+ public ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb) {
+ getSubscription().notifyUnresolved();
+ try {
+ msgDb.releaseInstance(instance);
+ } catch (Exception e) {
+ OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
+ }
+ return new UnresolvedState(instance.getMessage().getName(), this);
+ }
- @Override
- public String getMsgClassName() {
- return instance.getMessage().getClass().getName();
- }
+ @Override
+ public ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB) {
+ throw new Error("Unexpected input for this state");
+ }
- @Override
- public ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb) {
- getSubscription().notifyUnresolved();
- try {
- msgDb.releaseInstance(instance);
- } catch (Exception e) {
- OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
- }
- return new UnresolvedState(instance.getMessage().getName(), this);
- }
+ @Override
+ public ISubscriptionState onActivated() {
+ throw new Error("Unexpected input for this state");
+ }
- @Override
- public ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB) {
- throw new Error("Unexpected input for this state");
- }
+ @Override
+ public ISubscriptionState onDeactivated() {
+ return new InactiveState(instance, msgDb, this);
+ }
- @Override
- public ISubscriptionState onActivated() {
- throw new Error("Unexpected input for this state");
- }
+ @Override
+ public void onCanceled() {
+ super.onCanceled();
+ try {
+ msgDb.releaseInstance(instance);
+ } catch (Exception e) {
+ OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
+ }
+ }
- @Override
- public ISubscriptionState onDeactivated() {
- return new InactiveState(instance, msgDb, this);
- }
-
- @Override
- public void onCanceled() {
- super.onCanceled();
- try {
- msgDb.releaseInstance(instance);
- } catch (Exception e) {
- OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
- }
- }
-
- @Override
- public Set<DataType> getAvailableTypes() {
- return instance.getAvailableTypes();
- }
+ @Override
+ public Set<DataType> getAvailableTypes() {
+ return instance.getAvailableTypes();
+ }
- @Override
- public boolean isActive() {
- return true;
- }
+ @Override
+ public boolean isActive() {
+ return true;
+ }
- @Override
- public boolean isResolved() {
- return true;
- }
+ @Override
+ public boolean isResolved() {
+ return true;
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ISubscriptionState.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ISubscriptionState.java
index 0f6e01b2c25..19b6a43588a 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ISubscriptionState.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/ISubscriptionState.java
@@ -18,31 +18,30 @@ import org.eclipse.osee.ote.message.tool.MessageMode;
/**
* @author Ken J. Aguilar
- *
*/
public interface ISubscriptionState {
DataType getMemType();
- MessageMode getMode();
+ MessageMode getMode();
+
+ String getMsgClassName();
+
+ Message getMessage();
+
+ Set<DataType> getAvailableTypes();
+
+ ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB);
- String getMsgClassName();
+ ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb);
- Message getMessage();
+ ISubscriptionState onActivated();
- Set<DataType> getAvailableTypes();
-
- ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB);
+ ISubscriptionState onDeactivated();
- ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb);
+ void onCanceled();
- ISubscriptionState onActivated();
+ boolean isActive();
- ISubscriptionState onDeactivated();
-
- void onCanceled();
-
- boolean isActive();
+ boolean isResolved();
- boolean isResolved();
-
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/InactiveState.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/InactiveState.java
index d10114ce99c..f9fb7f62f65 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/InactiveState.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/InactiveState.java
@@ -21,88 +21,83 @@ import org.eclipse.osee.ote.message.enums.DataType;
/**
* @author Ken J. Aguilar
- *
*/
public class InactiveState extends AbstractSubscriptionState {
-
- private final MessageInstance instance;
- private final AbstractMessageDataBase msgDb;
-
- public InactiveState(MessageInstance instance, AbstractMessageDataBase msgDb, AbstractSubscriptionState previousState) {
- super(previousState);
- this.instance = instance;
- this.msgDb = msgDb;
- }
-
-
- @Override
- public Message getMessage() {
- return instance.getMessage();
- }
-
- @Override
- public String getMsgClassName() {
- return instance.getMessage().getClass().getName();
- }
-
- @Override
- public ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB) {
- throw new Error("Unexpected input for this state");
- }
-
- @Override
- public ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb) {
- assert this.msgDb == msgDb;
- getSubscription().notifyUnresolved();
- try {
- msgDb.releaseInstance(instance);
- } catch (Exception e) {
- OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
- }
- return new UnresolvedState(instance.getMessage().getName(), this);
- }
-
- @Override
- public void onCanceled() {
- super.onCanceled();
- try {
- msgDb.releaseInstance(instance);
- } catch (Exception e) {
- OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
- }
- }
-
- @Override
- public ISubscriptionState onActivated() {
- if (instance.isSupported()) {
- getSubscription().notifyActivated();
- return new ActivateState(instance, msgDb, this);
- } else {
- return this;
- }
-
- }
-
- @Override
- public ISubscriptionState onDeactivated() {
- return this;
- }
-
-
- @Override
- public Set<DataType> getAvailableTypes() {
- return new HashSet<DataType>();
- }
-
-
- @Override
- public boolean isActive() {
- return false;
- }
-
- @Override
- public boolean isResolved() {
- return true;
- }
+ private final MessageInstance instance;
+ private final AbstractMessageDataBase msgDb;
+
+ public InactiveState(MessageInstance instance, AbstractMessageDataBase msgDb, AbstractSubscriptionState previousState) {
+ super(previousState);
+ this.instance = instance;
+ this.msgDb = msgDb;
+ }
+
+ @Override
+ public Message getMessage() {
+ return instance.getMessage();
+ }
+
+ @Override
+ public String getMsgClassName() {
+ return instance.getMessage().getClass().getName();
+ }
+
+ @Override
+ public ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB) {
+ throw new Error("Unexpected input for this state");
+ }
+
+ @Override
+ public ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb) {
+ assert this.msgDb == msgDb;
+ getSubscription().notifyUnresolved();
+ try {
+ msgDb.releaseInstance(instance);
+ } catch (Exception e) {
+ OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
+ }
+ return new UnresolvedState(instance.getMessage().getName(), this);
+ }
+
+ @Override
+ public void onCanceled() {
+ super.onCanceled();
+ try {
+ msgDb.releaseInstance(instance);
+ } catch (Exception e) {
+ OseeLog.log(ActivateState.class, Level.SEVERE, "problem releasing instance of " + getMsgClassName());
+ }
+ }
+
+ @Override
+ public ISubscriptionState onActivated() {
+ if (instance.isSupported()) {
+ getSubscription().notifyActivated();
+ return new ActivateState(instance, msgDb, this);
+ } else {
+ return this;
+ }
+
+ }
+
+ @Override
+ public ISubscriptionState onDeactivated() {
+ return this;
+ }
+
+ @Override
+ public Set<DataType> getAvailableTypes() {
+ return new HashSet<DataType>();
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public boolean isResolved() {
+ return true;
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/UnresolvedState.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/UnresolvedState.java
index 9b6d198288c..aef6ef9a2ec 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/UnresolvedState.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/state/UnresolvedState.java
@@ -23,82 +23,82 @@ import org.eclipse.osee.ote.message.tool.MessageMode;
/**
* @author Ken J. Aguilar
- *
*/
public class UnresolvedState extends AbstractSubscriptionState {
- private DataType type;
-
- private final String msgClassName;
- private MessageInstance instance = null;
-
- public UnresolvedState(String msgClassName, MessageSubscription subscription, DataType type, MessageMode mode) {
- super(subscription, type, mode);
- this.type = type;
- this.msgClassName = msgClassName;
- }
-
- public UnresolvedState(String msgClassName, AbstractSubscriptionState previousState) {
- super(previousState);
- this.msgClassName = msgClassName;
- }
- @Override
- public Message getMessage() {
- return instance != null ? instance.getMessage() : null;
- }
-
- @Override
- public String getMsgClassName() {
- return msgClassName;
- }
-
- @Override
- public ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB) {
- try {
- instance = msgDB.acquireInstance(msgClassName, getMode(), getMemType());
- this.type = instance.getType();
- getSubscription().notifyResolved();
- return new InactiveState(instance, msgDB, this);
- } catch (Exception e) {
- OseeLog.log(UnresolvedState.class, Level.SEVERE, "problems acquring instance for " + getMsgClassName(), e);
- getSubscription().notifyInvalidated();
- return this;
- }
-
- }
-
- @Override
- public DataType getMemType() {
- return type;
- }
-
- @Override
- public ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb) {
- return this;
- }
-
- @Override
- public ISubscriptionState onActivated() {
- return this;
- }
-
- @Override
- public ISubscriptionState onDeactivated() {
- return this;
- }
-
- @Override
- public Set<DataType> getAvailableTypes() {
- return new HashSet<DataType>();
- }
-
- @Override
- public boolean isActive() {
- return false;
- }
-
- @Override
- public boolean isResolved() {
- return instance != null;
- }
+ private DataType type;
+
+ private final String msgClassName;
+ private MessageInstance instance = null;
+
+ public UnresolvedState(String msgClassName, MessageSubscription subscription, DataType type, MessageMode mode) {
+ super(subscription, type, mode);
+ this.type = type;
+ this.msgClassName = msgClassName;
+ }
+
+ public UnresolvedState(String msgClassName, AbstractSubscriptionState previousState) {
+ super(previousState);
+ this.msgClassName = msgClassName;
+ }
+
+ @Override
+ public Message getMessage() {
+ return instance != null ? instance.getMessage() : null;
+ }
+
+ @Override
+ public String getMsgClassName() {
+ return msgClassName;
+ }
+
+ @Override
+ public ISubscriptionState onMessageDbFound(AbstractMessageDataBase msgDB) {
+ try {
+ instance = msgDB.acquireInstance(msgClassName, getMode(), getMemType());
+ this.type = instance.getType();
+ getSubscription().notifyResolved();
+ return new InactiveState(instance, msgDB, this);
+ } catch (Exception e) {
+ OseeLog.log(UnresolvedState.class, Level.SEVERE, "problems acquring instance for " + getMsgClassName(), e);
+ getSubscription().notifyInvalidated();
+ return this;
+ }
+
+ }
+
+ @Override
+ public DataType getMemType() {
+ return type;
+ }
+
+ @Override
+ public ISubscriptionState onMessageDbClosing(AbstractMessageDataBase msgDb) {
+ return this;
+ }
+
+ @Override
+ public ISubscriptionState onActivated() {
+ return this;
+ }
+
+ @Override
+ public ISubscriptionState onDeactivated() {
+ return this;
+ }
+
+ @Override
+ public Set<DataType> getAvailableTypes() {
+ return new HashSet<DataType>();
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public boolean isResolved() {
+ return instance != null;
+ }
}

Back to the top