Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorafinkbein2010-03-30 14:20:39 -0400
committerafinkbein2010-03-30 14:20:39 -0400
commitf962931347c142cc6bdb9aaf65a6e0f19916629b (patch)
tree10b574bc8321a6de0ab611ce09fb9536e088006b /plugins/org.eclipse.osee.framework.messaging
parentf858e9593fb79e79a9f4c32660c33345a927cf4e (diff)
downloadorg.eclipse.osee-f962931347c142cc6bdb9aaf65a6e0f19916629b.tar.gz
org.eclipse.osee-f962931347c142cc6bdb9aaf65a6e0f19916629b.tar.xz
org.eclipse.osee-f962931347c142cc6bdb9aaf65a6e0f19916629b.zip
Diffstat (limited to 'plugins/org.eclipse.osee.framework.messaging')
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java30
1 files changed, 24 insertions, 6 deletions
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
index 28d7ef33b1..5b20951678 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
@@ -11,6 +11,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@@ -21,6 +22,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.core.exception.OseeWrappedException;
@@ -91,13 +93,12 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
public synchronized void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
try {
if (topic.isTopic()) {
- Topic destination = getOrCreateTopic(topic);
- MessageProducer producer = getOrCreateProducer(destination);
- Message msg = activeMqUtil.createMessage(session, topic.getSerializationClass(), body);
- if (topic.isReplyRequired()) {
- msg.setJMSReplyTo(temporaryTopic);
+ try{
+ sendInternal(topic, body, statusCallback);
+ } catch (JMSException ex){
+ removeProducerFromCache(topic);
+ sendInternal(topic, body, statusCallback);
}
- producer.send(msg);
OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
statusCallback.success();
}
@@ -109,6 +110,18 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
throw new OseeWrappedException(ex);
}
}
+
+ private synchronized void sendInternal(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
+ Topic destination = getOrCreateTopic(topic);
+ MessageProducer producer = getOrCreateProducer(destination);
+ Message msg = activeMqUtil.createMessage(session, topic.getSerializationClass(), body);
+ if (topic.isReplyRequired()) {
+ msg.setJMSReplyTo(temporaryTopic);
+ }
+ producer.send(msg);
+ OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
+ statusCallback.success();
+ }
@Override
public synchronized void subscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
@@ -153,6 +166,11 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
}
return producer;
}
+
+ private void removeProducerFromCache(MessageID topic) throws JMSException{
+ Topic destination = getOrCreateTopic(topic);
+ messageProducerCache.remove(destination);
+ }
private MessageConsumer getOrCreateConsumer(Topic topic) throws JMSException {
MessageConsumer consumer = messageConsumerCache.get(topic);

Back to the top