Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/BasicListener.java44
-rw-r--r--plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/TestMessageServicesMultipleConsumers.java107
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java22
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java49
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java35
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java137
6 files changed, 295 insertions, 99 deletions
diff --git a/plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/BasicListener.java b/plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/BasicListener.java
new file mode 100644
index 00000000000..250f3ad17af
--- /dev/null
+++ b/plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/BasicListener.java
@@ -0,0 +1,44 @@
+/*
+ * Created on Apr 5, 2010
+ *
+ * PLACE_YOUR_DISTRIBUTION_STATEMENT_RIGHT_HERE
+ */
+package org.eclipse.osee.framework.messaging.services.internal;
+
+import java.util.Map;
+import org.eclipse.osee.framework.messaging.OseeMessagingListener;
+import org.eclipse.osee.framework.messaging.ReplyConnection;
+import org.eclipse.osee.framework.messaging.test.msg.TestMessage;
+
+/**
+ * @author b1528444
+ *
+ */
+public class BasicListener extends OseeMessagingListener {
+
+ private int id;
+ private boolean received = false;
+
+ public BasicListener(int id) {
+ this.id = id;
+ }
+
+ @Override
+ public Class<?> getClazz() {
+ return TestMessage.class;
+ }
+
+ @Override
+ public void process(Object message, Map<String, Object> headers, ReplyConnection replyConnection) {
+ System.out.println(message + " - " + id);
+ received = true;
+ }
+
+ public boolean isReceived(){
+ return received;
+ }
+
+ public String toString(){
+ return "BasicListener " + id;
+ }
+}
diff --git a/plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/TestMessageServicesMultipleConsumers.java b/plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/TestMessageServicesMultipleConsumers.java
new file mode 100644
index 00000000000..f8405f01f85
--- /dev/null
+++ b/plugins/org.eclipse.osee.framework.messaging.test/src/org/eclipse/osee/framework/messaging/services/internal/TestMessageServicesMultipleConsumers.java
@@ -0,0 +1,107 @@
+/*
+ * Created on Jan 26, 2010
+ *
+ * PLACE_YOUR_DISTRIBUTION_STATEMENT_RIGHT_HERE
+ */
+package org.eclipse.osee.framework.messaging.services.internal;
+
+import static org.junit.Assert.assertTrue;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.eclipse.osee.framework.messaging.ConnectionNode;
+import org.eclipse.osee.framework.messaging.NodeInfo;
+import org.eclipse.osee.framework.messaging.OseeMessagingStatusCallback;
+import org.eclipse.osee.framework.messaging.internal.BaseBrokerTesting;
+import org.eclipse.osee.framework.messaging.internal.TestMessages;
+import org.eclipse.osee.framework.messaging.test.msg.TestMessage;
+
+/**
+ * @author b1528444
+ *
+ */
+public class TestMessageServicesMultipleConsumers extends BaseBrokerTesting implements OseeMessagingStatusCallback {
+
+ private static String BROKER_URI_SERVER = "tcp://localhost:61616";
+ private static String BROKER_URI = "tcp://localhost:61616";
+
+ @org.junit.Before
+ public void startBroker(){
+ try {
+ startEmbeddedBroker("testBroker", BROKER_URI_SERVER);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ @org.junit.After
+ public void stopBroker(){
+ try {
+ stopEmbeddedBroker("testBroker", BROKER_URI_SERVER);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ @org.junit.Test
+ public void testMultipleConsumers() throws Exception{
+ ConnectionNode connection = getMessaging().get(new NodeInfo("osee-jms", new URI(BROKER_URI)));
+
+ List<BasicListener> listeners = new ArrayList<BasicListener>();
+ for(int i = 0; i < 20; i++){
+ BasicListener listener = new BasicListener(i);
+ listeners.add(listener);
+ connection.subscribe(TestMessages.JMS_TOPIC, listener, this);
+ }
+
+ TestMessage message = new TestMessage();
+ message.setMessage("TestMessage 1");
+ connection.send(TestMessages.JMS_TOPIC, message, this);
+
+ testWait(500);
+
+ for(BasicListener listener:listeners){
+ assertTrue(listener.toString(), listener.isReceived());
+ }
+ }
+
+ @org.junit.Test
+ public void testMultipleConsumersWithSelector() throws Exception{
+ ConnectionNode connection = getMessaging().get(new NodeInfo("osee-jms", new URI(BROKER_URI)));
+
+ List<BasicListener> listeners = new ArrayList<BasicListener>();
+ for(int i = 0; i < 20; i++){
+ BasicListener listener = new BasicListener(i);
+ listeners.add(listener);
+ connection.subscribe(TestMessages.JMS_TOPIC, listener, String.format("id = %d", i), this);
+ }
+
+ TestMessage message = new TestMessage();
+ message.setMessage("TestMessage 1");
+ Properties properties = new Properties();
+ properties.put("id", 1);
+ connection.send(TestMessages.JMS_TOPIC, message, properties, this);
+
+ testWait(500);
+
+ int receivedCount = 0;
+ for(BasicListener listener:listeners){
+ if(listener.isReceived()){
+ receivedCount++;
+ }
+ }
+
+ assertTrue(String.format("received %d messages", receivedCount), receivedCount == 1);
+ }
+
+ @Override
+ public void fail(Throwable th) {
+ assertTrue(th.getMessage(), false);
+ }
+
+ @Override
+ public void success() {
+ }
+
+}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
index 4f39b84f78d..0b4a8006195 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
@@ -5,6 +5,7 @@
*/
package org.eclipse.osee.framework.messaging;
+import java.util.Properties;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
/**
@@ -12,20 +13,23 @@ import org.eclipse.osee.framework.core.exception.OseeCoreException;
*/
public interface ConnectionNode {
- public void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
- public void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
+ void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
+ void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, final OseeMessagingStatusCallback statusCallback);
+ void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
- public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener);
- public boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener);
+ boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener);
+ boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener);
- public void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
-
- public void addConnectionListener(ConnectionListener connectionListener);
- public void removeConnectionListener(ConnectionListener connectionListener);
+ void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+ void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+
+ void addConnectionListener(ConnectionListener connectionListener);
+ void removeConnectionListener(ConnectionListener connectionListener);
- public void stop();
+ void stop();
String getSummary();
String getSubscribers();
String getSenders();
+
}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
index 21821a73723..6d23a9719f4 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
@@ -6,13 +6,12 @@
package org.eclipse.osee.framework.messaging.internal;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
-
import javax.jms.JMSException;
-
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.messaging.ConnectionListener;
@@ -48,18 +47,24 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
@Override
public void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ send(topic, body, null, statusCallback);
+ }
+
+
+ @Override
+ public void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
attemptSmartConnect();
if(lastConnectedState){
- try{
- connectionNode.send(topic, body, statusCallback);
- } catch (OseeCoreException ex){
- stop();
- run();
- connectionNode.send(topic, body, statusCallback);
- }
+ try{
+ connectionNode.send(topic, body, properties, statusCallback);
+ } catch (OseeCoreException ex){
+ stop();
+ run();
+ connectionNode.send(topic, body, properties, statusCallback);
+ }
}
}
-
+
private void attemptSmartConnect() {
if(!lastConnectedState){
run();
@@ -79,6 +84,13 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
}
@Override
+ public void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) {
+ savedSubscribes.add(new SavedSubscribe(messageId, listener, statusCallback));
+ attemptSmartConnect();
+ connectionNode.subscribe(messageId, listener, selector, statusCallback);
+ }
+
+ @Override
public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) {
return connectionNode.subscribeToReply(messageId, listener);
}
@@ -110,7 +122,11 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
private void subscribeToMessages() {
for (SavedSubscribe subscribe : savedSubscribes) {
- connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.statusCallback);
+ if(subscribe.selector == null){
+ connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.statusCallback);
+ } else {
+ connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.selector, subscribe.statusCallback);
+ }
}
}
@@ -118,13 +134,21 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
MessageID messageId;
OseeMessagingListener listener;
OseeMessagingStatusCallback statusCallback;
+ String selector;
- public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
+ public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) {
this.messageId = messageId;
this.listener = listener;
this.statusCallback = statusCallback;
+ this.selector = selector;
}
+ public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
+ this.messageId = messageId;
+ this.listener = listener;
+ this.statusCallback = statusCallback;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -225,5 +249,4 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
connectionNode.stop();
run();
}
-
}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java
index e338eef362d..b35e3387bc8 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java
@@ -6,9 +6,7 @@
package org.eclipse.osee.framework.messaging.internal.activemq;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -27,30 +25,18 @@ import org.eclipse.osee.framework.messaging.internal.Activator;
*/
class ActiveMqMessageListenerWrapper implements MessageListener {
- private List<OseeMessagingListener> listeners;
+ private OseeMessagingListener listener;
private MessageProducer producer;
private Session session;
private ActiveMqUtil activeMqUtil;
- ActiveMqMessageListenerWrapper(ActiveMqUtil activeMqUtil, MessageProducer producer, Session session){
+ ActiveMqMessageListenerWrapper(ActiveMqUtil activeMqUtil, MessageProducer producer, Session session, OseeMessagingListener listener){
this.producer = producer;
this.session = session;
- listeners = new CopyOnWriteArrayList<OseeMessagingListener>();
+ this.listener = listener;
this.activeMqUtil = activeMqUtil;
}
- public void addListener(OseeMessagingListener listener){
- listeners.add(listener);
- }
-
- public void removeListener(OseeMessagingListener listener){
- listeners.remove(listener);
- }
-
- public boolean isEmpty(){
- return listeners.isEmpty();
- }
-
public void onMessage(javax.jms.Message jmsMessage){
try{
Destination destReply = jmsMessage.getJMSReplyTo();
@@ -69,17 +55,14 @@ class ActiveMqMessageListenerWrapper implements MessageListener {
}
}
+ OseeMessagingListener getListener(){
+ return listener;
+ }
+
private void process(javax.jms.Message message, ReplyConnection replyConnection) throws JMSException, OseeCoreException{
Map<String, Object> headers = new HashMap<String, Object>();
- for(OseeMessagingListener listener:listeners){
- if(listener != null){
- listener.process(activeMqUtil.translateMessage(message, listener.getClazz()), headers, replyConnection);
- }
- }
+ listener.process(activeMqUtil.translateMessage(message, listener.getClazz()), headers, replyConnection);
OseeLog.log(Activator.class, Level.FINE, String.format("recieved message %s - %s", message.getJMSDestination().toString(), message.toString()));
}
-
- List<OseeMessagingListener> getListeners(){
- return listeners;
- }
+
}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
index 1a2ca19949c..91290430926 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
@@ -5,13 +5,15 @@
*/
package org.eclipse.osee.framework.messaging.internal.activemq;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
-
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
@@ -23,10 +25,11 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.core.exception.OseeWrappedException;
+import org.eclipse.osee.framework.jdk.core.type.CompositeKeyHashMap;
+import org.eclipse.osee.framework.jdk.core.type.Pair;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.messaging.ConnectionListener;
import org.eclipse.osee.framework.messaging.ConnectionNodeFailoverSupport;
@@ -50,12 +53,11 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
private TemporaryTopic temporaryTopic;
private MessageConsumer replyToConsumer;
private Map<String, OseeMessagingListener> replyListeners;
- private Map<String, ActiveMqMessageListenerWrapper> regularListeners;
+ private CompositeKeyHashMap<String, MessageConsumer, OseeMessagingListener> regularListeners;
private boolean started = false;
private ConcurrentHashMap<String, Topic> topicCache;
private ConcurrentHashMap<Topic, MessageProducer> messageProducerCache;
- private ConcurrentHashMap<Topic, MessageConsumer> messageConsumerCache;
private final ExceptionListener exceptionListener;
private MessageProducer replyProducer;
@@ -66,9 +68,8 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
this.exceptionListener = exceptionListener;
activeMqUtil = new ActiveMqUtil();
topicCache = new ConcurrentHashMap<String, Topic>();
- messageConsumerCache = new ConcurrentHashMap<Topic, MessageConsumer>();
messageProducerCache = new ConcurrentHashMap<Topic, MessageProducer>();
- regularListeners = new ConcurrentHashMap<String, ActiveMqMessageListenerWrapper>();
+ regularListeners = new CompositeKeyHashMap<String, MessageConsumer, OseeMessagingListener>(64, true);
replyListeners = new ConcurrentHashMap<String, OseeMessagingListener>();
}
@@ -95,13 +96,18 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
@Override
public synchronized void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ send(topic, body, null, statusCallback);
+ }
+
+ @Override
+ public synchronized void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
try {
if (topic.isTopic()) {
try{
- sendInternal(topic, body, statusCallback);
+ sendInternal(topic, body, properties, statusCallback);
} catch (JMSException ex){
- removeProducerFromCache(topic);
- sendInternal(topic, body, statusCallback);
+ removeProducerFromCache(topic);
+ sendInternal(topic, body, properties, statusCallback);
}
OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
statusCallback.success();
@@ -115,13 +121,36 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
}
}
- private synchronized void sendInternal(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
+ private synchronized void sendInternal(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
Topic destination = getOrCreateTopic(topic);
MessageProducer producer = getOrCreateProducer(destination);
Message msg = activeMqUtil.createMessage(session, topic.getSerializationClass(), body);
if (topic.isReplyRequired()) {
msg.setJMSReplyTo(temporaryTopic);
}
+ if(properties != null){
+ for(Entry<Object, Object> entry:properties.entrySet()){
+ if(entry.getValue() instanceof Integer){
+ msg.setIntProperty(entry.getKey().toString(), (Integer)entry.getValue());
+ } if(entry.getValue() instanceof Boolean){
+ msg.setBooleanProperty(entry.getKey().toString(), (Boolean)entry.getValue());
+ } if(entry.getValue() instanceof Byte){
+ msg.setByteProperty(entry.getKey().toString(), (Byte)entry.getValue());
+ } if(entry.getValue() instanceof Double){
+ msg.setDoubleProperty(entry.getKey().toString(), (Double)entry.getValue());
+ } if(entry.getValue() instanceof Float){
+ msg.setFloatProperty(entry.getKey().toString(), (Float)entry.getValue());
+ } if(entry.getValue() instanceof Long){
+ msg.setLongProperty(entry.getKey().toString(), (Long)entry.getValue());
+ } if(entry.getValue() instanceof String){
+ msg.setStringProperty(entry.getKey().toString(), (String)entry.getValue());
+ } if(entry.getValue() instanceof Short){
+ msg.setShortProperty(entry.getKey().toString(), (Short)entry.getValue());
+ } else {
+ msg.setObjectProperty(entry.getKey().toString(), entry.getValue());
+ }
+ }
+ }
producer.send(msg);
OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
statusCallback.success();
@@ -132,15 +161,30 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
Topic destination;
try {
if (isConnectedThrow()) {
- ActiveMqMessageListenerWrapper wrapperListener = regularListeners.get(messageId.getGuid());
- if (wrapperListener == null) {
- wrapperListener = new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session);
- regularListeners.put(messageId.getGuid(), wrapperListener);
- destination = getOrCreateTopic(messageId);
- MessageConsumer consumer = getOrCreateConsumer(destination);
- consumer.setMessageListener(wrapperListener);
- }
- wrapperListener.addListener(listener);
+ destination = getOrCreateTopic(messageId);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session, listener));
+ regularListeners.put(messageId.getGuid(), consumer, listener);
+ statusCallback.success();
+ } else {
+ statusCallback.fail(new OseeCoreException("This connection is not started."));
+ }
+ } catch (JMSException ex) {
+ statusCallback.fail(ex);
+ } catch (NullPointerException ex) {
+ statusCallback.fail(ex);
+ }
+ }
+
+ @Override
+ public void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) {
+ Topic destination;
+ try {
+ if (isConnectedThrow()) {
+ destination = getOrCreateTopic(messageId);
+ MessageConsumer consumer = session.createConsumer(destination, selector);
+ consumer.setMessageListener(new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session, listener));
+ regularListeners.put(messageId.getGuid(), consumer, listener);
statusCallback.success();
} else {
statusCallback.fail(new OseeCoreException("This connection is not started."));
@@ -176,15 +220,6 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
messageProducerCache.remove(destination);
}
- private MessageConsumer getOrCreateConsumer(Topic topic) throws JMSException {
- MessageConsumer consumer = messageConsumerCache.get(topic);
- if (consumer == null) {
- consumer = session.createConsumer(topic);
- messageConsumerCache.put(topic, consumer);
- }
- return consumer;
- }
-
@Override
public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) {
replyListeners.put(messageId.getGuid(), listener);
@@ -193,22 +228,25 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
@Override
public void unsubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
- ActiveMqMessageListenerWrapper wrapperListener = regularListeners.get(messageId.getGuid());
- if (wrapperListener != null) {
- wrapperListener.removeListener(listener);
- if (wrapperListener.isEmpty()) {
- try {
- Topic topic = getOrCreateTopic(messageId);
- MessageConsumer consumer = getOrCreateConsumer(topic);
- consumer.setMessageListener(null);
- consumer.close();
- messageConsumerCache.remove(topic);
- regularListeners.remove(messageId.getGuid());
- } catch (JMSException ex) {
- statusCallback.fail(ex);
+ Map<MessageConsumer, OseeMessagingListener> listeners = regularListeners.getKeyedValues(messageId.getGuid());
+ List<MessageConsumer> consumersToRemove = new ArrayList<MessageConsumer>();
+ if (listeners != null) {
+ try{
+ for(Entry<MessageConsumer, OseeMessagingListener> entry:listeners.entrySet()){
+ if(entry.getValue().equals(listener)){
+ entry.getKey().setMessageListener(null);
+ entry.getKey().close();
+ consumersToRemove.add(entry.getKey());
+ }
}
+ for(MessageConsumer messageConsumer: consumersToRemove){
+ messageConsumer.setMessageListener(null);
+ messageConsumer.close();
+ }
+ }catch (JMSException ex) {
+ statusCallback.fail(ex);
}
- }
+ }
statusCallback.success();
}
@@ -240,7 +278,6 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
public synchronized void stop() {
topicCache.clear();
messageProducerCache.clear();
- messageConsumerCache.clear();
regularListeners.clear();
try {
if (session != null) {
@@ -305,16 +342,14 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
@Override
public String getSubscribers() {
StringBuilder sb = new StringBuilder();
- for(Entry<Topic, MessageConsumer> entry:this.messageConsumerCache.entrySet()){
+ for(Pair<String, MessageConsumer> entry:this.regularListeners.keySet()){
try {
- sb.append(String.format("Topic [%s] \n", entry.getKey().getTopicName()));
- sb.append(String.format("\tConsumer Selector [%s]\n", entry.getValue().getMessageSelector()));
- MessageListener listener = entry.getValue().getMessageListener();
+ sb.append(String.format("Topic [%s] \n", entry.getFirst()));
+ sb.append(String.format("\tConsumer Selector [%s]\n", entry.getSecond().getMessageSelector()));
+ MessageListener listener = entry.getSecond().getMessageListener();
if(listener instanceof ActiveMqMessageListenerWrapper){
sb.append("\tConsumer Listeners:\n");
- for(OseeMessagingListener item:((ActiveMqMessageListenerWrapper)listener).getListeners()){
- sb.append(String.format("\t\t%s\n", item.toString()));
- }
+ sb.append(String.format("\t\t%s\n", ((ActiveMqMessageListenerWrapper)listener).getListener().toString()));
}
} catch (JMSException ex) {
OseeLog.log(Activator.class, Level.SEVERE, ex);
@@ -333,5 +368,5 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
sb.append(getSubscribers());
return sb.toString();
}
-
+
}

Back to the top