From 4028d9f9a1ae4a6a11673bf1777512603f05414e Mon Sep 17 00:00:00 2001 From: afinkbein Date: Mon, 5 Apr 2010 23:26:48 +0000 Subject: Changes to support using selectors for message filtering --- .../osee/framework/messaging/ConnectionNode.java | 22 ++-- .../messaging/internal/FailoverConnectionNode.java | 49 ++++++-- .../activemq/ActiveMqMessageListenerWrapper.java | 35 ++---- .../internal/activemq/ConnectionNodeActiveMq.java | 137 +++++++++++++-------- 4 files changed, 144 insertions(+), 99 deletions(-) (limited to 'plugins/org.eclipse.osee.framework.messaging') diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java index 4f39b84f78d..0b4a8006195 100644 --- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java +++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java @@ -5,6 +5,7 @@ */ package org.eclipse.osee.framework.messaging; +import java.util.Properties; import org.eclipse.osee.framework.core.exception.OseeCoreException; /** @@ -12,20 +13,23 @@ import org.eclipse.osee.framework.core.exception.OseeCoreException; */ public interface ConnectionNode { - public void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback); - public void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback); + void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback); + void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, final OseeMessagingStatusCallback statusCallback); + void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback); - public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener); - public boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener); + boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener); + boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener); - public void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException; - - public void addConnectionListener(ConnectionListener connectionListener); - public void removeConnectionListener(ConnectionListener connectionListener); + void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException; + void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException; + + void addConnectionListener(ConnectionListener connectionListener); + void removeConnectionListener(ConnectionListener connectionListener); - public void stop(); + void stop(); String getSummary(); String getSubscribers(); String getSenders(); + } diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java index 21821a73723..6d23a9719f4 100644 --- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java +++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java @@ -6,13 +6,12 @@ package org.eclipse.osee.framework.messaging.internal; import java.util.List; +import java.util.Properties; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; - import javax.jms.JMSException; - import org.eclipse.osee.framework.core.exception.OseeCoreException; import org.eclipse.osee.framework.logging.OseeLog; import org.eclipse.osee.framework.messaging.ConnectionListener; @@ -48,18 +47,24 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable { @Override public void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException { + send(topic, body, null, statusCallback); + } + + + @Override + public void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException { attemptSmartConnect(); if(lastConnectedState){ - try{ - connectionNode.send(topic, body, statusCallback); - } catch (OseeCoreException ex){ - stop(); - run(); - connectionNode.send(topic, body, statusCallback); - } + try{ + connectionNode.send(topic, body, properties, statusCallback); + } catch (OseeCoreException ex){ + stop(); + run(); + connectionNode.send(topic, body, properties, statusCallback); + } } } - + private void attemptSmartConnect() { if(!lastConnectedState){ run(); @@ -78,6 +83,13 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable { connectionNode.subscribe(messageId, listener, statusCallback); } + @Override + public void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) { + savedSubscribes.add(new SavedSubscribe(messageId, listener, statusCallback)); + attemptSmartConnect(); + connectionNode.subscribe(messageId, listener, selector, statusCallback); + } + @Override public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) { return connectionNode.subscribeToReply(messageId, listener); @@ -110,7 +122,11 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable { private void subscribeToMessages() { for (SavedSubscribe subscribe : savedSubscribes) { - connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.statusCallback); + if(subscribe.selector == null){ + connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.statusCallback); + } else { + connectionNode.subscribe(subscribe.messageId, subscribe.listener, subscribe.selector, subscribe.statusCallback); + } } } @@ -118,13 +134,21 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable { MessageID messageId; OseeMessagingListener listener; OseeMessagingStatusCallback statusCallback; + String selector; - public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) { + public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) { this.messageId = messageId; this.listener = listener; this.statusCallback = statusCallback; + this.selector = selector; } + public SavedSubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) { + this.messageId = messageId; + this.listener = listener; + this.statusCallback = statusCallback; + } + @Override public int hashCode() { final int prime = 31; @@ -225,5 +249,4 @@ public class FailoverConnectionNode implements ConnectionNode, Runnable { connectionNode.stop(); run(); } - } diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java index e338eef362d..b35e3387bc8 100644 --- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java +++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ActiveMqMessageListenerWrapper.java @@ -6,9 +6,7 @@ package org.eclipse.osee.framework.messaging.internal.activemq; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import javax.jms.Destination; import javax.jms.JMSException; @@ -27,30 +25,18 @@ import org.eclipse.osee.framework.messaging.internal.Activator; */ class ActiveMqMessageListenerWrapper implements MessageListener { - private List listeners; + private OseeMessagingListener listener; private MessageProducer producer; private Session session; private ActiveMqUtil activeMqUtil; - ActiveMqMessageListenerWrapper(ActiveMqUtil activeMqUtil, MessageProducer producer, Session session){ + ActiveMqMessageListenerWrapper(ActiveMqUtil activeMqUtil, MessageProducer producer, Session session, OseeMessagingListener listener){ this.producer = producer; this.session = session; - listeners = new CopyOnWriteArrayList(); + this.listener = listener; this.activeMqUtil = activeMqUtil; } - public void addListener(OseeMessagingListener listener){ - listeners.add(listener); - } - - public void removeListener(OseeMessagingListener listener){ - listeners.remove(listener); - } - - public boolean isEmpty(){ - return listeners.isEmpty(); - } - public void onMessage(javax.jms.Message jmsMessage){ try{ Destination destReply = jmsMessage.getJMSReplyTo(); @@ -69,17 +55,14 @@ class ActiveMqMessageListenerWrapper implements MessageListener { } } + OseeMessagingListener getListener(){ + return listener; + } + private void process(javax.jms.Message message, ReplyConnection replyConnection) throws JMSException, OseeCoreException{ Map headers = new HashMap(); - for(OseeMessagingListener listener:listeners){ - if(listener != null){ - listener.process(activeMqUtil.translateMessage(message, listener.getClazz()), headers, replyConnection); - } - } + listener.process(activeMqUtil.translateMessage(message, listener.getClazz()), headers, replyConnection); OseeLog.log(Activator.class, Level.FINE, String.format("recieved message %s - %s", message.getJMSDestination().toString(), message.toString())); } - - List getListeners(){ - return listeners; - } + } diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java index 1a2ca19949c..91290430926 100644 --- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java +++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java @@ -5,13 +5,15 @@ */ package org.eclipse.osee.framework.messaging.internal.activemq; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.logging.Level; - import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.ExceptionListener; @@ -23,10 +25,11 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TemporaryTopic; import javax.jms.Topic; - import org.apache.activemq.ActiveMQConnectionFactory; import org.eclipse.osee.framework.core.exception.OseeCoreException; import org.eclipse.osee.framework.core.exception.OseeWrappedException; +import org.eclipse.osee.framework.jdk.core.type.CompositeKeyHashMap; +import org.eclipse.osee.framework.jdk.core.type.Pair; import org.eclipse.osee.framework.logging.OseeLog; import org.eclipse.osee.framework.messaging.ConnectionListener; import org.eclipse.osee.framework.messaging.ConnectionNodeFailoverSupport; @@ -50,12 +53,11 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi private TemporaryTopic temporaryTopic; private MessageConsumer replyToConsumer; private Map replyListeners; - private Map regularListeners; + private CompositeKeyHashMap regularListeners; private boolean started = false; private ConcurrentHashMap topicCache; private ConcurrentHashMap messageProducerCache; - private ConcurrentHashMap messageConsumerCache; private final ExceptionListener exceptionListener; private MessageProducer replyProducer; @@ -66,9 +68,8 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi this.exceptionListener = exceptionListener; activeMqUtil = new ActiveMqUtil(); topicCache = new ConcurrentHashMap(); - messageConsumerCache = new ConcurrentHashMap(); messageProducerCache = new ConcurrentHashMap(); - regularListeners = new ConcurrentHashMap(); + regularListeners = new CompositeKeyHashMap(64, true); replyListeners = new ConcurrentHashMap(); } @@ -95,13 +96,18 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi @Override public synchronized void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException { + send(topic, body, null, statusCallback); + } + + @Override + public synchronized void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException { try { if (topic.isTopic()) { try{ - sendInternal(topic, body, statusCallback); + sendInternal(topic, body, properties, statusCallback); } catch (JMSException ex){ - removeProducerFromCache(topic); - sendInternal(topic, body, statusCallback); + removeProducerFromCache(topic); + sendInternal(topic, body, properties, statusCallback); } OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid())); statusCallback.success(); @@ -115,13 +121,36 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi } } - private synchronized void sendInternal(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException { + private synchronized void sendInternal(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException { Topic destination = getOrCreateTopic(topic); MessageProducer producer = getOrCreateProducer(destination); Message msg = activeMqUtil.createMessage(session, topic.getSerializationClass(), body); if (topic.isReplyRequired()) { msg.setJMSReplyTo(temporaryTopic); } + if(properties != null){ + for(Entry entry:properties.entrySet()){ + if(entry.getValue() instanceof Integer){ + msg.setIntProperty(entry.getKey().toString(), (Integer)entry.getValue()); + } if(entry.getValue() instanceof Boolean){ + msg.setBooleanProperty(entry.getKey().toString(), (Boolean)entry.getValue()); + } if(entry.getValue() instanceof Byte){ + msg.setByteProperty(entry.getKey().toString(), (Byte)entry.getValue()); + } if(entry.getValue() instanceof Double){ + msg.setDoubleProperty(entry.getKey().toString(), (Double)entry.getValue()); + } if(entry.getValue() instanceof Float){ + msg.setFloatProperty(entry.getKey().toString(), (Float)entry.getValue()); + } if(entry.getValue() instanceof Long){ + msg.setLongProperty(entry.getKey().toString(), (Long)entry.getValue()); + } if(entry.getValue() instanceof String){ + msg.setStringProperty(entry.getKey().toString(), (String)entry.getValue()); + } if(entry.getValue() instanceof Short){ + msg.setShortProperty(entry.getKey().toString(), (Short)entry.getValue()); + } else { + msg.setObjectProperty(entry.getKey().toString(), entry.getValue()); + } + } + } producer.send(msg); OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid())); statusCallback.success(); @@ -132,15 +161,30 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi Topic destination; try { if (isConnectedThrow()) { - ActiveMqMessageListenerWrapper wrapperListener = regularListeners.get(messageId.getGuid()); - if (wrapperListener == null) { - wrapperListener = new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session); - regularListeners.put(messageId.getGuid(), wrapperListener); - destination = getOrCreateTopic(messageId); - MessageConsumer consumer = getOrCreateConsumer(destination); - consumer.setMessageListener(wrapperListener); - } - wrapperListener.addListener(listener); + destination = getOrCreateTopic(messageId); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session, listener)); + regularListeners.put(messageId.getGuid(), consumer, listener); + statusCallback.success(); + } else { + statusCallback.fail(new OseeCoreException("This connection is not started.")); + } + } catch (JMSException ex) { + statusCallback.fail(ex); + } catch (NullPointerException ex) { + statusCallback.fail(ex); + } + } + + @Override + public void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, OseeMessagingStatusCallback statusCallback) { + Topic destination; + try { + if (isConnectedThrow()) { + destination = getOrCreateTopic(messageId); + MessageConsumer consumer = session.createConsumer(destination, selector); + consumer.setMessageListener(new ActiveMqMessageListenerWrapper(activeMqUtil, replyProducer, session, listener)); + regularListeners.put(messageId.getGuid(), consumer, listener); statusCallback.success(); } else { statusCallback.fail(new OseeCoreException("This connection is not started.")); @@ -176,15 +220,6 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi messageProducerCache.remove(destination); } - private MessageConsumer getOrCreateConsumer(Topic topic) throws JMSException { - MessageConsumer consumer = messageConsumerCache.get(topic); - if (consumer == null) { - consumer = session.createConsumer(topic); - messageConsumerCache.put(topic, consumer); - } - return consumer; - } - @Override public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) { replyListeners.put(messageId.getGuid(), listener); @@ -193,22 +228,25 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi @Override public void unsubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) { - ActiveMqMessageListenerWrapper wrapperListener = regularListeners.get(messageId.getGuid()); - if (wrapperListener != null) { - wrapperListener.removeListener(listener); - if (wrapperListener.isEmpty()) { - try { - Topic topic = getOrCreateTopic(messageId); - MessageConsumer consumer = getOrCreateConsumer(topic); - consumer.setMessageListener(null); - consumer.close(); - messageConsumerCache.remove(topic); - regularListeners.remove(messageId.getGuid()); - } catch (JMSException ex) { - statusCallback.fail(ex); + Map listeners = regularListeners.getKeyedValues(messageId.getGuid()); + List consumersToRemove = new ArrayList(); + if (listeners != null) { + try{ + for(Entry entry:listeners.entrySet()){ + if(entry.getValue().equals(listener)){ + entry.getKey().setMessageListener(null); + entry.getKey().close(); + consumersToRemove.add(entry.getKey()); + } } + for(MessageConsumer messageConsumer: consumersToRemove){ + messageConsumer.setMessageListener(null); + messageConsumer.close(); + } + }catch (JMSException ex) { + statusCallback.fail(ex); } - } + } statusCallback.success(); } @@ -240,7 +278,6 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi public synchronized void stop() { topicCache.clear(); messageProducerCache.clear(); - messageConsumerCache.clear(); regularListeners.clear(); try { if (session != null) { @@ -305,16 +342,14 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi @Override public String getSubscribers() { StringBuilder sb = new StringBuilder(); - for(Entry entry:this.messageConsumerCache.entrySet()){ + for(Pair entry:this.regularListeners.keySet()){ try { - sb.append(String.format("Topic [%s] \n", entry.getKey().getTopicName())); - sb.append(String.format("\tConsumer Selector [%s]\n", entry.getValue().getMessageSelector())); - MessageListener listener = entry.getValue().getMessageListener(); + sb.append(String.format("Topic [%s] \n", entry.getFirst())); + sb.append(String.format("\tConsumer Selector [%s]\n", entry.getSecond().getMessageSelector())); + MessageListener listener = entry.getSecond().getMessageListener(); if(listener instanceof ActiveMqMessageListenerWrapper){ sb.append("\tConsumer Listeners:\n"); - for(OseeMessagingListener item:((ActiveMqMessageListenerWrapper)listener).getListeners()){ - sb.append(String.format("\t\t%s\n", item.toString())); - } + sb.append(String.format("\t\t%s\n", ((ActiveMqMessageListenerWrapper)listener).getListener().toString())); } } catch (JMSException ex) { OseeLog.log(Activator.class, Level.SEVERE, ex); @@ -333,5 +368,5 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi sb.append(getSubscribers()); return sb.toString(); } - + } -- cgit v1.2.3