diff options
author | kaguilar | 2010-06-18 22:41:51 +0000 |
---|---|---|
committer | kaguilar | 2010-06-18 22:41:51 +0000 |
commit | cc0749cf9fd39bf7d9883162774fceb4d524e6af (patch) | |
tree | c027f2530d1fcfc31ef9619d39aa1d56074ec781 | |
parent | 2c07b2b3db68edfb08ed1c05c641a6fa300cf5e7 (diff) | |
download | org.eclipse.osee-cc0749cf9fd39bf7d9883162774fceb4d524e6af.tar.gz org.eclipse.osee-cc0749cf9fd39bf7d9883162774fceb4d524e6af.tar.xz org.eclipse.osee-cc0749cf9fd39bf7d9883162774fceb4d524e6af.zip |
corrected issue with shutdown and unsubscribes
8 files changed, 306 insertions, 356 deletions
diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java index 815e3502fa8..d655d8bd992 100644 --- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java +++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/AbstractMessageDataBase.java @@ -15,6 +15,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; + import org.eclipse.osee.framework.logging.OseeLog; import org.eclipse.osee.ote.client.msg.core.internal.MessageReference; import org.eclipse.osee.ote.message.Message; @@ -126,12 +127,11 @@ public abstract class AbstractMessageDataBase { } public void detachService(IRemoteMessageService service) { - this.service = null; - this.client = null; for (MessageInstance instance : referenceToMsgMap.values()) { doInstanceDetach(instance, service); } - + this.service = null; + this.client = null; } public MessageInstance findById(int id) { diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java index b8e9b6a4aa1..9e7ca3b00a0 100644 --- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java +++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/db/MessageInstance.java @@ -12,6 +12,7 @@ package org.eclipse.osee.ote.client.msg.core.db; import java.util.HashSet; import java.util.Set; + import org.eclipse.osee.ote.message.Message; import org.eclipse.osee.ote.message.commands.SubscribeToMessage; import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage; @@ -74,7 +75,7 @@ public class MessageInstance { public void detachService(IRemoteMessageService service, IMsgToolServiceClient client) throws Exception { if (service != null) { - service.unsubscribeToMessage(new UnSubscribeToMessage(msg.getClass().getName(), mode, type, client)); + service.unsubscribeToMessage(new UnSubscribeToMessage(msg.getClass().getName(), mode, type, client.getAddressByType(msg.getClass().getName(), type))); } availableTypes.clear(); serverSubscriptionKey = null; diff --git a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java index aa0f7dabf20..20a014435bc 100644 --- a/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java +++ b/plugins/org.eclipse.osee.ote.client.msg/src/org/eclipse/osee/ote/client/msg/core/internal/MessageSubscriptionService.java @@ -54,268 +54,271 @@ import org.eclipse.osee.ote.service.ITestConnectionListener; */ public class MessageSubscriptionService implements IOteMessageService, IMessageDictionaryListener, ITestConnectionListener, IMsgToolServiceClient { - /** * Static Fields ** */ - private static final int MAX_CONCURRENT_WORKER_THREADS = Runtime.getRuntime().availableProcessors() + 1; - - private final InetAddress localAddress; - private final LinkedList<MessageSubscription> subscriptions = new LinkedList<MessageSubscription>(); - private IMsgToolServiceClient exportedThis = null; - private AbstractMessageDataBase msgDatabase; - private UdpFileTransferHandler fileTransferHandler; - - private final ExecutorService threadPool = - Executors.newFixedThreadPool(MAX_CONCURRENT_WORKER_THREADS, new ThreadFactory() { - private final ThreadGroup group = - new ThreadGroup(Thread.currentThread().getThreadGroup(), "Msg Watch Workers"); - private int count = 1; - - public Thread newThread(Runnable arg0) { - Thread thread = new Thread(group, arg0, "Msg Watch Wrkr - " + count++); - thread.setDaemon(false); - return thread; - } - - }); - - /** - * Monitors a set of channels for message updates and dispatches the updates to worker threads - */ - private UpdateDispatcher dispatcher = null; - private IRemoteMessageService service; - - private final IMessageDbFactory messageDbFactory; - private final IOteClientService clientService; - - public MessageSubscriptionService(IOteClientService service, IMessageDbFactory messageDbFactory) throws IOException { - this.messageDbFactory = messageDbFactory; - localAddress = InetAddress.getLocalHost(); - OseeLog.log(Activator.class, Level.INFO, - "OTE client message service started on: " + localAddress.getHostAddress()); - clientService = service; - clientService.addDictionaryListener(this); - clientService.addConnectionListener(this); - } - - public synchronized IMessageSubscription subscribe(String name) { - MessageSubscription subscription = new MessageSubscription(this); - subscription.bind(name); - if (msgDatabase != null) { - subscription.attachMessageDb(msgDatabase); - if (service != null) { - subscription.attachService(service); - } - } - subscriptions.add(subscription); - return subscription; - } - - /** - * Shuts down the client message service. All worker threads will be terminated and all IO resources will be closed. - */ - public void shutdown() { - OseeLog.log(MessageSubscriptionService.class, Level.INFO, "shutting down subscription service"); - clientService.removeDictionaryListener(this); - clientService.removeConnectionListener(this); - shutdownDispatcher(); - threadPool.shutdown(); - try { - threadPool.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException ex1) { - OseeLog.log(Activator.class, Level.WARNING, ex1.toString(), ex1); - } - } - - @Override - public synchronized void onConnectionLost(IServiceConnector connector, IHostTestEnvironment testHost) { - OseeLog.log(Activator.class, Level.INFO, "connection lost: ote client message service halted"); - shutdownDispatcher(); - msgDatabase.detachService(null); - for (MessageSubscription subscription : subscriptions) { - subscription.detachService(null); - } - exportedThis = null; - service = null; - } - - @Override - public synchronized void onPostConnect(ConnectionEvent event) { - assert msgDatabase != null; - OseeLog.log(Activator.class, Level.INFO, "connecting OTE client message service"); - if (event.getEnvironment() instanceof ITestEnvironmentMessageSystem) { - ITestEnvironmentMessageSystem env = (ITestEnvironmentMessageSystem) event.getEnvironment(); - try { - service = env.getMessageToolServiceProxy(); - if (service == null) { - throw new Exception("could not get message tool service proxy"); - } - exportedThis = (IMsgToolServiceClient) event.getConnector().export(this); - } catch (Exception e) { - OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, - "failed to create exported Message Tool Client", e); - service = null; - exportedThis = null; - return; - } - - try { - dispatcher = new UpdateDispatcher(service.getMsgUpdateSocketAddress()); - } catch (Exception e) { - OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update dispatcher", e); - service = null; - exportedThis = null; - return; - } - - try { - createProccessors(); - } catch (Exception e) { - OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update processors", e); - service = null; - exportedThis = null; - return; - } - - msgDatabase.attachToService(service, exportedThis); - for (MessageSubscription subscription : subscriptions) { - subscription.attachService(service); - } - dispatcher.start(); - } - } - - private void createProccessors() throws IOException { - Set<? extends DataType> availableTypes = service.getAvailablePhysicalTypes(); - - int port = PortUtil.getInstance().getConsecutiveValidPorts(availableTypes.size()); - for (DataType type : availableTypes) { - final ChannelProcessor handler = new ChannelProcessor(1, type.getToolingBufferSize(), threadPool, msgDatabase, type); - - dispatcher.addChannel(localAddress, port, type, handler); - port++; - } - } - - private void shutdownDispatcher() { - if (dispatcher != null && dispatcher.isRunning()) { - try { - dispatcher.close(); - } catch (Throwable ex) { - OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "exception while closing down dispatcher", ex); - } finally { - dispatcher = null; - } - } - } - - @Override - public synchronized void onPreDisconnect(ConnectionEvent event) { - msgDatabase.detachService(service); - for (MessageSubscription subscription : subscriptions) { - subscription.detachService(service); - } - try { - event.getConnector().unexport(this); - } catch (Exception e) { - OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "problems unexporting Message Tool Client", e); - } - shutdownDispatcher(); - exportedThis = null; - service = null; - } - - @Override - public void changeIsScheduled(String msgName, boolean isScheduled) throws RemoteException { - - } - - @Override - public void changeRate(String msgName, double rate) throws RemoteException { - - } - - @Override - public InetSocketAddress getAddressByType(String messageName, DataType dataType) throws RemoteException { - final DatagramChannel channel = dispatcher.getChannel(dataType); - OseeLog.log(Activator.class, Level.INFO, String.format( - "callback from remote msg manager: msg=%s, type=%s, ip=%s:%d\n", messageName, - dataType.name(), localAddress.toString(), channel.socket().getLocalPort())); - - return new InetSocketAddress(localAddress, channel.socket().getLocalPort()); - } - - @Override - public UserTestSessionKey getTestSessionKey() throws RemoteException { - return clientService.getSessionKey(); - } - - @Override - public synchronized void onDictionaryLoaded(IMessageDictionary dictionary) { - msgDatabase = messageDbFactory.createMessageDataBase(dictionary); - for (MessageSubscription subscription : subscriptions) { - subscription.attachMessageDb(msgDatabase); - } - } - - @Override - public synchronized void onDictionaryUnloaded(IMessageDictionary dictionary) { - for (MessageSubscription subscription : subscriptions) { - subscription.detachMessageDb(msgDatabase); - } - msgDatabase = null; - } - - @Override - public synchronized IFileTransferHandle startRecording(String fileName, List<MessageRecordDetails> list) throws FileNotFoundException, IOException { - if (service == null) { - throw new IllegalStateException("can't record: not connected to test server"); - } - if (fileTransferHandler == null) { - fileTransferHandler = new UdpFileTransferHandler(); - fileTransferHandler.start(); - } - int port = PortUtil.getInstance().getValidPort(); - // get the address of the socket the message recorder is going to write - // data to - InetSocketAddress recorderOutputAddress = service.getRecorderSocketAddress(); - - // setup a transfer from a socket to a file - TransferConfig config = - new TransferConfig(fileName, recorderOutputAddress, - new InetSocketAddress(InetAddress.getLocalHost(), port), - TransferConfig.Direction.SOCKET_TO_FILE, 128000); - IFileTransferHandle handle = fileTransferHandler.registerTransfer(config); - - // send the command to start recording - RecordCommand cmd = - new RecordCommand(exportedThis, new InetSocketAddress(InetAddress.getLocalHost(), port), list); - service.startRecording(cmd); - OseeLog.log(Activator.class, Level.INFO, - "recording started with " + list.size() + " entries, recorder output socket=" - + recorderOutputAddress.toString()); - return handle; - } - - @Override - public synchronized void stopRecording() throws RemoteException, IOException { - try { - service.stopRecording(); - } finally { - if (fileTransferHandler != null && fileTransferHandler.hasActiveTransfers()) { - fileTransferHandler.stopAllTransfers(); - } - fileTransferHandler = null; - } - } - - public AbstractMessageDataBase getMsgDatabase() { - return msgDatabase; - } - - public IRemoteMessageService getService() { - return service; - } - - public void removeSubscription(MessageSubscription subscription) { - subscriptions.remove(subscription); - } + /** * Static Fields ** */ + private static final int MAX_CONCURRENT_WORKER_THREADS = Runtime.getRuntime().availableProcessors() + 1; + + private final InetAddress localAddress; + private final LinkedList<MessageSubscription> subscriptions = new LinkedList<MessageSubscription>(); + private IMsgToolServiceClient exportedThis = null; + private AbstractMessageDataBase msgDatabase; + private UdpFileTransferHandler fileTransferHandler; + + private final ExecutorService threadPool = + Executors.newFixedThreadPool(MAX_CONCURRENT_WORKER_THREADS, new ThreadFactory() { + private final ThreadGroup group = + new ThreadGroup(Thread.currentThread().getThreadGroup(), "Msg Watch Workers"); + private int count = 1; + + public Thread newThread(Runnable arg0) { + Thread thread = new Thread(group, arg0, "Msg Watch Wrkr - " + count++); + thread.setDaemon(false); + return thread; + } + + }); + + /** + * Monitors a set of channels for message updates and dispatches the updates to worker threads + */ + private UpdateDispatcher dispatcher = null; + private IRemoteMessageService service; + + private final IMessageDbFactory messageDbFactory; + private final IOteClientService clientService; + + public MessageSubscriptionService(IOteClientService service, IMessageDbFactory messageDbFactory) throws IOException { + this.messageDbFactory = messageDbFactory; + localAddress = InetAddress.getLocalHost(); + OseeLog.log(Activator.class, Level.INFO, + "OTE client message service started on: " + localAddress.getHostAddress()); + clientService = service; + clientService.addDictionaryListener(this); + clientService.addConnectionListener(this); + } + + public synchronized IMessageSubscription subscribe(String name) { + MessageSubscription subscription = new MessageSubscription(this); + subscription.bind(name); + if (msgDatabase != null) { + subscription.attachMessageDb(msgDatabase); + if (service != null) { + subscription.attachService(service); + } + } + subscriptions.add(subscription); + return subscription; + } + + /** + * Shuts down the client message service. All worker threads will be terminated and all IO resources will be closed. + */ + public void shutdown() { + OseeLog.log(MessageSubscriptionService.class, Level.INFO, "shutting down subscription service"); + clientService.removeDictionaryListener(this); + clientService.removeConnectionListener(this); + shutdownDispatcher(); + threadPool.shutdown(); + try { + threadPool.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException ex1) { + OseeLog.log(Activator.class, Level.WARNING, ex1.toString(), ex1); + } + } + + @Override + public synchronized void onConnectionLost(IServiceConnector connector, IHostTestEnvironment testHost) { + OseeLog.log(Activator.class, Level.INFO, "connection lost: ote client message service halted"); + shutdownDispatcher(); + msgDatabase.detachService(null); + for (MessageSubscription subscription : subscriptions) { + subscription.detachService(null); + } + exportedThis = null; + service = null; + } + + @Override + public synchronized void onPostConnect(ConnectionEvent event) { + assert msgDatabase != null; + OseeLog.log(Activator.class, Level.INFO, "connecting OTE client message service"); + if (event.getEnvironment() instanceof ITestEnvironmentMessageSystem) { + ITestEnvironmentMessageSystem env = (ITestEnvironmentMessageSystem) event.getEnvironment(); + try { + service = env.getMessageToolServiceProxy(); + if (service == null) { + throw new Exception("could not get message tool service proxy"); + } + exportedThis = (IMsgToolServiceClient) event.getConnector().export(this); + } catch (Exception e) { + OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, + "failed to create exported Message Tool Client", e); + service = null; + exportedThis = null; + return; + } + + try { + dispatcher = new UpdateDispatcher(service.getMsgUpdateSocketAddress()); + } catch (Exception e) { + OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update dispatcher", e); + service = null; + exportedThis = null; + return; + } + + try { + createProccessors(); + } catch (Exception e) { + OseeLog.log(MessageSubscriptionService.class, Level.SEVERE, "failed to create update processors", e); + service = null; + exportedThis = null; + return; + } + + msgDatabase.attachToService(service, exportedThis); + for (MessageSubscription subscription : subscriptions) { + subscription.attachService(service); + } + dispatcher.start(); + } + } + + private void createProccessors() throws IOException { + Set<? extends DataType> availableTypes = service.getAvailablePhysicalTypes(); + + int port = PortUtil.getInstance().getConsecutiveValidPorts(availableTypes.size()); + for (DataType type : availableTypes) { + final ChannelProcessor handler = new ChannelProcessor(1, type.getToolingBufferSize(), threadPool, msgDatabase, type); + + dispatcher.addChannel(localAddress, port, type, handler); + port++; + } + } + + private void shutdownDispatcher() { + if (dispatcher != null && dispatcher.isRunning()) { + try { + dispatcher.close(); + } catch (Throwable ex) { + OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "exception while closing down dispatcher", ex); + } finally { + dispatcher = null; + } + } + } + + @Override + public synchronized void onPreDisconnect(ConnectionEvent event) { + if (service == null) { + return; + } + msgDatabase.detachService(service); + for (MessageSubscription subscription : subscriptions) { + subscription.detachService(service); + } + try { + event.getConnector().unexport(this); + } catch (Exception e) { + OseeLog.log(MessageSubscriptionService.class, Level.WARNING, "problems unexporting Message Tool Client", e); + } + shutdownDispatcher(); + exportedThis = null; + service = null; + } + + @Override + public void changeIsScheduled(String msgName, boolean isScheduled) throws RemoteException { + + } + + @Override + public void changeRate(String msgName, double rate) throws RemoteException { + + } + + @Override + public InetSocketAddress getAddressByType(String messageName, DataType dataType) throws RemoteException { + final DatagramChannel channel = dispatcher.getChannel(dataType); + OseeLog.log(Activator.class, Level.INFO, String.format( + "callback from remote msg manager: msg=%s, type=%s, ip=%s:%d\n", messageName, + dataType.name(), localAddress.toString(), channel.socket().getLocalPort())); + + return new InetSocketAddress(localAddress, channel.socket().getLocalPort()); + } + + @Override + public UserTestSessionKey getTestSessionKey() throws RemoteException { + return clientService.getSessionKey(); + } + + @Override + public synchronized void onDictionaryLoaded(IMessageDictionary dictionary) { + msgDatabase = messageDbFactory.createMessageDataBase(dictionary); + for (MessageSubscription subscription : subscriptions) { + subscription.attachMessageDb(msgDatabase); + } + } + + @Override + public synchronized void onDictionaryUnloaded(IMessageDictionary dictionary) { + for (MessageSubscription subscription : subscriptions) { + subscription.detachMessageDb(msgDatabase); + } + msgDatabase = null; + } + + @Override + public synchronized IFileTransferHandle startRecording(String fileName, List<MessageRecordDetails> list) throws FileNotFoundException, IOException { + if (service == null) { + throw new IllegalStateException("can't record: not connected to test server"); + } + if (fileTransferHandler == null) { + fileTransferHandler = new UdpFileTransferHandler(); + fileTransferHandler.start(); + } + int port = PortUtil.getInstance().getValidPort(); + // get the address of the socket the message recorder is going to write + // data to + InetSocketAddress recorderOutputAddress = service.getRecorderSocketAddress(); + + // setup a transfer from a socket to a file + TransferConfig config = + new TransferConfig(fileName, recorderOutputAddress, + new InetSocketAddress(InetAddress.getLocalHost(), port), + TransferConfig.Direction.SOCKET_TO_FILE, 128000); + IFileTransferHandle handle = fileTransferHandler.registerTransfer(config); + + // send the command to start recording + RecordCommand cmd = + new RecordCommand(exportedThis, new InetSocketAddress(InetAddress.getLocalHost(), port), list); + service.startRecording(cmd); + OseeLog.log(Activator.class, Level.INFO, + "recording started with " + list.size() + " entries, recorder output socket=" + + recorderOutputAddress.toString()); + return handle; + } + + @Override + public synchronized void stopRecording() throws RemoteException, IOException { + try { + service.stopRecording(); + } finally { + if (fileTransferHandler != null && fileTransferHandler.hasActiveTransfers()) { + fileTransferHandler.stopAllTransfers(); + } + fileTransferHandler = null; + } + } + + public AbstractMessageDataBase getMsgDatabase() { + return msgDatabase; + } + + public IRemoteMessageService getService() { + return service; + } + + public void removeSubscription(MessageSubscription subscription) { + subscriptions.remove(subscription); + } } diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java index 47f7427a0ba..f410a4c870b 100644 --- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java +++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/SetMessageModeCmd.java @@ -11,8 +11,9 @@ package org.eclipse.osee.ote.message.commands; import java.io.Serializable; +import java.net.InetSocketAddress; + import org.eclipse.osee.ote.message.enums.DataType; -import org.eclipse.osee.ote.message.interfaces.IMsgToolServiceClient; import org.eclipse.osee.ote.message.tool.MessageMode; @@ -27,14 +28,14 @@ public class SetMessageModeCmd implements Serializable{ private final DataType type; private final MessageMode oldMode; private final MessageMode newMode; - private final IMsgToolServiceClient client; + private final InetSocketAddress address; - public SetMessageModeCmd(String name, DataType type, MessageMode oldMode, MessageMode newMode, IMsgToolServiceClient client) { + public SetMessageModeCmd(String name, DataType type, MessageMode oldMode, MessageMode newMode, InetSocketAddress address) { this.name = name; this.type = type; this.oldMode = oldMode; this.newMode = newMode; - this.client = client; + this.address = address; } /** @@ -68,8 +69,8 @@ public class SetMessageModeCmd implements Serializable{ /** * @return Returns the client. */ - public IMsgToolServiceClient getClient() { - return client; + public InetSocketAddress getAddress() { + return address; } /** diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java index 2ba5daa8da4..6da5c9b77bc 100644 --- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java +++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/commands/UnSubscribeToMessage.java @@ -10,10 +10,11 @@ *******************************************************************************/ package org.eclipse.osee.ote.message.commands;
-import java.io.Serializable;
-import org.eclipse.osee.ote.message.enums.DataType;
-import org.eclipse.osee.ote.message.interfaces.IMsgToolServiceClient;
-import org.eclipse.osee.ote.message.tool.MessageMode;
+import java.io.Serializable; +import java.net.InetSocketAddress; + +import org.eclipse.osee.ote.message.enums.DataType; +import org.eclipse.osee.ote.message.tool.MessageMode; @@ -30,7 +31,7 @@ public class UnSubscribeToMessage implements Serializable { private static final long serialVersionUID = -1140091630056507142L;
private final String messageName;
private final MessageMode mode;
- private final IMsgToolServiceClient client;
+ private final InetSocketAddress address;
private final DataType memTypeOrdinal;
/**
* Creates a new unsubscribe command.
@@ -39,10 +40,10 @@ public class UnSubscribeToMessage implements Serializable { * @param address the address of the client.
*/
public UnSubscribeToMessage(final String messageName, final MessageMode mode,
- final DataType memTypeOrdinal, IMsgToolServiceClient client) {
+ final DataType memTypeOrdinal, InetSocketAddress address) {
this.messageName = messageName;
this.mode = mode;
- this.client = client;
+ this.address = address;
this.memTypeOrdinal = memTypeOrdinal;
}
@@ -50,8 +51,8 @@ public class UnSubscribeToMessage implements Serializable { return messageName;
}
- public IMsgToolServiceClient getClient() {
- return client;
+ public InetSocketAddress getAddress() {
+ return address;
}
public DataType getMemTypeOrdinal() {
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java index 7572cab5afb..fc80d2a0a99 100644 --- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java +++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/interfaces/IRemoteMessageService.java @@ -18,7 +18,6 @@ import java.util.Set; import org.eclipse.osee.ote.message.commands.RecordCommand; import org.eclipse.osee.ote.message.commands.SetElementValue; -import org.eclipse.osee.ote.message.commands.SetMessageModeCmd; import org.eclipse.osee.ote.message.commands.SubscribeToMessage; import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage; import org.eclipse.osee.ote.message.commands.ZeroizeElement; @@ -55,8 +54,6 @@ public interface IRemoteMessageService extends Remote { */
SubscriptionDetails subscribeToMessage(SubscribeToMessage cmd) throws RemoteException;
- SubscriptionDetails setReaderWriterMode(SetMessageModeCmd cmd) throws RemoteException;
-
Set<? extends DataType> getAvailablePhysicalTypes() throws RemoteException;
boolean startRecording(RecordCommand cmd) throws RemoteException;
diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java index 924ee0cf4f1..0382c0ac06f 100644 --- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java +++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/AbstractMessageToolService.java @@ -37,7 +37,6 @@ import org.eclipse.osee.ote.message.Message; import org.eclipse.osee.ote.message.MessageSystemTestEnvironment; import org.eclipse.osee.ote.message.commands.RecordCommand; import org.eclipse.osee.ote.message.commands.SetElementValue; -import org.eclipse.osee.ote.message.commands.SetMessageModeCmd; import org.eclipse.osee.ote.message.commands.SubscribeToMessage; import org.eclipse.osee.ote.message.commands.UnSubscribeToMessage; import org.eclipse.osee.ote.message.commands.ZeroizeElement; @@ -87,11 +86,16 @@ private static final boolean debugEnabled = false; private final IMsgToolServiceClient remoteReference; private final InetSocketAddress ipAddress; - + + private final int hashcode; public ClientInfo(final IMsgToolServiceClient remoteReference, final InetSocketAddress ipAddress) { super(); + if (ipAddress == null) { + throw new IllegalArgumentException("ip address is null"); + } this.remoteReference = remoteReference; this.ipAddress = ipAddress; + hashcode = 31 * (31 + ipAddress.hashCode()); } public InetSocketAddress getIpAddress() { @@ -104,13 +108,13 @@ private static final boolean debugEnabled = false; @Override public int hashCode() { - return ipAddress.hashCode() ^ remoteReference.hashCode(); + return hashcode; } @Override public boolean equals(Object obj) { ClientInfo client = (ClientInfo) obj; - return ipAddress.equals(client.ipAddress) && remoteReference.equals(client.remoteReference); + return ipAddress.equals(client.ipAddress); } } @@ -232,9 +236,9 @@ private static final boolean debugEnabled = false; // return null; // } - public synchronized ClientInfo findClient(IMsgToolServiceClient client) { + public synchronized ClientInfo findClient(InetSocketAddress address) { for (ClientInfo clientInfo : clients) { - if (clientInfo.remoteReference.equals(client)) { + if (clientInfo.ipAddress.equals(address)) { return clientInfo; } } @@ -634,16 +638,17 @@ private static final boolean debugEnabled = false; type.name())); } final SubscriptionRecord record = modeMap.get(cmd.getMode()); - ClientInfo client = record.findClient(cmd.getClient()); - OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, String.format( - "client at %s is unsubscribing to the %s for %s(%s)", client.ipAddress.toString(), cmd.getMode(), name, - type)); + ClientInfo client = record.findClient(cmd.getAddress()); + if (record != null) { /* remove the client address from the listener's client list */ record.removeClient(client); + OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, String.format( + "client at %s is unsubscribing to the %s for %s(%s)", client.ipAddress.toString(), cmd.getMode(), name, + type)); /* * if the listener has no more clients then remove the listener and unregister the listener * for message updates. @@ -865,66 +870,6 @@ private static final boolean debugEnabled = false; return available; } - public synchronized SubscriptionDetails setReaderWriterMode(SetMessageModeCmd cmd) throws RemoteException { - final String name = cmd.getName(); - - try { - final Map<DataType, EnumMap<MessageMode, SubscriptionRecord>> memToModeMap = messageMap.get(name); - if (memToModeMap == null) { - throw new IllegalStateException("No subscriptions registered for message " + name); - } - final EnumMap<MessageMode, SubscriptionRecord> modeMap = memToModeMap.get(cmd.getType()); - if (modeMap == null) { - throw new IllegalStateException("Could not find record for Mem Type of " + cmd.getType() + " for message "); - } - final SubscriptionRecord record = modeMap.get(cmd.getOldMode()); - if (record == null) { - throw new IllegalStateException("Could not find record for " + cmd.getOldMode() + " of the message "); - } - ClientInfo client = record.findClient(cmd.getClient()); - record.removeClient(client); - final InetSocketAddress address = client.ipAddress; - OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, String.format( - "Client at %s is changing message mode for %s from %s to %s", address.toString(), name, - cmd.getOldMode(), cmd.getNewMode())); - if (record.clients.isEmpty()) { - record.unregister(); - messageRequestor.remove(record.msg); - modeMap.remove(cmd.getOldMode()); - } - SubscriptionRecord newRecord = modeMap.get(cmd.getNewMode()); - if (newRecord == null) { - final Class<?> msgWriterClass = - Activator.getTestEnvironment().getRuntimeManager().loadFromRuntimeLibraryLoader(name); - /* check to see if an instance of a writer for the specified message exists */ - Message<?, ?, ?> newMsg = - cmd.getNewMode() == MessageMode.WRITER ? messageRequestor.getMessageWriter(msgWriterClass) : messageRequestor.getMessageReader(msgWriterClass); - if (newMsg == null) { - throw new ClassNotFoundException( - "Could not find class for the " + cmd.getNewMode().toString() + " of the message"); - } - OseeLog.log( - MessageSystemTestEnvironment.class, - Level.INFO, - "Adding a subscription to: " + newMsg.getName() + " type[" + record.key.getType() + "] mode[" + cmd.getNewMode() + "]"); - newRecord = new SubscriptionRecord(newMsg, record.key.getType(), cmd.getNewMode(), client); - newMsg.setMemTypeActive(record.key.getType()); - modeMap.put(cmd.getNewMode(), newRecord); - } else { - newRecord.addClient(client); - } - - OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, - "success changing reader/writer mode. New mode = " + newRecord.key.getMode()); - return new SubscriptionDetails(newRecord.key, newRecord.msg.getActiveDataSource(cmd.getType()).toByteArray(), - newRecord.msg.getAvailableMemTypes()); - } catch (Throwable t) { - String title = "Could not set reader/writer for message " + name; - OseeLog.log(MessageSystemTestEnvironment.class, Level.INFO, t.getMessage(), t); - throw new RemoteException(title, t); - } - } - public Map<String, Throwable> getCancelledSubscriptions() { return cancelledSubscriptions; } diff --git a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java index 9fb38ee262c..d6e411ce23e 100644 --- a/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java +++ b/plugins/org.eclipse.osee.ote.message/src/org/eclipse/osee/ote/message/tool/SubscriptionKey.java @@ -11,6 +11,7 @@ package org.eclipse.osee.ote.message.tool;
import java.io.Serializable;
+
import org.eclipse.osee.ote.message.enums.DataType;
/**
@@ -50,4 +51,5 @@ public final class SubscriptionKey implements Serializable { public String getMessageClassName() {
return messageClassName;
}
+
}
|