Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse')
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java151
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java255
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java155
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java75
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java48
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java60
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java99
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java32
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java19
9 files changed, 894 insertions, 0 deletions
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java
new file mode 100644
index 00000000..0a74e784
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java
@@ -0,0 +1,151 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.core.service.CseService;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+public class Activator implements BundleActivator {
+
+ private static BundleContext context;
+
+ static BundleContext getContext() {
+ return context;
+ }
+
+ private static final Log LOGGER = LogFactory.getLog(Activator.class);
+
+ /** {@link DataMapperServiceTracker} reference */
+ private static ServiceTracker<DataMapperService, DataMapperService> dataMapperServiceTracker;
+ /** {@link CseService} reference */
+ private static ServiceTracker<CseService, CseService> cseServiceTracker;
+
+ /** MQTT Request Handler that connects to the MQTT Broker */
+ private static MqttRequestHandler mqttRequestHandler;
+
+ public void start(BundleContext bundleContext) throws Exception {
+ Activator.context = bundleContext;
+
+ // Listening on Cse Service
+ cseServiceTracker = new ServiceTracker<CseService, CseService>(
+ bundleContext, CseService.class,
+ new CseServiceTrackerCustomizer());
+ cseServiceTracker.open();
+
+ // Listening on DataMapper Service
+ dataMapperServiceTracker = new ServiceTracker<DataMapperService, DataMapperService>(
+ bundleContext, DataMapperService.class,
+ new DataMapperServiceTracker());
+ dataMapperServiceTracker.open();
+
+ // Registering RestClientService of MQTT
+ getContext().registerService(RestClientService.class, new MqttRestClient(), null);
+ }
+
+ public void stop(BundleContext bundleContext) throws Exception {
+ Activator.context = null;
+ if (cseServiceTracker != null) {
+ cseServiceTracker.close();
+ cseServiceTracker = null;
+ }
+ if (dataMapperServiceTracker != null) {
+ dataMapperServiceTracker.close();
+ dataMapperServiceTracker = null;
+ }
+ if (mqttRequestHandler != null){
+ mqttRequestHandler.close();
+ mqttRequestHandler = null;
+ }
+ }
+
+ private static class CseServiceTrackerCustomizer implements
+ ServiceTrackerCustomizer<CseService, CseService> {
+
+ @Override
+ public CseService addingService(ServiceReference<CseService> reference) {
+ if (reference == null) {
+ return null;
+ }
+ Object service = Activator.getContext().getService(reference);
+ if (service != null && service instanceof CseService) {
+ LOGGER.debug("New CseService discovered");
+ CseService cse = (CseService) service;
+ MqttRequestHandler.setCseService(cse);
+ if (mqttRequestHandler == null) {
+ new Thread() {
+ public void run() {
+ LOGGER.info("Creating MQTT Request Handler");
+ mqttRequestHandler = new MqttRequestHandler();
+ };
+ }.start();
+ }
+ return cse;
+ }
+ return null;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<CseService> reference,
+ CseService service) {
+ if (service != null) {
+ LOGGER.info("CseService modified");
+ MqttRequestHandler.setCseService(service);
+ }
+ }
+
+ @Override
+ public void removedService(ServiceReference<CseService> reference,
+ CseService service) {
+ MqttRequestHandler.setCseService(null);
+ }
+
+ }
+
+ private static class DataMapperServiceTracker implements
+ ServiceTrackerCustomizer<DataMapperService, DataMapperService> {
+
+ @Override
+ public DataMapperService addingService(
+ ServiceReference<DataMapperService> reference) {
+ if (reference == null) {
+ return null;
+ }
+ Object service = Activator.getContext().getService(reference);
+ if (service != null && service instanceof DataMapperService) {
+ DataMapperService dms = (DataMapperService) service;
+ LOGGER.debug("New DataMapper Service discovered: "
+ + dms.getServiceDataType());
+ DataMapperRegistry.register(dms);
+ return dms;
+ }
+ return null;
+ }
+
+ @Override
+ public void modifiedService(
+ ServiceReference<DataMapperService> reference,
+ DataMapperService service) {
+ if (service != null) {
+ DataMapperRegistry.register(service);
+ }
+ }
+
+ @Override
+ public void removedService(
+ ServiceReference<DataMapperService> reference,
+ DataMapperService service) {
+ if (service != null) {
+ DataMapperRegistry.remove(service);
+ }
+ }
+
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java
new file mode 100644
index 00000000..1105a89c
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java
@@ -0,0 +1,255 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.math.BigInteger;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.core.service.CseService;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+/**
+ * MQTT Request Handler class that subscribe to oneM2M request topic.
+ * When a request is received in the request topic, it is de-serialized and send
+ * to the CseService implementation available. Then the response from the service
+ * is serialized and sent to the oneM2M response topic.
+ */
+public class MqttRequestHandler implements MqttCallback {
+
+ // Static attributes of the class
+
+ /** MQTT Client ID */
+ private static final String CLIENT_ID = Constants.CSE_ID;
+ /** Logger reference */
+ private static final Log LOGGER = LogFactory.getLog(MqttRequestHandler.class);
+ /** MQTT Request Topic */
+ private static final String REQUEST_TOPIC = "/oneM2M/req/+/" + Constants.CSE_ID + "/+";
+
+
+ /** Reference to the current cseService implementation*/
+ private static CseService cseService;
+
+ /**
+ * Set the current CseService used when a request is
+ * received on the oneM2M request topic.
+ * @param cse the CseService implementation to use
+ */
+ public static void setCseService(CseService cse) {
+ cseService = cse;
+ }
+
+ // Private attributes
+
+ /** MQTT Client from the Paho library */
+ private MqttClient mainMqttClient;
+
+ /** The MQTT connection options to use */
+ private MqttConnectOptions connOpts;
+
+ /** Tell the thread to keep retrying or not */
+ private boolean retry = true;
+ /** Connection retry thread */
+ private Thread retryThread;
+
+ /**
+ * Default constructor of the Request Handler
+ */
+ public MqttRequestHandler() {
+ MemoryPersistence persistence = new MemoryPersistence();
+ String url = "tcp://" + MqttConstants.MQTT_BROKER_HOSTNAME + ":"
+ + MqttConstants.MQTT_BROKER_PORT;
+ this.connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ if(MqttConstants.MQTT_BROKER_USERNAME != null && MqttConstants.MQTT_BROKER_PASSWORD != null){
+ connOpts.setUserName(MqttConstants.MQTT_BROKER_USERNAME);
+ connOpts.setPassword(MqttConstants.MQTT_BROKER_PASSWORD.toCharArray());
+ }
+ try {
+ LOGGER.debug("Connecting MQTT client to: " + url);
+ this.mainMqttClient = new MqttClient(url, CLIENT_ID, persistence);
+ this.mainMqttClient.setCallback(MqttRequestHandler.this);
+ this.connect(this.connOpts);
+ } catch (MqttException e) {
+ LOGGER.error("Error in MQTT Client creation", e);
+ }
+ }
+
+ /**
+ * Connect and retry if the connection fails
+ * @param connOpts
+ */
+ private void connect(final MqttConnectOptions connOpts){
+ if(retry && retryThread == null){
+ retryThread = new Thread("mqtt-connection-retrier"){
+ public void run() {
+ while(retry && !MqttRequestHandler.this.mainMqttClient.isConnected()){
+ try {
+ MqttRequestHandler.this.mainMqttClient.connect(connOpts);
+
+ LOGGER.info("Subscribing on MQTT topic: " + REQUEST_TOPIC);
+ MqttRequestHandler.this.mainMqttClient.subscribe(REQUEST_TOPIC, 2);
+ } catch (MqttException e) {
+ LOGGER.warn("Cannot connect to MQTT Broker, retrying in 10s. Cause: " + e.getMessage());
+ }
+ if(!MqttRequestHandler.this.mainMqttClient.isConnected()){
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ MqttRequestHandler.this.retryThread = null;
+ };
+ };
+ }
+ retryThread.start();
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+ LOGGER.warn("Connection lost on MQTT Broker at " + MqttConstants.MQTT_BROKER_HOSTNAME + ":" + MqttConstants.MQTT_BROKER_PORT);
+ this.connect(this.connOpts);
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // Empty
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message)
+ throws Exception {
+ Matcher matcher = MqttConstants.REQUEST_PATTERN_IN.matcher(topic);
+ if (matcher.matches()) {
+ String aeId = matcher.group(1);
+ String format = matcher.group(2);
+ String responseTopic = "/oneM2M/resp/" + Constants.CSE_ID + "/" + aeId + "/" + format;
+
+ if (message.getPayload() == null) {
+ LOGGER.info("Null message received on " + topic);
+ sendErrorResponse("The message is null", responseTopic, aeId, format);
+ return;
+ }
+ String payload = new String(message.getPayload());
+ LOGGER.debug("(" + topic + ") Message received (qos: " + message.getQos() + "):\n" + payload);
+ DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format);
+
+ if (dms == null) {
+ LOGGER.warn("MQTT Request received with unhandled content type: " + format);
+ sendErrorResponse("The format type is not handled",
+ responseTopic.replace("/" + format, "/" + MqttConstants.MQTT_XML),
+ aeId, MqttConstants.MQTT_XML);
+ return;
+ }
+ Object objectPayload = dms.stringToObj(payload);
+ if(objectPayload == null || !(objectPayload instanceof RequestPrimitive)){
+ LOGGER.info("Invalid content provided in MQTT request");
+ sendErrorResponse("Invalid content provided in request primitive", responseTopic, aeId, format);
+ return;
+ }
+ RequestPrimitive requestPrimitive = (RequestPrimitive) objectPayload;
+ requestPrimitive.setRequestContentType(MimeMediaType.OBJ);
+ requestPrimitive.setReturnContentType(MimeMediaType.OBJ);
+ // Primitive content handling
+ if(requestPrimitive.getPrimitiveContent() != null &&
+ !requestPrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ requestPrimitive.getContent() == null){
+ requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0));
+ }
+
+ ResponsePrimitive responsePrimitive = null;
+
+ if(cseService != null){
+ // Sending the request to the CSE
+ responsePrimitive = cseService.doRequest(requestPrimitive);
+
+ // Handling the custom "content" field and map it to PrimitiveContent for serialization
+ if(responsePrimitive.getContent() != null &&
+ responsePrimitive.getPrimitiveContent() == null){
+ PrimitiveContent pc = new PrimitiveContent();
+ pc.getAny().add(responsePrimitive.getContent());
+ responsePrimitive.setPrimitiveContent(pc);
+ }
+
+ // Building and sending response
+ final String responsePayload = dms.objToString(responsePrimitive);
+ LOGGER.debug("Response to be sent on topic: " + responseTopic + ". Payload:\n" + responsePayload);
+
+ // Sending the request in another thread otherwise it blocks the reception thread of Paho
+ QueueSender.queue(mainMqttClient, responseTopic, responsePayload.getBytes());
+ } else {
+ sendErrorResponse("/" + Constants.CSE_ID + " is not available", responseTopic, aeId, format, ResponseStatusCode.SERVICE_UNAVAILABLE);
+ }
+
+ } else {
+ LOGGER.debug("The topic is not well formed. (" + topic + ")");
+ }
+ }
+
+ /**
+ * Util method that send an error message to the client
+ * @param message the message to send
+ * @param responseTopic the response topic to reply on
+ * @param aeId the id of the client
+ * @param format the format of exchange
+ */
+ private void sendErrorResponse(String message, String responseTopic,
+ String aeId, String format, BigInteger responseStatusCode) {
+ ResponsePrimitive responsePrimitive = new ResponsePrimitive();
+ responsePrimitive.setTo(aeId);
+ responsePrimitive.setFrom("/" + Constants.CSE_ID);
+ responsePrimitive.setResponseStatusCode(responseStatusCode);
+ responsePrimitive.setPrimitiveContent(new PrimitiveContent());
+ responsePrimitive.getPrimitiveContent().getAny().add(message);
+ DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format);
+ byte[] errorPayload = dms.objToString(responsePrimitive).getBytes();
+ QueueSender.queue(mainMqttClient, responseTopic, errorPayload);
+ }
+
+ /**
+ * Send a bad request response to the client
+ * @param message the message
+ * @param responseTopic the response topic
+ * @param aeId the id of the client
+ * @param format the format of exchange
+ */
+ private void sendErrorResponse(String message, String responseTopic,
+ String aeId, String format){
+ sendErrorResponse(message, responseTopic, aeId, format, ResponseStatusCode.BAD_REQUEST);
+ }
+
+ /**
+ * Disconnecting and closing the MQTT Client.
+ */
+ public void close(){
+ // Disconnect the MQTT Client
+ try {
+ this.mainMqttClient.disconnect();
+ } catch (MqttException e) {
+ LOGGER.debug("Error disconnecting the MQTT Client", e);
+ }
+ // Prevent on any reconnection retry
+ retry = false;
+ // Wake up the retry thread after the false on "retry" has been set
+ if(retryThread != null && retryThread.isAlive()){
+ retryThread.interrupt();
+ }
+ }
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
new file mode 100644
index 00000000..7936749b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
@@ -0,0 +1,155 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.binding.mqtt.util.ResponseRegistry;
+import org.eclipse.om2m.binding.mqtt.util.ResponseSemaphore;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+public class MqttRestClient implements RestClientService {
+
+ private static final Log LOGGER = LogFactory.getLog(MqttRestClient.class);
+
+ @Override
+ public ResponsePrimitive sendRequest(RequestPrimitive requestPrimitive) {
+ if(requestPrimitive.getContent() != null){
+ PrimitiveContent pc = new PrimitiveContent();
+ switch(requestPrimitive.getRequestContentType()){
+ case MimeMediaType.XML:
+ pc.getAny().add(DataMapperRegistry.get(MimeMediaType.XML).stringToObj((String)requestPrimitive.getContent()));
+ break;
+ case MimeMediaType.JSON:
+ pc.getAny().add(DataMapperRegistry.get(MimeMediaType.JSON).stringToObj((String)requestPrimitive.getContent()));
+ break;
+ case MimeMediaType.OBJ: case MimeMediaType.TEXT_PLAIN:
+ pc.getAny().add(requestPrimitive.getContent());
+ break;
+ default:
+ break;
+ }
+ if(!pc.getAny().isEmpty()){
+ requestPrimitive.setPrimitiveContent(pc);
+ }
+ }
+
+ ResponsePrimitive responsePrimitive = new ResponsePrimitive(requestPrimitive);
+
+ if(requestPrimitive.getMqttTopic() == null || requestPrimitive.getMqttUri() == null){
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);
+ return responsePrimitive;
+ }
+
+ if(requestPrimitive.getRequestIdentifier() == null){
+ requestPrimitive.setRequestIdentifier(UUID.randomUUID().toString());
+ }
+
+ String uri = requestPrimitive.getMqttUri();
+ if(uri.startsWith("mqtt://")){
+ uri = uri.replaceFirst("mqtt://", "tcp://");
+ }
+
+ if(requestPrimitive.getTo().startsWith("mqtt://")){
+ Pattern mqttUriPatter = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
+ Matcher matcher = mqttUriPatter.matcher(requestPrimitive.getTo());
+ if(matcher.matches()){
+ requestPrimitive.setTo(matcher.group(3));
+ }
+ }
+
+ String topic = requestPrimitive.getMqttTopic();
+ String payload = null;
+ String format = null;
+ if (topic.endsWith("/json")){
+ payload = DataMapperRegistry.get(MimeMediaType.JSON).objToString(requestPrimitive);
+ format = "json";
+ } else {
+ // Case of XML and default
+ payload = DataMapperRegistry.get(MimeMediaType.XML).objToString(requestPrimitive);
+ format = "xml";
+ }
+
+ try {
+ MqttClient mqttClient = new MqttClient(uri, requestPrimitive.getRequestIdentifier(), new MemoryPersistence());
+ mqttClient.connect();
+ LOGGER.debug("Sending request on topic: " + topic + " with payload:\n" + payload);
+ ResponseSemaphore responseSemaphore = null;
+ if(requestPrimitive.isMqttResponseExpected()){
+ Matcher matcher = MqttConstants.REQUEST_PATTERN_OUT.matcher(topic);
+ if(matcher.matches()){
+ String responseTopic = "/oneM2M/resp/" + matcher.group(1) + "/"+ Constants.CSE_ID + "/" + format;
+ responseSemaphore = ResponseRegistry.createSemaphore(requestPrimitive.getRequestIdentifier(), mqttClient, responseTopic);
+ } else {
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ }
+ } else {
+ mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.OK);
+ }
+ if(responseSemaphore != null){
+ QueueSender.queue(mqttClient, topic, payload.getBytes());
+ LOGGER.debug("Waiting for response... (" + MqttConstants.TIME_OUT_DURATION + "s)");
+ boolean released = responseSemaphore.getSemaphore().tryAcquire(1, MqttConstants.TIME_OUT_DURATION, TimeUnit.SECONDS);
+ if(released){
+ responsePrimitive = responseSemaphore.getResponsePrimitive();
+ fillAndConvertContent(requestPrimitive, responsePrimitive);
+ LOGGER.debug("Response received: " + responsePrimitive);
+ } else {
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ }
+ }
+ mqttClient.disconnect();
+ mqttClient.close();
+ } catch (MqttException e) {
+ LOGGER.warn("Cannot connect to: " + requestPrimitive.getMqttUri());
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ return responsePrimitive;
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted exception caught in MqttRestClient: " + e.getMessage());
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ return responsePrimitive;
+ }
+
+ return responsePrimitive;
+ }
+
+ private void fillAndConvertContent(RequestPrimitive requestPrimitive,
+ ResponsePrimitive responsePrimitive) {
+ if(responsePrimitive.getPrimitiveContent() != null &&
+ !responsePrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ responsePrimitive.getContent() == null){
+ if(requestPrimitive.getReturnContentType().equals(MimeMediaType.OBJ)){
+ responsePrimitive.setContent(responsePrimitive.getPrimitiveContent().getAny().get(0));
+ } else {
+ DataMapperService dms = DataMapperRegistry.get(requestPrimitive.getReturnContentType());
+ String content = dms.objToString(responsePrimitive.getPrimitiveContent().getAny().get(0));
+ responsePrimitive.setContent(content);
+ responsePrimitive.setContentType(requestPrimitive.getReturnContentType());
+ }
+ }
+ }
+
+ @Override
+ public String getProtocol() {
+ return MqttConstants.PROTOCOL;
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java
new file mode 100644
index 00000000..18127ab3
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java
@@ -0,0 +1,75 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+
+/**
+ * This class is used to store instances of {@link DataMapperService} classes.
+ *
+ */
+public class DataMapperRegistry {
+
+ /** Private constructor to avoid creation */
+ private DataMapperRegistry(){}
+
+ /**
+ * Service registry classified by data type handled.
+ */
+ private static Map<String, DataMapperService> serviceRegistery = new HashMap<String, DataMapperService>();
+
+ /**
+ * Add a new {@link DataMapperService} to the registery.
+ * @param dms the service to register
+ */
+ public static void register(DataMapperService dms){
+ if(dms != null && dms.getServiceDataType() != null){
+ serviceRegistery.put(dms.getServiceDataType(), dms);
+ }
+ }
+
+ /**
+ * Retrieve a {@link DataMapperService} from a data type.
+ * @param dataType the
+ * @return the {@link DataMapperService} that handle the data type or null if none
+ */
+ public static DataMapperService get(String dataType){
+ return serviceRegistery.get(dataType);
+ }
+
+ /**
+ * Remove the {@link DataMapperService} from the registry
+ * @param dataType the data type of the service to remove
+ */
+ public static void remove(String dataType){
+ serviceRegistery.remove(dataType);
+ }
+
+ /**
+ * Remove the {@link DataMapperService} from the registry
+ * @param dms the service to remove from the registry
+ */
+ public static void remove(DataMapperService dms){
+ remove(dms.getServiceDataType());
+ }
+
+ /**
+ * Retrieve the {@link DataMapperService} from the registry
+ * from the MQTT format String
+ * @param format the format of the DMS
+ * @return the DMS with the specified format or null
+ */
+ public static DataMapperService getFromMqttFormat(String format){
+ switch (format) {
+ case "xml":
+ return DataMapperRegistry.get(MimeMediaType.XML);
+ case "json":
+ return DataMapperRegistry.get(MimeMediaType.JSON);
+ default:
+ return null;
+ }
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java
new file mode 100644
index 00000000..24fceff6
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java
@@ -0,0 +1,48 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.regex.Pattern;
+
+import org.eclipse.om2m.commons.constants.Constants;
+
+/**
+ * A set of MQTT constants retrieved from the System.getProperty method.
+ *
+ */
+public final class MqttConstants {
+
+ private MqttConstants(){}
+
+ /** Hostname of the main broker */
+ public static final String MQTT_BROKER_HOSTNAME = System.getProperty("org.eclipse.om2m.mqtt.ip", "127.0.0.1");
+
+ /** IP of the main broker */
+ public static final int MQTT_BROKER_PORT = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.port", "1883"));
+
+ /** Username to connect to broker */
+ public static final String MQTT_BROKER_USERNAME = System.getProperty("org.eclipse.om2m.mqtt.username");
+
+ /** Password to connect to broker */
+ public static final String MQTT_BROKER_PASSWORD = System.getProperty("org.eclipse.om2m.mqtt.password");
+
+ /** MQTT Protocol prefix */
+ public static final String PROTOCOL = "mqtt";
+
+ /** Size of the request sender queue */
+ public static final int MQTT_QUEUE_SENDER_SIZE = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.queue.size", "8"));
+
+ /** Request pattern to parse the request topic on message reception */
+ public static final Pattern REQUEST_PATTERN_IN = Pattern.compile("/oneM2M/req/([^/]+)/" + Constants.CSE_ID + "/(.*)");
+
+ /** Request pattern when sending a message. */
+ public static final Pattern REQUEST_PATTERN_OUT = Pattern.compile("/oneM2M/req/" + Constants.CSE_ID+ "/([^/]+)+/(.*)");
+
+ /** Time out duration when waiting for a response. Unit in second. */
+ public static final long TIME_OUT_DURATION = Long.valueOf(System.getProperty("org.eclipse.om2m.mqtt.timeout", "20"));
+
+ /** MQTT format for XML in topic */
+ public static final String MQTT_XML = "xml";
+
+ /** MQTT format for JSON in topic */
+ public static final String MQTT_JSON = "json";
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java
new file mode 100644
index 00000000..43be0ecb
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java
@@ -0,0 +1,60 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+public final class QueueSender {
+
+ private static final Log LOGGER = LogFactory.getLog(QueueSender.class);
+ private static ExecutorService threadPool;
+
+ static {
+ int queueSize = MqttConstants.MQTT_QUEUE_SENDER_SIZE <= 2 ? 2
+ : MqttConstants.MQTT_QUEUE_SENDER_SIZE;
+ threadPool = new ThreadPoolExecutor(2, queueSize, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>());
+ }
+
+ public static void queue(MqttClient mqttClient, String topic, byte[] payload){
+ LOGGER.debug("Sending MQTT message to " + mqttClient.getServerURI() + " topic: " + topic);
+ threadPool.execute(new MqttSender(mqttClient, topic, payload));
+ }
+
+ private static class MqttSender implements Runnable {
+
+ private MqttClient mqttClient;
+ private String topic;
+ private byte[] payload;
+
+ public MqttSender(MqttClient mqttClient, String topic, byte[] payload) {
+ super();
+ this.mqttClient = mqttClient;
+ this.topic = topic;
+ this.payload = payload;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mqttClient.publish(topic, payload, 1, false);
+ } catch (MqttException e) {
+ LOGGER.warn("Error publishing on topic: " + this.topic
+ + " of broker " + this.mqttClient.getServerURI()
+ + ". Error: " + e.getMessage());
+ }
+ }
+
+ }
+
+ private QueueSender(){
+ // Empty and private constructor to avoid class creation
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java
new file mode 100644
index 00000000..20c7f246
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java
@@ -0,0 +1,99 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public final class ResponseRegistry {
+
+ private ResponseRegistry() {
+ // Empty and private constructor to avoid instantiation of this class
+ }
+
+ private static final Map<String, ResponseSemaphore> responseMap = new HashMap<String, ResponseSemaphore>();
+
+ public static ResponseSemaphore createSemaphore(String requestIdentifier, MqttClient mqttClient,
+ String responseTopic) throws MqttException{
+ synchronized (responseMap) {
+ mqttClient.setCallback(new ResponseCallback());
+ mqttClient.subscribe(responseTopic, 1);
+ if(!responseMap.containsKey(requestIdentifier)){
+ ResponseSemaphore respSemaphore = new ResponseSemaphore(new Semaphore(0));
+ responseMap.put(requestIdentifier, respSemaphore);
+ return respSemaphore;
+ }
+ return null;
+ }
+ }
+
+ private static void responseReceived(ResponsePrimitive responsePrimitive){
+ synchronized (responseMap) {
+ if(responseMap.containsKey(responsePrimitive.getRequestIdentifier())){
+ ResponseSemaphore responseSemanphore = responseMap.get(responsePrimitive.getRequestIdentifier());
+ responseSemanphore.setResponsePrimitive(responsePrimitive);
+ responseSemanphore.getSemaphore().release();
+ responseMap.remove(responsePrimitive.getRequestIdentifier());
+ }
+ }
+ }
+
+ private static class ResponseCallback implements MqttCallback {
+
+ private static Pattern responsePattern = Pattern.compile("/oneM2M/resp/([^/]+)/" + Constants.CSE_ID + "/(.*)");
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ // Ignore
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // Ignore
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message)
+ throws Exception {
+ Matcher matcher = responsePattern.matcher(topic);
+ if(!matcher.matches()){
+ return;
+ }
+ String format = matcher.group(2);
+ DataMapperService dms = null;
+ switch (format) {
+ case "xml":
+ dms = DataMapperRegistry.get(MimeMediaType.XML);
+ break;
+ case "json":
+ dms = DataMapperRegistry.get(MimeMediaType.JSON);
+ break;
+ default:
+ break;
+ }
+ if(dms == null){
+ // The format is not handled here
+ return;
+ }
+ String payload = new String(message.getPayload());
+ ResponsePrimitive resp = (ResponsePrimitive) dms.stringToObj(payload);
+ if(resp == null || resp.getRequestIdentifier() == null){
+ return;
+ }
+ responseReceived(resp);
+ }
+
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java
new file mode 100644
index 00000000..7588712d
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java
@@ -0,0 +1,32 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.concurrent.Semaphore;
+
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+
+public class ResponseSemaphore {
+
+ private Semaphore semaphore;
+ private ResponsePrimitive responsePrimitive;
+
+ public ResponseSemaphore(Semaphore semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public Semaphore getSemaphore() {
+ return semaphore;
+ }
+
+ public void setSemaphore(Semaphore semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public ResponsePrimitive getResponsePrimitive() {
+ return responsePrimitive;
+ }
+
+ public void setResponsePrimitive(ResponsePrimitive responsePrimitive) {
+ this.responsePrimitive = responsePrimitive;
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java
new file mode 100644
index 00000000..219fa66b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java
@@ -0,0 +1,19 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+
+public final class Utils {
+
+ public static void fillPrimitiveContent(){
+
+ }
+
+ public static void fillContent(ResponsePrimitive requestPrimitive){
+ if(requestPrimitive.getPrimitiveContent() != null &&
+ !requestPrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ requestPrimitive.getContent() == null){
+ requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0));
+ }
+ }
+
+}

Back to the top