Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorafinkbein2010-04-05 19:26:48 -0400
committerafinkbein2010-04-05 19:26:48 -0400
commit4028d9f9a1ae4a6a11673bf1777512603f05414e (patch)
tree0b53c15d8283240eea75cca2f8ca5d7797100d25 /plugins/org.eclipse.osee.framework.messaging
parent10cd3e36a7a4728eb19da046a8696476680fdd41 (diff)
downloadorg.eclipse.osee-4028d9f9a1ae4a6a11673bf1777512603f05414e.tar.gz
org.eclipse.osee-4028d9f9a1ae4a6a11673bf1777512603f05414e.tar.xz
org.eclipse.osee-4028d9f9a1ae4a6a11673bf1777512603f05414e.zip
Changes to support using selectors for message filtering
Diffstat (limited to 'plugins/org.eclipse.osee.framework.messaging')
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java22
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java49
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java35
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java137
4 files changed, 144 insertions, 99 deletions
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
index 4f39b84f78..0b4a800619 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
@@ -5,6 +5,7 @@
*/
package org.eclipse.osee.framework.messaging;
+import java.util.Properties;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
/**
@@ -12,20 +13,23 @@ import org.eclipse.osee.framework.core.exception.OseeCoreException;
*/
public interface ConnectionNode {
- public void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
- public void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
+ void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
+ void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, final OseeMessagingStatusCallback statusCallback);
+ void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
- public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener);
- public boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener);
+ boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener);
+ boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener);
- public void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
-
- public void addConnectionListener(ConnectionListener connectionListener);
- public void removeConnectionListener(ConnectionListener connectionListener);
+ void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+ void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+
+ void addConnectionListener(ConnectionListener connectionListener);
+ void removeConnectionListener(ConnectionListener connectionListener);
- public void stop();
+ void stop();
String getSummary();
String getSubscribers();
String getSenders();
+
}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
index 21821a7372..6d23a9719f 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
@@ -6,13 +6,12 @@
package org.eclipse.osee.framework.messaging.internal;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
-
import javax.jms.JMSException;
-
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.messaging.ConnectionListener;
@@ -48,18 +47,24 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
@Override
public void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ send(topic, body, null, statusCallback);
+ }
+
+
+ @Override
+ public void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
attemptSmartConnect();
if(lastConnectedState){
- try{
- connectionNode.send(topic, body, statusCallback);
- } catch (OseeCoreException ex){
- stop();
- run();
- connectionNode.send(topic, body, statusCallback);
- }
+ try{
+ connectionNode.send(topic, body, properties, statusCallback);
+ } catch (OseeCoreException ex){
+ stop();
+ run();
+ connectionNode.send(topic, body, properties, statusCallback);
+ }
}
}
-
+
private void attemptSmartConnect() {
if(!lastConnectedState){
run();
@@ -79,6 +84,13 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
}
@Override
+ public void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) {
+ savedSubscribes.add(new SavedSubscribe(messageId, listener, statusCallback));
+ attemptSmartConnect();
+ connectionNode.subscribe(messageId, listener, selector, statusCallback);
+ }
+
+ @Override
public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) {
return connectionNode.subscribeToReply(messageId, listener);
}
@@ -110,7 +122,11 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
private void subscribeToMessages() {
for (SavedSubscribe subscribe : savedSubscribes) {
- connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.statusCallback);
+ if(subscribe.selector == null){
+ connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.statusCallback);
+ } else {
+ connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.selector, subscribe.statusCallback);
+ }
}
}
@@ -118,13 +134,21 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
MessageID messageId;
OseeMessagingListener listener;
OseeMessagingStatusCallback statusCallback;
+ String selector;
- public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
+ public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) {
this.messageId = messageId;
this.listener = listener;
this.statusCallback = statusCallback;
+ this.selector = selector;
}
+ public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
+ this.messageId = messageId;
+ this.listener = listener;
+ this.statusCallback = statusCallback;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -225,5 +249,4 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable {
connectionNode.stop();
run();
}
-
}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java
index e338eef362..b35e3387bc 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java
@@ -6,9 +6,7 @@
package org.eclipse.osee.framework.messaging.internal.activemq;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -27,30 +25,18 @@ import org.eclipse.osee.framework.messaging.internal.Activator;
*/
class ActiveMqMessageListenerWrapper implements MessageListener {
- private List<OseeMessagingListener> listeners;
+ private OseeMessagingListener listener;
private MessageProducer producer;
private Session session;
private ActiveMqUtil activeMqUtil;
- ActiveMqMessageListenerWrapper(ActiveMqUtil activeMqUtil, MessageProducer producer, Session session){
+ ActiveMqMessageListenerWrapper(ActiveMqUtil activeMqUtil, MessageProducer producer, Session session, OseeMessagingListener listener){
this.producer = producer;
this.session = session;
- listeners = new CopyOnWriteArrayList<OseeMessagingListener>();
+ this.listener = listener;
this.activeMqUtil = activeMqUtil;
}
- public void addListener(OseeMessagingListener listener){
- listeners.add(listener);
- }
-
- public void removeListener(OseeMessagingListener listener){
- listeners.remove(listener);
- }
-
- public boolean isEmpty(){
- return listeners.isEmpty();
- }
-
public void onMessage(javax.jms.Message jmsMessage){
try{
Destination destReply = jmsMessage.getJMSReplyTo();
@@ -69,17 +55,14 @@ class ActiveMqMessageListenerWrapper implements MessageListener {
}
}
+ OseeMessagingListener getListener(){
+ return listener;
+ }
+
private void process(javax.jms.Message message, ReplyConnection replyConnection) throws JMSException, OseeCoreException{
Map<String, Object> headers = new HashMap<String, Object>();
- for(OseeMessagingListener listener:listeners){
- if(listener != null){
- listener.process(activeMqUtil.translateMessage(message, listener.getClazz()), headers, replyConnection);
- }
- }
+ listener.process(activeMqUtil.translateMessage(message, listener.getClazz()), headers, replyConnection);
OseeLog.log(Activator.class, Level.FINE, String.format("recieved message %s - %s", message.getJMSDestination().toString(), message.toString()));
}
-
- List<OseeMessagingListener> getListeners(){
- return listeners;
- }
+
}
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
index 1a2ca19949..9129043092 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
@@ -5,13 +5,15 @@
*/
package org.eclipse.osee.framework.messaging.internal.activemq;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
-
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
@@ -23,10 +25,11 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.core.exception.OseeWrappedException;
+import org.eclipse.osee.framework.jdk.core.type.CompositeKeyHashMap;
+import org.eclipse.osee.framework.jdk.core.type.Pair;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.messaging.ConnectionListener;
import org.eclipse.osee.framework.messaging.ConnectionNodeFailoverSupport;
@@ -50,12 +53,11 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
private TemporaryTopic temporaryTopic;
private MessageConsumer replyToConsumer;
private Map<String, OseeMessagingListener> replyListeners;
- private Map<String, ActiveMqMessageListenerWrapper> regularListeners;
+ private CompositeKeyHashMap<String, MessageConsumer, OseeMessagingListener> regularListeners;
private boolean started = false;
private ConcurrentHashMap<String, Topic> topicCache;
private ConcurrentHashMap<Topic, MessageProducer> messageProducerCache;
- private ConcurrentHashMap<Topic, MessageConsumer> messageConsumerCache;
private final ExceptionListener exceptionListener;
private MessageProducer replyProducer;
@@ -66,9 +68,8 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
this.exceptionListener = exceptionListener;
activeMqUtil = new ActiveMqUtil();
topicCache = new ConcurrentHashMap<String, Topic>();
- messageConsumerCache = new ConcurrentHashMap<Topic, MessageConsumer>();
messageProducerCache = new ConcurrentHashMap<Topic, MessageProducer>();
- regularListeners = new ConcurrentHashMap<String, ActiveMqMessageListenerWrapper>();
+ regularListeners = new CompositeKeyHashMap<String, MessageConsumer, OseeMessagingListener>(64, true);
replyListeners = new ConcurrentHashMap<String, OseeMessagingListener>();
}
@@ -95,13 +96,18 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
@Override
public synchronized void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ send(topic, body, null, statusCallback);
+ }
+
+ @Override
+ public synchronized void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
try {
if (topic.isTopic()) {
try{
- sendInternal(topic, body, statusCallback);
+ sendInternal(topic, body, properties, statusCallback);
} catch (JMSException ex){
- removeProducerFromCache(topic);
- sendInternal(topic, body, statusCallback);
+ removeProducerFromCache(topic);
+ sendInternal(topic, body, properties, statusCallback);
}
OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
statusCallback.success();
@@ -115,13 +121,36 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
}
}
- private synchronized void sendInternal(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
+ private synchronized void sendInternal(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
Topic destination = getOrCreateTopic(topic);
MessageProducer producer = getOrCreateProducer(destination);
Message msg = activeMqUtil.createMessage(session, topic.getSerializationClass(), body);
if (topic.isReplyRequired()) {
msg.setJMSReplyTo(temporaryTopic);
}
+ if(properties != null){
+ for(Entry<Object, Object> entry:properties.entrySet()){
+ if(entry.getValue() instanceof Integer){
+ msg.setIntProperty(entry.getKey().toString(), (Integer)entry.getValue());
+ } if(entry.getValue() instanceof Boolean){
+ msg.setBooleanProperty(entry.getKey().toString(), (Boolean)entry.getValue());
+ } if(entry.getValue() instanceof Byte){
+ msg.setByteProperty(entry.getKey().toString(), (Byte)entry.getValue());
+ } if(entry.getValue() instanceof Double){
+ msg.setDoubleProperty(entry.getKey().toString(), (Double)entry.getValue());
+ } if(entry.getValue() instanceof Float){
+ msg.setFloatProperty(entry.getKey().toString(), (Float)entry.getValue());
+ } if(entry.getValue() instanceof Long){
+ msg.setLongProperty(entry.getKey().toString(), (Long)entry.getValue());
+ } if(entry.getValue() instanceof String){
+ msg.setStringProperty(entry.getKey().toString(), (String)entry.getValue());
+ } if(entry.getValue() instanceof Short){
+ msg.setShortProperty(entry.getKey().toString(), (Short)entry.getValue());
+ } else {
+ msg.setObjectProperty(entry.getKey().toString(), entry.getValue());
+ }
+ }
+ }
producer.send(msg);
OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
statusCallback.success();
@@ -132,15 +161,30 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
Topic destination;
try {
if (isConnectedThrow()) {
- ActiveMqMessageListenerWrapper wrapperListener = regularListeners.get(messageId.getGuid());
- if (wrapperListener == null) {
- wrapperListener = new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session);
- regularListeners.put(messageId.getGuid(), wrapperListener);
- destination = getOrCreateTopic(messageId);
- MessageConsumer consumer = getOrCreateConsumer(destination);
- consumer.setMessageListener(wrapperListener);
- }
- wrapperListener.addListener(listener);
+ destination = getOrCreateTopic(messageId);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session, listener));
+ regularListeners.put(messageId.getGuid(), consumer, listener);
+ statusCallback.success();
+ } else {
+ statusCallback.fail(new OseeCoreException("This connection is not started."));
+ }
+ } catch (JMSException ex) {
+ statusCallback.fail(ex);
+ } catch (NullPointerException ex) {
+ statusCallback.fail(ex);
+ }
+ }
+
+ @Override
+ public void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) {
+ Topic destination;
+ try {
+ if (isConnectedThrow()) {
+ destination = getOrCreateTopic(messageId);
+ MessageConsumer consumer = session.createConsumer(destination, selector);
+ consumer.setMessageListener(new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session, listener));
+ regularListeners.put(messageId.getGuid(), consumer, listener);
statusCallback.success();
} else {
statusCallback.fail(new OseeCoreException("This connection is not started."));
@@ -176,15 +220,6 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
messageProducerCache.remove(destination);
}
- private MessageConsumer getOrCreateConsumer(Topic topic) throws JMSException {
- MessageConsumer consumer = messageConsumerCache.get(topic);
- if (consumer == null) {
- consumer = session.createConsumer(topic);
- messageConsumerCache.put(topic, consumer);
- }
- return consumer;
- }
-
@Override
public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) {
replyListeners.put(messageId.getGuid(), listener);
@@ -193,22 +228,25 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
@Override
public void unsubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
- ActiveMqMessageListenerWrapper wrapperListener = regularListeners.get(messageId.getGuid());
- if (wrapperListener != null) {
- wrapperListener.removeListener(listener);
- if (wrapperListener.isEmpty()) {
- try {
- Topic topic = getOrCreateTopic(messageId);
- MessageConsumer consumer = getOrCreateConsumer(topic);
- consumer.setMessageListener(null);
- consumer.close();
- messageConsumerCache.remove(topic);
- regularListeners.remove(messageId.getGuid());
- } catch (JMSException ex) {
- statusCallback.fail(ex);
+ Map<MessageConsumer, OseeMessagingListener> listeners = regularListeners.getKeyedValues(messageId.getGuid());
+ List<MessageConsumer> consumersToRemove = new ArrayList<MessageConsumer>();
+ if (listeners != null) {
+ try{
+ for(Entry<MessageConsumer, OseeMessagingListener> entry:listeners.entrySet()){
+ if(entry.getValue().equals(listener)){
+ entry.getKey().setMessageListener(null);
+ entry.getKey().close();
+ consumersToRemove.add(entry.getKey());
+ }
}
+ for(MessageConsumer messageConsumer: consumersToRemove){
+ messageConsumer.setMessageListener(null);
+ messageConsumer.close();
+ }
+ }catch (JMSException ex) {
+ statusCallback.fail(ex);
}
- }
+ }
statusCallback.success();
}
@@ -240,7 +278,6 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
public synchronized void stop() {
topicCache.clear();
messageProducerCache.clear();
- messageConsumerCache.clear();
regularListeners.clear();
try {
if (session != null) {
@@ -305,16 +342,14 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
@Override
public String getSubscribers() {
StringBuilder sb = new StringBuilder();
- for(Entry<Topic, MessageConsumer> entry:this.messageConsumerCache.entrySet()){
+ for(Pair<String, MessageConsumer> entry:this.regularListeners.keySet()){
try {
- sb.append(String.format("Topic [%s] \n", entry.getKey().getTopicName()));
- sb.append(String.format("\tConsumer Selector [%s]\n", entry.getValue().getMessageSelector()));
- MessageListener listener = entry.getValue().getMessageListener();
+ sb.append(String.format("Topic [%s] \n", entry.getFirst()));
+ sb.append(String.format("\tConsumer Selector [%s]\n", entry.getSecond().getMessageSelector()));
+ MessageListener listener = entry.getSecond().getMessageListener();
if(listener instanceof ActiveMqMessageListenerWrapper){
sb.append("\tConsumer Listeners:\n");
- for(OseeMessagingListener item:((ActiveMqMessageListenerWrapper)listener).getListeners()){
- sb.append(String.format("\t\t%s\n", item.toString()));
- }
+ sb.append(String.format("\t\t%s\n", ((ActiveMqMessageListenerWrapper)listener).getListener().toString()));
}
} catch (JMSException ex) {
OseeLog.log(Activator.class, Level.SEVERE, ex);
@@ -333,5 +368,5 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
sb.append(getSubscribers());
return sb.toString();
}
-
+
}

Back to the top