Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkaguilar2010-06-18 22:41:51 +0000
committerkaguilar2010-06-18 22:41:51 +0000
commitcc0749cf9fd39bf7d9883162774fceb4d524e6af (patch)
treec027f2530d1fcfc31ef9619d39aa1d56074ec781
parent2c07b2b3db68edfb08ed1c05c641a6fa300cf5e7 (diff)
downloadorg.eclipse.osee-cc0749cf9fd39bf7d9883162774fceb4d524e6af.tar.gz
org.eclipse.osee-cc0749cf9fd39bf7d9883162774fceb4d524e6af.tar.xz
org.eclipse.osee-cc0749cf9fd39bf7d9883162774fceb4d524e6af.zip
corrected issue with shutdown and unsubscribes
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java6
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java3
-rw-r--r--plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java531
-rw-r--r--plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java13
-rw-r--r--plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java19
-rw-r--r--plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java3
-rw-r--r--plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java85
-rw-r--r--plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java2
8 files changed, 306 insertions, 356 deletions
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java
index 815e3502fa8..d655d8bd992 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java
@@ -15,6 +15,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
+
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.client.msg.core.internal.MessageReference;
import org.eclipse.osee.ote.message.Message;
@@ -126,12 +127,11 @@ public abstract class AbstractMessageDataBase {
}
public void detachService(IRemoteMessageService service) {
- this.service = null;
- this.client = null;
for (MessageInstance instance : referenceToMsgMap.values()) {
doInstanceDetach(instance, service);
}
-
+ this.service = null;
+ this.client = null;
}
public MessageInstance findById(int id) {
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java
index b8e9b6a4aa1..9e7ca3b00a0 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java
@@ -12,6 +12,7 @@ package org.eclipse.osee.ote.client.msg.core.db;
import java.util.HashSet;
import java.util.Set;
+
import org.eclipse.osee.ote.message.Message;
import org.eclipse.osee.ote.message.commands.SubscribeToMessage;
import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage;
@@ -74,7 +75,7 @@ public class MessageInstance {
public void detachService(IRemoteMessageService service, IMsgToolServiceClient client) throws Exception {
if (service != null) {
- service.unsubscribeToMessage(new UnSubscribeToMessage(msg.getClass().getName(), mode, type, client));
+ service.unsubscribeToMessage(new UnSubscribeToMessage(msg.getClass().getName(), mode, type, client.getAddressByType(msg.getClass().getName(), type)));
}
availableTypes.clear();
serverSubscriptionKey = null;
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java
index aa0f7dabf20..20a014435bc 100644
--- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java
+++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java
@@ -54,268 +54,271 @@ import org.eclipse.osee.ote.service.ITestConnectionListener;
*/
public class MessageSubscriptionService implements IOteMessageService, IMessageDictionaryListener, ITestConnectionListener, IMsgToolServiceClient {
- /** * Static Fields ** */
- private static final int MAX_CONCURRENT_WORKER_THREADS = Runtime.getRuntime().availableProcessors() + 1;
-
- private final InetAddress localAddress;
- private final LinkedList<MessageSubscription> subscriptions = new LinkedList<MessageSubscription>();
- private IMsgToolServiceClient exportedThis = null;
- private AbstractMessageDataBase msgDatabase;
- private UdpFileTransferHandler fileTransferHandler;
-
- private final ExecutorService threadPool =
- Executors.newFixedThreadPool(MAX_CONCURRENT_WORKER_THREADS, new ThreadFactory() {
- private final ThreadGroup group =
- new ThreadGroup(Thread.currentThread().getThreadGroup(), "Msg Watch Workers");
- private int count = 1;
-
- public Thread newThread(Runnable arg0) {
- Thread thread = new Thread(group, arg0, "Msg Watch Wrkr - " + count++);
- thread.setDaemon(false);
- return thread;
- }
-
- });
-
- /**
- * Monitors a set of channels for message updates and dispatches the updates to worker threads
- */
- private UpdateDispatcher dispatcher = null;
- private IRemoteMessageService service;
-
- private final IMessageDbFactory messageDbFactory;
- private final IOteClientService clientService;
-
- public MessageSubscriptionService(IOteClientService service, IMessageDbFactory messageDbFactory) throws IOException {
- this.messageDbFactory = messageDbFactory;
- localAddress = InetAddress.getLocalHost();
- OseeLog.log(Activator.class, Level.INFO,
- "OTE client message service started on: " + localAddress.getHostAddress());
- clientService = service;
- clientService.addDictionaryListener(this);
- clientService.addConnectionListener(this);
- }
-
- public synchronized IMessageSubscription subscribe(String name) {
- MessageSubscription subscription = new MessageSubscription(this);
- subscription.bind(name);
- if (msgDatabase != null) {
- subscription.attachMessageDb(msgDatabase);
- if (service != null) {
- subscription.attachService(service);
- }
- }
- subscriptions.add(subscription);
- return subscription;
- }
-
- /**
- * Shuts down the client message service. All worker threads will be terminated and all IO resources will be closed.
- */
- public void shutdown() {
- OseeLog.log(MessageSubscriptionService.class, Level.INFO, "shutting down subscription service");
- clientService.removeDictionaryListener(this);
- clientService.removeConnectionListener(this);
- shutdownDispatcher();
- threadPool.shutdown();
- try {
- threadPool.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException ex1) {
- OseeLog.log(Activator.class, Level.WARNING, ex1.toString(), ex1);
- }
- }
-
- @Override
- public synchronized void onConnectionLost(IServiceConnector connector, IHostTestEnvironment testHost) {
- OseeLog.log(Activator.class, Level.INFO, "connection lost: ote client message service halted");
- shutdownDispatcher();
- msgDatabase.detachService(null);
- for (MessageSubscription subscription : subscriptions) {
- subscription.detachService(null);
- }
- exportedThis = null;
- service = null;
- }
-
- @Override
- public synchronized void onPostConnect(ConnectionEvent event) {
- assert msgDatabase != null;
- OseeLog.log(Activator.class, Level.INFO, "connecting OTE client message service");
- if (event.getEnvironment() instanceof ITestEnvironmentMessageSystem) {
- ITestEnvironmentMessageSystem env = (ITestEnvironmentMessageSystem) event.getEnvironment();
- try {
- service = env.getMessageToolServiceProxy();
- if (service == null) {
- throw new Exception("could not get message tool service proxy");
- }
- exportedThis = (IMsgToolServiceClient) event.getConnector().export(this);
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.SEVERE,
- "failed to create exported Message Tool Client", e);
- service = null;
- exportedThis = null;
- return;
- }
-
- try {
- dispatcher = new UpdateDispatcher(service.getMsgUpdateSocketAddress());
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update dispatcher", e);
- service = null;
- exportedThis = null;
- return;
- }
-
- try {
- createProccessors();
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update processors", e);
- service = null;
- exportedThis = null;
- return;
- }
-
- msgDatabase.attachToService(service, exportedThis);
- for (MessageSubscription subscription : subscriptions) {
- subscription.attachService(service);
- }
- dispatcher.start();
- }
- }
-
- private void createProccessors() throws IOException {
- Set<? extends DataType> availableTypes = service.getAvailablePhysicalTypes();
-
- int port = PortUtil.getInstance().getConsecutiveValidPorts(availableTypes.size());
- for (DataType type : availableTypes) {
- final ChannelProcessor handler = new ChannelProcessor(1, type.getToolingBufferSize(), threadPool, msgDatabase, type);
-
- dispatcher.addChannel(localAddress, port, type, handler);
- port++;
- }
- }
-
- private void shutdownDispatcher() {
- if (dispatcher != null && dispatcher.isRunning()) {
- try {
- dispatcher.close();
- } catch (Throwable ex) {
- OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "exception while closing down dispatcher", ex);
- } finally {
- dispatcher = null;
- }
- }
- }
-
- @Override
- public synchronized void onPreDisconnect(ConnectionEvent event) {
- msgDatabase.detachService(service);
- for (MessageSubscription subscription : subscriptions) {
- subscription.detachService(service);
- }
- try {
- event.getConnector().unexport(this);
- } catch (Exception e) {
- OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "problems unexporting Message Tool Client", e);
- }
- shutdownDispatcher();
- exportedThis = null;
- service = null;
- }
-
- @Override
- public void changeIsScheduled(String msgName, boolean isScheduled) throws RemoteException {
-
- }
-
- @Override
- public void changeRate(String msgName, double rate) throws RemoteException {
-
- }
-
- @Override
- public InetSocketAddress getAddressByType(String messageName, DataType dataType) throws RemoteException {
- final DatagramChannel channel = dispatcher.getChannel(dataType);
- OseeLog.log(Activator.class, Level.INFO, String.format(
- "callback from remote msg manager: msg=%s, type=%s, ip=%s:%d\n", messageName,
- dataType.name(), localAddress.toString(), channel.socket().getLocalPort()));
-
- return new InetSocketAddress(localAddress, channel.socket().getLocalPort());
- }
-
- @Override
- public UserTestSessionKey getTestSessionKey() throws RemoteException {
- return clientService.getSessionKey();
- }
-
- @Override
- public synchronized void onDictionaryLoaded(IMessageDictionary dictionary) {
- msgDatabase = messageDbFactory.createMessageDataBase(dictionary);
- for (MessageSubscription subscription : subscriptions) {
- subscription.attachMessageDb(msgDatabase);
- }
- }
-
- @Override
- public synchronized void onDictionaryUnloaded(IMessageDictionary dictionary) {
- for (MessageSubscription subscription : subscriptions) {
- subscription.detachMessageDb(msgDatabase);
- }
- msgDatabase = null;
- }
-
- @Override
- public synchronized IFileTransferHandle startRecording(String fileName, List<MessageRecordDetails> list) throws FileNotFoundException, IOException {
- if (service == null) {
- throw new IllegalStateException("can't record: not connected to test server");
- }
- if (fileTransferHandler == null) {
- fileTransferHandler = new UdpFileTransferHandler();
- fileTransferHandler.start();
- }
- int port = PortUtil.getInstance().getValidPort();
- // get the address of the socket the message recorder is going to write
- // data to
- InetSocketAddress recorderOutputAddress = service.getRecorderSocketAddress();
-
- // setup a transfer from a socket to a file
- TransferConfig config =
- new TransferConfig(fileName, recorderOutputAddress,
- new InetSocketAddress(InetAddress.getLocalHost(), port),
- TransferConfig.Direction.SOCKET_TO_FILE, 128000);
- IFileTransferHandle handle = fileTransferHandler.registerTransfer(config);
-
- // send the command to start recording
- RecordCommand cmd =
- new RecordCommand(exportedThis, new InetSocketAddress(InetAddress.getLocalHost(), port), list);
- service.startRecording(cmd);
- OseeLog.log(Activator.class, Level.INFO,
- "recording started with " + list.size() + " entries, recorder output socket="
- + recorderOutputAddress.toString());
- return handle;
- }
-
- @Override
- public synchronized void stopRecording() throws RemoteException, IOException {
- try {
- service.stopRecording();
- } finally {
- if (fileTransferHandler != null && fileTransferHandler.hasActiveTransfers()) {
- fileTransferHandler.stopAllTransfers();
- }
- fileTransferHandler = null;
- }
- }
-
- public AbstractMessageDataBase getMsgDatabase() {
- return msgDatabase;
- }
-
- public IRemoteMessageService getService() {
- return service;
- }
-
- public void removeSubscription(MessageSubscription subscription) {
- subscriptions.remove(subscription);
- }
+ /** * Static Fields ** */
+ private static final int MAX_CONCURRENT_WORKER_THREADS = Runtime.getRuntime().availableProcessors() + 1;
+
+ private final InetAddress localAddress;
+ private final LinkedList<MessageSubscription> subscriptions = new LinkedList<MessageSubscription>();
+ private IMsgToolServiceClient exportedThis = null;
+ private AbstractMessageDataBase msgDatabase;
+ private UdpFileTransferHandler fileTransferHandler;
+
+ private final ExecutorService threadPool =
+ Executors.newFixedThreadPool(MAX_CONCURRENT_WORKER_THREADS, new ThreadFactory() {
+ private final ThreadGroup group =
+ new ThreadGroup(Thread.currentThread().getThreadGroup(), "Msg Watch Workers");
+ private int count = 1;
+
+ public Thread newThread(Runnable arg0) {
+ Thread thread = new Thread(group, arg0, "Msg Watch Wrkr - " + count++);
+ thread.setDaemon(false);
+ return thread;
+ }
+
+ });
+
+ /**
+ * Monitors a set of channels for message updates and dispatches the updates to worker threads
+ */
+ private UpdateDispatcher dispatcher = null;
+ private IRemoteMessageService service;
+
+ private final IMessageDbFactory messageDbFactory;
+ private final IOteClientService clientService;
+
+ public MessageSubscriptionService(IOteClientService service, IMessageDbFactory messageDbFactory) throws IOException {
+ this.messageDbFactory = messageDbFactory;
+ localAddress = InetAddress.getLocalHost();
+ OseeLog.log(Activator.class, Level.INFO,
+ "OTE client message service started on: " + localAddress.getHostAddress());
+ clientService = service;
+ clientService.addDictionaryListener(this);
+ clientService.addConnectionListener(this);
+ }
+
+ public synchronized IMessageSubscription subscribe(String name) {
+ MessageSubscription subscription = new MessageSubscription(this);
+ subscription.bind(name);
+ if (msgDatabase != null) {
+ subscription.attachMessageDb(msgDatabase);
+ if (service != null) {
+ subscription.attachService(service);
+ }
+ }
+ subscriptions.add(subscription);
+ return subscription;
+ }
+
+ /**
+ * Shuts down the client message service. All worker threads will be terminated and all IO resources will be closed.
+ */
+ public void shutdown() {
+ OseeLog.log(MessageSubscriptionService.class, Level.INFO, "shutting down subscription service");
+ clientService.removeDictionaryListener(this);
+ clientService.removeConnectionListener(this);
+ shutdownDispatcher();
+ threadPool.shutdown();
+ try {
+ threadPool.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ex1) {
+ OseeLog.log(Activator.class, Level.WARNING, ex1.toString(), ex1);
+ }
+ }
+
+ @Override
+ public synchronized void onConnectionLost(IServiceConnector connector, IHostTestEnvironment testHost) {
+ OseeLog.log(Activator.class, Level.INFO, "connection lost: ote client message service halted");
+ shutdownDispatcher();
+ msgDatabase.detachService(null);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.detachService(null);
+ }
+ exportedThis = null;
+ service = null;
+ }
+
+ @Override
+ public synchronized void onPostConnect(ConnectionEvent event) {
+ assert msgDatabase != null;
+ OseeLog.log(Activator.class, Level.INFO, "connecting OTE client message service");
+ if (event.getEnvironment() instanceof ITestEnvironmentMessageSystem) {
+ ITestEnvironmentMessageSystem env = (ITestEnvironmentMessageSystem) event.getEnvironment();
+ try {
+ service = env.getMessageToolServiceProxy();
+ if (service == null) {
+ throw new Exception("could not get message tool service proxy");
+ }
+ exportedThis = (IMsgToolServiceClient) event.getConnector().export(this);
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.SEVERE,
+ "failed to create exported Message Tool Client", e);
+ service = null;
+ exportedThis = null;
+ return;
+ }
+
+ try {
+ dispatcher = new UpdateDispatcher(service.getMsgUpdateSocketAddress());
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update dispatcher", e);
+ service = null;
+ exportedThis = null;
+ return;
+ }
+
+ try {
+ createProccessors();
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update processors", e);
+ service = null;
+ exportedThis = null;
+ return;
+ }
+
+ msgDatabase.attachToService(service, exportedThis);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.attachService(service);
+ }
+ dispatcher.start();
+ }
+ }
+
+ private void createProccessors() throws IOException {
+ Set<? extends DataType> availableTypes = service.getAvailablePhysicalTypes();
+
+ int port = PortUtil.getInstance().getConsecutiveValidPorts(availableTypes.size());
+ for (DataType type : availableTypes) {
+ final ChannelProcessor handler = new ChannelProcessor(1, type.getToolingBufferSize(), threadPool, msgDatabase, type);
+
+ dispatcher.addChannel(localAddress, port, type, handler);
+ port++;
+ }
+ }
+
+ private void shutdownDispatcher() {
+ if (dispatcher != null && dispatcher.isRunning()) {
+ try {
+ dispatcher.close();
+ } catch (Throwable ex) {
+ OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "exception while closing down dispatcher", ex);
+ } finally {
+ dispatcher = null;
+ }
+ }
+ }
+
+ @Override
+ public synchronized void onPreDisconnect(ConnectionEvent event) {
+ if (service == null) {
+ return;
+ }
+ msgDatabase.detachService(service);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.detachService(service);
+ }
+ try {
+ event.getConnector().unexport(this);
+ } catch (Exception e) {
+ OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "problems unexporting Message Tool Client", e);
+ }
+ shutdownDispatcher();
+ exportedThis = null;
+ service = null;
+ }
+
+ @Override
+ public void changeIsScheduled(String msgName, boolean isScheduled) throws RemoteException {
+
+ }
+
+ @Override
+ public void changeRate(String msgName, double rate) throws RemoteException {
+
+ }
+
+ @Override
+ public InetSocketAddress getAddressByType(String messageName, DataType dataType) throws RemoteException {
+ final DatagramChannel channel = dispatcher.getChannel(dataType);
+ OseeLog.log(Activator.class, Level.INFO, String.format(
+ "callback from remote msg manager: msg=%s, type=%s, ip=%s:%d\n", messageName,
+ dataType.name(), localAddress.toString(), channel.socket().getLocalPort()));
+
+ return new InetSocketAddress(localAddress, channel.socket().getLocalPort());
+ }
+
+ @Override
+ public UserTestSessionKey getTestSessionKey() throws RemoteException {
+ return clientService.getSessionKey();
+ }
+
+ @Override
+ public synchronized void onDictionaryLoaded(IMessageDictionary dictionary) {
+ msgDatabase = messageDbFactory.createMessageDataBase(dictionary);
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.attachMessageDb(msgDatabase);
+ }
+ }
+
+ @Override
+ public synchronized void onDictionaryUnloaded(IMessageDictionary dictionary) {
+ for (MessageSubscription subscription : subscriptions) {
+ subscription.detachMessageDb(msgDatabase);
+ }
+ msgDatabase = null;
+ }
+
+ @Override
+ public synchronized IFileTransferHandle startRecording(String fileName, List<MessageRecordDetails> list) throws FileNotFoundException, IOException {
+ if (service == null) {
+ throw new IllegalStateException("can't record: not connected to test server");
+ }
+ if (fileTransferHandler == null) {
+ fileTransferHandler = new UdpFileTransferHandler();
+ fileTransferHandler.start();
+ }
+ int port = PortUtil.getInstance().getValidPort();
+ // get the address of the socket the message recorder is going to write
+ // data to
+ InetSocketAddress recorderOutputAddress = service.getRecorderSocketAddress();
+
+ // setup a transfer from a socket to a file
+ TransferConfig config =
+ new TransferConfig(fileName, recorderOutputAddress,
+ new InetSocketAddress(InetAddress.getLocalHost(), port),
+ TransferConfig.Direction.SOCKET_TO_FILE, 128000);
+ IFileTransferHandle handle = fileTransferHandler.registerTransfer(config);
+
+ // send the command to start recording
+ RecordCommand cmd =
+ new RecordCommand(exportedThis, new InetSocketAddress(InetAddress.getLocalHost(), port), list);
+ service.startRecording(cmd);
+ OseeLog.log(Activator.class, Level.INFO,
+ "recording started with " + list.size() + " entries, recorder output socket="
+ + recorderOutputAddress.toString());
+ return handle;
+ }
+
+ @Override
+ public synchronized void stopRecording() throws RemoteException, IOException {
+ try {
+ service.stopRecording();
+ } finally {
+ if (fileTransferHandler != null && fileTransferHandler.hasActiveTransfers()) {
+ fileTransferHandler.stopAllTransfers();
+ }
+ fileTransferHandler = null;
+ }
+ }
+
+ public AbstractMessageDataBase getMsgDatabase() {
+ return msgDatabase;
+ }
+
+ public IRemoteMessageService getService() {
+ return service;
+ }
+
+ public void removeSubscription(MessageSubscription subscription) {
+ subscriptions.remove(subscription);
+ }
}
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java
index 47f7427a0ba..f410a4c870b 100644
--- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java
+++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java
@@ -11,8 +11,9 @@
package org.eclipse.osee.ote.message.commands;
import java.io.Serializable;
+import java.net.InetSocketAddress;
+
import org.eclipse.osee.ote.message.enums.DataType;
-import org.eclipse.osee.ote.message.interfaces.IMsgToolServiceClient;
import org.eclipse.osee.ote.message.tool.MessageMode;
@@ -27,14 +28,14 @@ public class SetMessageModeCmd implements Serializable{
private final DataType type;
private final MessageMode oldMode;
private final MessageMode newMode;
- private final IMsgToolServiceClient client;
+ private final InetSocketAddress address;
- public SetMessageModeCmd(String name, DataType type, MessageMode oldMode, MessageMode newMode, IMsgToolServiceClient client) {
+ public SetMessageModeCmd(String name, DataType type, MessageMode oldMode, MessageMode newMode, InetSocketAddress address) {
this.name = name;
this.type = type;
this.oldMode = oldMode;
this.newMode = newMode;
- this.client = client;
+ this.address = address;
}
/**
@@ -68,8 +69,8 @@ public class SetMessageModeCmd implements Serializable{
/**
* @return Returns the client.
*/
- public IMsgToolServiceClient getClient() {
- return client;
+ public InetSocketAddress getAddress() {
+ return address;
}
/**
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java
index 2ba5daa8da4..6da5c9b77bc 100644
--- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java
+++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java
@@ -10,10 +10,11 @@
*******************************************************************************/
package org.eclipse.osee.ote.message.commands;
-import java.io.Serializable;
-import org.eclipse.osee.ote.message.enums.DataType;
-import org.eclipse.osee.ote.message.interfaces.IMsgToolServiceClient;
-import org.eclipse.osee.ote.message.tool.MessageMode;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+
+import org.eclipse.osee.ote.message.enums.DataType;
+import org.eclipse.osee.ote.message.tool.MessageMode;
@@ -30,7 +31,7 @@ public class UnSubscribeToMessage implements Serializable {
private static final long serialVersionUID = -1140091630056507142L;
private final String messageName;
private final MessageMode mode;
- private final IMsgToolServiceClient client;
+ private final InetSocketAddress address;
private final DataType memTypeOrdinal;
/**
* Creates a new unsubscribe command.
@@ -39,10 +40,10 @@ public class UnSubscribeToMessage implements Serializable {
* @param address the address of the client.
*/
public UnSubscribeToMessage(final String messageName, final MessageMode mode,
- final DataType memTypeOrdinal, IMsgToolServiceClient client) {
+ final DataType memTypeOrdinal, InetSocketAddress address) {
this.messageName = messageName;
this.mode = mode;
- this.client = client;
+ this.address = address;
this.memTypeOrdinal = memTypeOrdinal;
}
@@ -50,8 +51,8 @@ public class UnSubscribeToMessage implements Serializable {
return messageName;
}
- public IMsgToolServiceClient getClient() {
- return client;
+ public InetSocketAddress getAddress() {
+ return address;
}
public DataType getMemTypeOrdinal() {
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java
index 7572cab5afb..fc80d2a0a99 100644
--- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java
+++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java
@@ -18,7 +18,6 @@ import java.util.Set;
import org.eclipse.osee.ote.message.commands.RecordCommand;
import org.eclipse.osee.ote.message.commands.SetElementValue;
-import org.eclipse.osee.ote.message.commands.SetMessageModeCmd;
import org.eclipse.osee.ote.message.commands.SubscribeToMessage;
import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage;
import org.eclipse.osee.ote.message.commands.ZeroizeElement;
@@ -55,8 +54,6 @@ public interface IRemoteMessageService extends Remote {
*/
SubscriptionDetails subscribeToMessage(SubscribeToMessage cmd) throws RemoteException;
- SubscriptionDetails setReaderWriterMode(SetMessageModeCmd cmd) throws RemoteException;
-
Set<? extends DataType> getAvailablePhysicalTypes() throws RemoteException;
boolean startRecording(RecordCommand cmd) throws RemoteException;
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java
index 924ee0cf4f1..0382c0ac06f 100644
--- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java
+++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java
@@ -37,7 +37,6 @@ import org.eclipse.osee.ote.message.Message;
import org.eclipse.osee.ote.message.MessageSystemTestEnvironment;
import org.eclipse.osee.ote.message.commands.RecordCommand;
import org.eclipse.osee.ote.message.commands.SetElementValue;
-import org.eclipse.osee.ote.message.commands.SetMessageModeCmd;
import org.eclipse.osee.ote.message.commands.SubscribeToMessage;
import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage;
import org.eclipse.osee.ote.message.commands.ZeroizeElement;
@@ -87,11 +86,16 @@ private static final boolean debugEnabled = false;
private final IMsgToolServiceClient remoteReference;
private final InetSocketAddress ipAddress;
-
+
+ private final int hashcode;
public ClientInfo(final IMsgToolServiceClient remoteReference, final InetSocketAddress ipAddress) {
super();
+ if (ipAddress == null) {
+ throw new IllegalArgumentException("ip address is null");
+ }
this.remoteReference = remoteReference;
this.ipAddress = ipAddress;
+ hashcode = 31 * (31 + ipAddress.hashCode());
}
public InetSocketAddress getIpAddress() {
@@ -104,13 +108,13 @@ private static final boolean debugEnabled = false;
@Override
public int hashCode() {
- return ipAddress.hashCode() ^ remoteReference.hashCode();
+ return hashcode;
}
@Override
public boolean equals(Object obj) {
ClientInfo client = (ClientInfo) obj;
- return ipAddress.equals(client.ipAddress) && remoteReference.equals(client.remoteReference);
+ return ipAddress.equals(client.ipAddress);
}
}
@@ -232,9 +236,9 @@ private static final boolean debugEnabled = false;
// return null;
// }
- public synchronized ClientInfo findClient(IMsgToolServiceClient client) {
+ public synchronized ClientInfo findClient(InetSocketAddress address) {
for (ClientInfo clientInfo : clients) {
- if (clientInfo.remoteReference.equals(client)) {
+ if (clientInfo.ipAddress.equals(address)) {
return clientInfo;
}
}
@@ -634,16 +638,17 @@ private static final boolean debugEnabled = false;
type.name()));
}
final SubscriptionRecord record = modeMap.get(cmd.getMode());
- ClientInfo client = record.findClient(cmd.getClient());
- OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, String.format(
- "client at %s is unsubscribing to the %s for %s(%s)", client.ipAddress.toString(), cmd.getMode(), name,
- type));
+ ClientInfo client = record.findClient(cmd.getAddress());
+
if (record != null) {
/* remove the client address from the listener's client list */
record.removeClient(client);
+ OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, String.format(
+ "client at %s is unsubscribing to the %s for %s(%s)", client.ipAddress.toString(), cmd.getMode(), name,
+ type));
/*
* if the listener has no more clients then remove the listener and unregister the listener
* for message updates.
@@ -865,66 +870,6 @@ private static final boolean debugEnabled = false;
return available;
}
- public synchronized SubscriptionDetails setReaderWriterMode(SetMessageModeCmd cmd) throws RemoteException {
- final String name = cmd.getName();
-
- try {
- final Map<DataType, EnumMap<MessageMode, SubscriptionRecord>> memToModeMap = messageMap.get(name);
- if (memToModeMap == null) {
- throw new IllegalStateException("No subscriptions registered for message " + name);
- }
- final EnumMap<MessageMode, SubscriptionRecord> modeMap = memToModeMap.get(cmd.getType());
- if (modeMap == null) {
- throw new IllegalStateException("Could not find record for Mem Type of " + cmd.getType() + " for message ");
- }
- final SubscriptionRecord record = modeMap.get(cmd.getOldMode());
- if (record == null) {
- throw new IllegalStateException("Could not find record for " + cmd.getOldMode() + " of the message ");
- }
- ClientInfo client = record.findClient(cmd.getClient());
- record.removeClient(client);
- final InetSocketAddress address = client.ipAddress;
- OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, String.format(
- "Client at %s is changing message mode for %s from %s to %s", address.toString(), name,
- cmd.getOldMode(), cmd.getNewMode()));
- if (record.clients.isEmpty()) {
- record.unregister();
- messageRequestor.remove(record.msg);
- modeMap.remove(cmd.getOldMode());
- }
- SubscriptionRecord newRecord = modeMap.get(cmd.getNewMode());
- if (newRecord == null) {
- final Class<?> msgWriterClass =
- Activator.getTestEnvironment().getRuntimeManager().loadFromRuntimeLibraryLoader(name);
- /* check to see if an instance of a writer for the specified message exists */
- Message<?, ?, ?> newMsg =
- cmd.getNewMode() == MessageMode.WRITER ? messageRequestor.getMessageWriter(msgWriterClass) : messageRequestor.getMessageReader(msgWriterClass);
- if (newMsg == null) {
- throw new ClassNotFoundException(
- "Could not find class for the " + cmd.getNewMode().toString() + " of the message");
- }
- OseeLog.log(
- MessageSystemTestEnvironment.class,
- Level.INFO,
- "Adding a subscription to: " + newMsg.getName() + " type[" + record.key.getType() + "] mode[" + cmd.getNewMode() + "]");
- newRecord = new SubscriptionRecord(newMsg, record.key.getType(), cmd.getNewMode(), client);
- newMsg.setMemTypeActive(record.key.getType());
- modeMap.put(cmd.getNewMode(), newRecord);
- } else {
- newRecord.addClient(client);
- }
-
- OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO,
- "success changing reader/writer mode. New mode = " + newRecord.key.getMode());
- return new SubscriptionDetails(newRecord.key, newRecord.msg.getActiveDataSource(cmd.getType()).toByteArray(),
- newRecord.msg.getAvailableMemTypes());
- } catch (Throwable t) {
- String title = "Could not set reader/writer for message " + name;
- OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, t.getMessage(), t);
- throw new RemoteException(title, t);
- }
- }
-
public Map<String, Throwable> getCancelledSubscriptions() {
return cancelledSubscriptions;
}
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java
index 9fb38ee262c..d6e411ce23e 100644
--- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java
+++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java
@@ -11,6 +11,7 @@
package org.eclipse.osee.ote.message.tool;
import java.io.Serializable;
+
import org.eclipse.osee.ote.message.enums.DataType;
/**
@@ -50,4 +51,5 @@ public final class SubscriptionKey implements Serializable {
public String getMessageClassName() {
return messageClassName;
}
+
}

Back to the top