Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormmasterso2010-07-13 16:16:55 -0400
committermmasterso2010-07-13 16:16:55 -0400
commitfbd963ea70e860af0f6ec59cb14ac682bec281c0 (patch)
treeb8b805eb6fe4d3dc44957f4a52a684726b2f11dd /plugins/org.eclipse.osee.framework.messaging
parent7cc19a59ca3dac05373bbfda5218adb45d7ed981 (diff)
downloadorg.eclipse.osee-fbd963ea70e860af0f6ec59cb14ac682bec281c0.tar.gz
org.eclipse.osee-fbd963ea70e860af0f6ec59cb14ac682bec281c0.tar.xz
org.eclipse.osee-fbd963ea70e860af0f6ec59cb14ac682bec281c0.zip
Added ConnectionNode functions for using default status callback handlers.
Diffstat (limited to 'plugins/org.eclipse.osee.framework.messaging')
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java66
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java33
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java48
-rw-r--r--plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/services/internal/OseeMessagingStatusImpl.java8
4 files changed, 132 insertions, 23 deletions
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
index 6783b5fb0f..253f8c0080 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/ConnectionNode.java
@@ -6,6 +6,7 @@
package org.eclipse.osee.framework.messaging;
import java.util.Properties;
+
import org.eclipse.osee.framework.core.exception.OseeCoreException;
/**
@@ -13,15 +14,76 @@ import org.eclipse.osee.framework.core.exception.OseeCoreException;
*/
public interface ConnectionNode {
+ /**
+ * Subscribes listener to updates to message/s with the given id.
+ * @param messageId
+ * @param listener
+ * @param statusCallback Used to relay the status of the subscribe command ( success, failure, etc). Can not be null.
+ */
void subscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
+
+ /**
+ * Subscribes listener to updates to message/s with the given id.
+ * @param messageId
+ * @param listener
+ * @param selector A string conforming to the SQL 92 syntax used for extra filtering of subscriptions
+ * @param statusCallback Used to relay the status of the subscribe command ( success, failure, etc). Can not be null.
+ */
void subscribe(MessageID messageId, OseeMessagingListener listener, String selector, final OseeMessagingStatusCallback statusCallback);
+
+ /**
+ * Subscribes listener to updates to message/s with the given id. A default status
+ * handler will be used to log failures.
+ * @param messageId
+ * @param listener
+ */
+ void subscribe(MessageID messageId, OseeMessagingListener listener);
+
+ /**
+ * Unsubscribes listener from updates for message with id equal to messageId.
+ * @param messageId
+ * @param listener
+ * @param statusCallback Used to relay the status of the unsubscribe ( success, failure, etc). Can not be null.
+ */
void unsubscribe(MessageID messageId, OseeMessagingListener listener, final OseeMessagingStatusCallback statusCallback);
+
+ /**
+ * Unsubscribes listener from updates for message with id equal to messageId. A default status
+ * handler will be used to log failures.
+ * @param messageId
+ * @param listener
+ */
+ void unsubscribe(MessageID messageId, OseeMessagingListener listener);
boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener);
boolean unsubscribteToReply(MessageID messageId, OseeMessagingListener listener);
- void send(MessageID topic, Object body, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
- void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+ /**
+ * Sends given message.
+ * @param messageId
+ * @param message
+ * @param statusCallback Used to relay the status of the sending of the message ( success, failure, etc). Can not be null.
+ * @throws OseeCoreException
+ */
+ void send(MessageID messageId, Object message, final OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+
+ /**
+ * Attaches the properties provided to the message before sending.
+ * @param messageId
+ * @param message
+ * @param properties Set of properties to be attached to the message.
+ * @param statusCallback Used to relay the status of the sending of the message ( success, failure, etc). Can not be null.
+ * @throws OseeCoreException
+ */
+ void send(MessageID messageId, Object message, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException;
+
+ /**
+ * Sends given message. Uses default status handler to log failures in sending given message.
+ * @param messageId
+ * @param message
+ * @throws OseeCoreException
+ */
+ void send(MessageID messageId, Object message) throws OseeCoreException;
void addConnectionListener(ConnectionListener connectionListener);
void removeConnectionListener(ConnectionListener connectionListener);
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
index 4b0206458c..3ab0672f12 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/FailoverConnectionNode.java
@@ -24,6 +24,7 @@ import org.eclipse.osee.framework.messaging.MessageID;
import org.eclipse.osee.framework.messaging.OseeMessagingListener;
import org.eclipse.osee.framework.messaging.OseeMessagingStatusCallback;
import org.eclipse.osee.framework.messaging.internal.activemq.OseeExceptionListener;
+import org.eclipse.osee.framework.messaging.services.internal.OseeMessagingStatusImpl;
/**
* @author Andrew M. Finkbeiner This is written using ActiveMQ as the use case. So it will only retry connection and it will keep
@@ -50,25 +51,32 @@ private ScheduledFuture<?> itemToCancel;
}
@Override
- public void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
- send(topic, body, null, statusCallback);
+ public void send(MessageID messageId, Object message, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ send(messageId, message, null, statusCallback);
}
@Override
- public void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ public void send(MessageID messageId, Object message, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
attemptSmartConnect();
if(lastConnectedState){
try{
- connectionNode.send(topic, body, properties, statusCallback);
+ connectionNode.send(messageId, message, properties, statusCallback);
} catch (OseeCoreException ex){
stop();
run();
- connectionNode.send(topic, body, properties, statusCallback);
+ connectionNode.send(messageId, message, properties, statusCallback);
}
}
}
+ @Override
+ public void send(MessageID messageId, Object message) throws OseeCoreException {
+ String errorMessage = String.format("Error sending message(%s)", messageId.getId());
+ OseeMessagingStatusImpl defaultErrorHandler = new OseeMessagingStatusImpl(errorMessage, getClass());
+ this.send(messageId, message, defaultErrorHandler);
+ }
+
private void attemptSmartConnect() {
if(!lastConnectedState){
run();
@@ -96,10 +104,25 @@ private ScheduledFuture<?> itemToCancel;
}
@Override
+ public void subscribe(MessageID messageId, OseeMessagingListener listener) {
+ String errorMessage = String.format("Error subscribing message(%s)", messageId.getId());
+ OseeMessagingStatusImpl defaultErrorHandler = new OseeMessagingStatusImpl(errorMessage, getClass());
+ this.subscribe(messageId, listener, defaultErrorHandler);
+ }
+
+ @Override
public boolean subscribeToReply(MessageID messageId, OseeMessagingListener listener) {
return connectionNode.subscribeToReply(messageId, listener);
}
+
+ @Override
+ public void unsubscribe(MessageID messageId, OseeMessagingListener listener) {
+ String errorMessage = String.format("Error unsubscribing message(%s)", messageId.getId());
+ OseeMessagingStatusImpl defaultErrorHandler = new OseeMessagingStatusImpl(errorMessage, getClass());
+ this.unsubscribe(messageId, listener, defaultErrorHandler);
+ }
+
@Override
public void unsubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
savedSubscribes.remove(new SavedSubscribe(messageId, listener, statusCallback));
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
index 7291839ed7..65f8db3cf0 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/internal/activemq/ConnectionNodeActiveMq.java
@@ -9,11 +9,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
@@ -25,6 +26,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.core.exception.OseeWrappedException;
@@ -38,6 +40,7 @@ import org.eclipse.osee.framework.messaging.NodeInfo;
import org.eclipse.osee.framework.messaging.OseeMessagingListener;
import org.eclipse.osee.framework.messaging.OseeMessagingStatusCallback;
import org.eclipse.osee.framework.messaging.internal.Activator;
+import org.eclipse.osee.framework.messaging.services.internal.OseeMessagingStatusImpl;
/**
* @author Andrew M. Finkbeiner
@@ -95,19 +98,26 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
}
@Override
- public synchronized void send(MessageID topic, Object body, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
- send(topic, body, null, statusCallback);
+ public void send(MessageID topic, Object body) throws OseeCoreException {
+ String errorMessage = String.format("Error sending message(%s)", topic.getId());
+ OseeMessagingStatusImpl defaultErrorHandler = new OseeMessagingStatusImpl(errorMessage, getClass());
+ this.send(topic, body, defaultErrorHandler);
+ }
+
+ @Override
+ public synchronized void send(MessageID messageId, Object message, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ send(messageId, message, null, statusCallback);
}
@Override
- public synchronized void send(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
+ public synchronized void send(MessageID messageId, Object message, Properties properties, OseeMessagingStatusCallback statusCallback) throws OseeCoreException {
try {
- if (topic.isTopic()) {
+ if (messageId.isTopic()) {
try{
- sendInternal(topic, body, properties, statusCallback);
+ sendInternal(messageId, message, properties, statusCallback);
} catch (JMSException ex){
- removeProducerFromCache(topic);
- sendInternal(topic, body, properties, statusCallback);
+ removeProducerFromCache(messageId);
+ sendInternal(messageId, message, properties, statusCallback);
}
// OseeLog.log(Activator.class, Level.FINE, String.format("Sending message %s - %s", topic.getName(), topic.getGuid()));
statusCallback.success();
@@ -121,11 +131,11 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
}
}
- private synchronized void sendInternal(MessageID topic, Object body, Properties properties, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
- Topic destination = getOrCreateTopic(topic);
+ private synchronized void sendInternal(MessageID messageId, Object message, Properties properties, OseeMessagingStatusCallback statusCallback) throws JMSException, OseeCoreException {
+ Topic destination = getOrCreateTopic(messageId);
MessageProducer producer = getOrCreateProducer(destination);
- Message msg = activeMqUtil.createMessage(session, topic.getSerializationClass(), body);
- if (topic.isReplyRequired()) {
+ Message msg = activeMqUtil.createMessage(session, messageId.getSerializationClass(), message);
+ if (messageId.isReplyRequired()) {
msg.setJMSReplyTo(temporaryTopic);
}
if(properties != null){
@@ -195,6 +205,13 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
statusCallback.fail(ex);
}
}
+
+ @Override
+ public void subscribe(MessageID messageId, OseeMessagingListener listener) {
+ String errorMessage = String.format("Error subscribing message(%s)", messageId.getId());
+ OseeMessagingStatusImpl defaultErrorHandler = new OseeMessagingStatusImpl(errorMessage, getClass());
+ this.subscribe(messageId, listener, defaultErrorHandler);
+ }
private Topic getOrCreateTopic(MessageID messageId) throws JMSException {
Topic topic = topicCache.get(messageId.getId());
@@ -225,6 +242,13 @@ class ConnectionNodeActiveMq implements ConnectionNodeFailoverSupport, MessageLi
replyListeners.put(messageId.getId(), listener);
return true;
}
+
+ @Override
+ public void unsubscribe(MessageID messageId, OseeMessagingListener listener) {
+ String errorMessage = String.format("Error unsubscribing message(%s)", messageId.getId());
+ OseeMessagingStatusImpl defaultErrorHandler = new OseeMessagingStatusImpl(errorMessage, getClass());
+ this.unsubscribe(messageId, listener, defaultErrorHandler);
+ }
@Override
public void unsubscribe(MessageID messageId, OseeMessagingListener listener, OseeMessagingStatusCallback statusCallback) {
diff --git a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/services/internal/OseeMessagingStatusImpl.java b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/services/internal/OseeMessagingStatusImpl.java
index f1f286100b..e8fd22774a 100644
--- a/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/services/internal/OseeMessagingStatusImpl.java
+++ b/plugins/org.eclipse.osee.framework.messaging/src/org/eclipse/osee/framework/messaging/services/internal/OseeMessagingStatusImpl.java
@@ -16,18 +16,18 @@ import org.eclipse.osee.framework.messaging.OseeMessagingStatusCallback;
*/
public class OseeMessagingStatusImpl implements OseeMessagingStatusCallback {
- private String message;
+ private String failureMessage;
private Class<?> clazz;
- public OseeMessagingStatusImpl(String message, Class<?> clazz){
- this.message = message;
+ public OseeMessagingStatusImpl(String failureMessage, Class<?> clazz){
+ this.failureMessage = failureMessage;
this.clazz = clazz;
}
@Override
public void fail(Throwable th) {
th.printStackTrace();
- OseeLog.log(clazz, Level.SEVERE, message, th);
+ OseeLog.log(clazz, Level.SEVERE, failureMessage, th);
}
@Override

Back to the top