Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFrancois Aissaoui2016-10-03 10:18:52 -0400
committerFrancois Aissaoui2016-10-03 10:21:58 -0400
commit2148f78c70400509a3b746f2232818abdfe2c01d (patch)
treec44b3ff68820fa14ed5b514ba38209107ffc408c
parentb8e9ecf4d9412420c9471e34be2e0aa64ba07d44 (diff)
downloadorg.eclipse.om2m-2148f78c70400509a3b746f2232818abdfe2c01d.tar.gz
org.eclipse.om2m-2148f78c70400509a3b746f2232818abdfe2c01d.tar.xz
org.eclipse.om2m-2148f78c70400509a3b746f2232818abdfe2c01d.zip
Implementation of MQTT Protocol binding TS-0010
This implements features from the MQTT protocol binding TS-0010 from oneM2M. It does NOT include the MQTT Broker.
-rw-r--r--org.eclipse.om2m.binding.mqtt/.classpath8
-rw-r--r--org.eclipse.om2m.binding.mqtt/.project34
-rw-r--r--org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF18
-rw-r--r--org.eclipse.om2m.binding.mqtt/README0
-rw-r--r--org.eclipse.om2m.binding.mqtt/build.properties4
-rw-r--r--org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jarbin0 -> 171372 bytes
-rw-r--r--org.eclipse.om2m.binding.mqtt/pom.xml11
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java151
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java255
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java155
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java75
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java48
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java60
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java99
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java32
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java19
-rw-r--r--org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java48
-rw-r--r--org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java2
-rw-r--r--org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java2
-rw-r--r--org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java23
-rw-r--r--org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java52
-rw-r--r--org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java4
-rw-r--r--org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml13
-rw-r--r--org.eclipse.om2m.site.in-cse/om2m.product1
-rw-r--r--pom.xml1
25 files changed, 1102 insertions, 13 deletions
diff --git a/org.eclipse.om2m.binding.mqtt/.classpath b/org.eclipse.om2m.binding.mqtt/.classpath
new file mode 100644
index 00000000..f7edce6b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/.classpath
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
+ <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
+ <classpathentry exported="true" kind="lib" path="libs/org.eclipse.paho.client.mqttv3-1.0.2.jar"/>
+ <classpathentry kind="src" path="src/main/java/"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/org.eclipse.om2m.binding.mqtt/.project b/org.eclipse.om2m.binding.mqtt/.project
new file mode 100644
index 00000000..fed26a33
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/.project
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>org.eclipse.om2m.binding.mqtt</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ManifestBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.SchemaBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.m2e.core.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.m2e.core.maven2Nature</nature>
+ <nature>org.eclipse.pde.PluginNature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF b/org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF
new file mode 100644
index 00000000..a09741c6
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF
@@ -0,0 +1,18 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: MQTT Binding
+Bundle-SymbolicName: org.eclipse.om2m.binding.mqtt
+Bundle-Version: 1.0.0.qualifier
+Bundle-Activator: org.eclipse.om2m.binding.mqtt.Activator
+Bundle-RequiredExecutionEnvironment: JavaSE-1.7
+Import-Package: org.apache.commons.logging,
+ org.eclipse.om2m.binding.service,
+ org.eclipse.om2m.commons.constants,
+ org.eclipse.om2m.commons.resource,
+ org.eclipse.om2m.core.service,
+ org.eclipse.om2m.datamapping.service,
+ org.osgi.framework;version="1.3.0",
+ org.osgi.util.tracker;version="1.5.1"
+Bundle-ActivationPolicy: lazy
+Bundle-ClassPath: libs/org.eclipse.paho.client.mqttv3-1.0.2.jar,
+ .
diff --git a/org.eclipse.om2m.binding.mqtt/README b/org.eclipse.om2m.binding.mqtt/README
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/README
diff --git a/org.eclipse.om2m.binding.mqtt/build.properties b/org.eclipse.om2m.binding.mqtt/build.properties
new file mode 100644
index 00000000..65cee246
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/build.properties
@@ -0,0 +1,4 @@
+source.. = src/main/java/
+bin.includes = META-INF/,\
+ .,\
+ libs/org.eclipse.paho.client.mqttv3-1.0.2.jar
diff --git a/org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jar b/org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jar
new file mode 100644
index 00000000..9a881627
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jar
Binary files differ
diff --git a/org.eclipse.om2m.binding.mqtt/pom.xml b/org.eclipse.om2m.binding.mqtt/pom.xml
new file mode 100644
index 00000000..254787e9
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/pom.xml
@@ -0,0 +1,11 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>org.eclipse.om2m.binding.mqtt</artifactId>
+ <packaging>eclipse-plugin</packaging>
+ <parent>
+ <groupId>org.eclipse.om2m</groupId>
+ <artifactId>org.eclipse.om2m</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+</project> \ No newline at end of file
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java
new file mode 100644
index 00000000..0a74e784
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java
@@ -0,0 +1,151 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.core.service.CseService;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+public class Activator implements BundleActivator {
+
+ private static BundleContext context;
+
+ static BundleContext getContext() {
+ return context;
+ }
+
+ private static final Log LOGGER = LogFactory.getLog(Activator.class);
+
+ /** {@link DataMapperServiceTracker} reference */
+ private static ServiceTracker<DataMapperService, DataMapperService> dataMapperServiceTracker;
+ /** {@link CseService} reference */
+ private static ServiceTracker<CseService, CseService> cseServiceTracker;
+
+ /** MQTT Request Handler that connects to the MQTT Broker */
+ private static MqttRequestHandler mqttRequestHandler;
+
+ public void start(BundleContext bundleContext) throws Exception {
+ Activator.context = bundleContext;
+
+ // Listening on Cse Service
+ cseServiceTracker = new ServiceTracker<CseService, CseService>(
+ bundleContext, CseService.class,
+ new CseServiceTrackerCustomizer());
+ cseServiceTracker.open();
+
+ // Listening on DataMapper Service
+ dataMapperServiceTracker = new ServiceTracker<DataMapperService, DataMapperService>(
+ bundleContext, DataMapperService.class,
+ new DataMapperServiceTracker());
+ dataMapperServiceTracker.open();
+
+ // Registering RestClientService of MQTT
+ getContext().registerService(RestClientService.class, new MqttRestClient(), null);
+ }
+
+ public void stop(BundleContext bundleContext) throws Exception {
+ Activator.context = null;
+ if (cseServiceTracker != null) {
+ cseServiceTracker.close();
+ cseServiceTracker = null;
+ }
+ if (dataMapperServiceTracker != null) {
+ dataMapperServiceTracker.close();
+ dataMapperServiceTracker = null;
+ }
+ if (mqttRequestHandler != null){
+ mqttRequestHandler.close();
+ mqttRequestHandler = null;
+ }
+ }
+
+ private static class CseServiceTrackerCustomizer implements
+ ServiceTrackerCustomizer<CseService, CseService> {
+
+ @Override
+ public CseService addingService(ServiceReference<CseService> reference) {
+ if (reference == null) {
+ return null;
+ }
+ Object service = Activator.getContext().getService(reference);
+ if (service != null && service instanceof CseService) {
+ LOGGER.debug("New CseService discovered");
+ CseService cse = (CseService) service;
+ MqttRequestHandler.setCseService(cse);
+ if (mqttRequestHandler == null) {
+ new Thread() {
+ public void run() {
+ LOGGER.info("Creating MQTT Request Handler");
+ mqttRequestHandler = new MqttRequestHandler();
+ };
+ }.start();
+ }
+ return cse;
+ }
+ return null;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<CseService> reference,
+ CseService service) {
+ if (service != null) {
+ LOGGER.info("CseService modified");
+ MqttRequestHandler.setCseService(service);
+ }
+ }
+
+ @Override
+ public void removedService(ServiceReference<CseService> reference,
+ CseService service) {
+ MqttRequestHandler.setCseService(null);
+ }
+
+ }
+
+ private static class DataMapperServiceTracker implements
+ ServiceTrackerCustomizer<DataMapperService, DataMapperService> {
+
+ @Override
+ public DataMapperService addingService(
+ ServiceReference<DataMapperService> reference) {
+ if (reference == null) {
+ return null;
+ }
+ Object service = Activator.getContext().getService(reference);
+ if (service != null && service instanceof DataMapperService) {
+ DataMapperService dms = (DataMapperService) service;
+ LOGGER.debug("New DataMapper Service discovered: "
+ + dms.getServiceDataType());
+ DataMapperRegistry.register(dms);
+ return dms;
+ }
+ return null;
+ }
+
+ @Override
+ public void modifiedService(
+ ServiceReference<DataMapperService> reference,
+ DataMapperService service) {
+ if (service != null) {
+ DataMapperRegistry.register(service);
+ }
+ }
+
+ @Override
+ public void removedService(
+ ServiceReference<DataMapperService> reference,
+ DataMapperService service) {
+ if (service != null) {
+ DataMapperRegistry.remove(service);
+ }
+ }
+
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java
new file mode 100644
index 00000000..1105a89c
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java
@@ -0,0 +1,255 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.math.BigInteger;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.core.service.CseService;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+/**
+ * MQTT Request Handler class that subscribe to oneM2M request topic.
+ * When a request is received in the request topic, it is de-serialized and send
+ * to the CseService implementation available. Then the response from the service
+ * is serialized and sent to the oneM2M response topic.
+ */
+public class MqttRequestHandler implements MqttCallback {
+
+ // Static attributes of the class
+
+ /** MQTT Client ID */
+ private static final String CLIENT_ID = Constants.CSE_ID;
+ /** Logger reference */
+ private static final Log LOGGER = LogFactory.getLog(MqttRequestHandler.class);
+ /** MQTT Request Topic */
+ private static final String REQUEST_TOPIC = "/oneM2M/req/+/" + Constants.CSE_ID + "/+";
+
+
+ /** Reference to the current cseService implementation*/
+ private static CseService cseService;
+
+ /**
+ * Set the current CseService used when a request is
+ * received on the oneM2M request topic.
+ * @param cse the CseService implementation to use
+ */
+ public static void setCseService(CseService cse) {
+ cseService = cse;
+ }
+
+ // Private attributes
+
+ /** MQTT Client from the Paho library */
+ private MqttClient mainMqttClient;
+
+ /** The MQTT connection options to use */
+ private MqttConnectOptions connOpts;
+
+ /** Tell the thread to keep retrying or not */
+ private boolean retry = true;
+ /** Connection retry thread */
+ private Thread retryThread;
+
+ /**
+ * Default constructor of the Request Handler
+ */
+ public MqttRequestHandler() {
+ MemoryPersistence persistence = new MemoryPersistence();
+ String url = "tcp://" + MqttConstants.MQTT_BROKER_HOSTNAME + ":"
+ + MqttConstants.MQTT_BROKER_PORT;
+ this.connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ if(MqttConstants.MQTT_BROKER_USERNAME != null && MqttConstants.MQTT_BROKER_PASSWORD != null){
+ connOpts.setUserName(MqttConstants.MQTT_BROKER_USERNAME);
+ connOpts.setPassword(MqttConstants.MQTT_BROKER_PASSWORD.toCharArray());
+ }
+ try {
+ LOGGER.debug("Connecting MQTT client to: " + url);
+ this.mainMqttClient = new MqttClient(url, CLIENT_ID, persistence);
+ this.mainMqttClient.setCallback(MqttRequestHandler.this);
+ this.connect(this.connOpts);
+ } catch (MqttException e) {
+ LOGGER.error("Error in MQTT Client creation", e);
+ }
+ }
+
+ /**
+ * Connect and retry if the connection fails
+ * @param connOpts
+ */
+ private void connect(final MqttConnectOptions connOpts){
+ if(retry && retryThread == null){
+ retryThread = new Thread("mqtt-connection-retrier"){
+ public void run() {
+ while(retry && !MqttRequestHandler.this.mainMqttClient.isConnected()){
+ try {
+ MqttRequestHandler.this.mainMqttClient.connect(connOpts);
+
+ LOGGER.info("Subscribing on MQTT topic: " + REQUEST_TOPIC);
+ MqttRequestHandler.this.mainMqttClient.subscribe(REQUEST_TOPIC, 2);
+ } catch (MqttException e) {
+ LOGGER.warn("Cannot connect to MQTT Broker, retrying in 10s. Cause: " + e.getMessage());
+ }
+ if(!MqttRequestHandler.this.mainMqttClient.isConnected()){
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ MqttRequestHandler.this.retryThread = null;
+ };
+ };
+ }
+ retryThread.start();
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+ LOGGER.warn("Connection lost on MQTT Broker at " + MqttConstants.MQTT_BROKER_HOSTNAME + ":" + MqttConstants.MQTT_BROKER_PORT);
+ this.connect(this.connOpts);
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // Empty
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message)
+ throws Exception {
+ Matcher matcher = MqttConstants.REQUEST_PATTERN_IN.matcher(topic);
+ if (matcher.matches()) {
+ String aeId = matcher.group(1);
+ String format = matcher.group(2);
+ String responseTopic = "/oneM2M/resp/" + Constants.CSE_ID + "/" + aeId + "/" + format;
+
+ if (message.getPayload() == null) {
+ LOGGER.info("Null message received on " + topic);
+ sendErrorResponse("The message is null", responseTopic, aeId, format);
+ return;
+ }
+ String payload = new String(message.getPayload());
+ LOGGER.debug("(" + topic + ") Message received (qos: " + message.getQos() + "):\n" + payload);
+ DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format);
+
+ if (dms == null) {
+ LOGGER.warn("MQTT Request received with unhandled content type: " + format);
+ sendErrorResponse("The format type is not handled",
+ responseTopic.replace("/" + format, "/" + MqttConstants.MQTT_XML),
+ aeId, MqttConstants.MQTT_XML);
+ return;
+ }
+ Object objectPayload = dms.stringToObj(payload);
+ if(objectPayload == null || !(objectPayload instanceof RequestPrimitive)){
+ LOGGER.info("Invalid content provided in MQTT request");
+ sendErrorResponse("Invalid content provided in request primitive", responseTopic, aeId, format);
+ return;
+ }
+ RequestPrimitive requestPrimitive = (RequestPrimitive) objectPayload;
+ requestPrimitive.setRequestContentType(MimeMediaType.OBJ);
+ requestPrimitive.setReturnContentType(MimeMediaType.OBJ);
+ // Primitive content handling
+ if(requestPrimitive.getPrimitiveContent() != null &&
+ !requestPrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ requestPrimitive.getContent() == null){
+ requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0));
+ }
+
+ ResponsePrimitive responsePrimitive = null;
+
+ if(cseService != null){
+ // Sending the request to the CSE
+ responsePrimitive = cseService.doRequest(requestPrimitive);
+
+ // Handling the custom "content" field and map it to PrimitiveContent for serialization
+ if(responsePrimitive.getContent() != null &&
+ responsePrimitive.getPrimitiveContent() == null){
+ PrimitiveContent pc = new PrimitiveContent();
+ pc.getAny().add(responsePrimitive.getContent());
+ responsePrimitive.setPrimitiveContent(pc);
+ }
+
+ // Building and sending response
+ final String responsePayload = dms.objToString(responsePrimitive);
+ LOGGER.debug("Response to be sent on topic: " + responseTopic + ". Payload:\n" + responsePayload);
+
+ // Sending the request in another thread otherwise it blocks the reception thread of Paho
+ QueueSender.queue(mainMqttClient, responseTopic, responsePayload.getBytes());
+ } else {
+ sendErrorResponse("/" + Constants.CSE_ID + " is not available", responseTopic, aeId, format, ResponseStatusCode.SERVICE_UNAVAILABLE);
+ }
+
+ } else {
+ LOGGER.debug("The topic is not well formed. (" + topic + ")");
+ }
+ }
+
+ /**
+ * Util method that send an error message to the client
+ * @param message the message to send
+ * @param responseTopic the response topic to reply on
+ * @param aeId the id of the client
+ * @param format the format of exchange
+ */
+ private void sendErrorResponse(String message, String responseTopic,
+ String aeId, String format, BigInteger responseStatusCode) {
+ ResponsePrimitive responsePrimitive = new ResponsePrimitive();
+ responsePrimitive.setTo(aeId);
+ responsePrimitive.setFrom("/" + Constants.CSE_ID);
+ responsePrimitive.setResponseStatusCode(responseStatusCode);
+ responsePrimitive.setPrimitiveContent(new PrimitiveContent());
+ responsePrimitive.getPrimitiveContent().getAny().add(message);
+ DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format);
+ byte[] errorPayload = dms.objToString(responsePrimitive).getBytes();
+ QueueSender.queue(mainMqttClient, responseTopic, errorPayload);
+ }
+
+ /**
+ * Send a bad request response to the client
+ * @param message the message
+ * @param responseTopic the response topic
+ * @param aeId the id of the client
+ * @param format the format of exchange
+ */
+ private void sendErrorResponse(String message, String responseTopic,
+ String aeId, String format){
+ sendErrorResponse(message, responseTopic, aeId, format, ResponseStatusCode.BAD_REQUEST);
+ }
+
+ /**
+ * Disconnecting and closing the MQTT Client.
+ */
+ public void close(){
+ // Disconnect the MQTT Client
+ try {
+ this.mainMqttClient.disconnect();
+ } catch (MqttException e) {
+ LOGGER.debug("Error disconnecting the MQTT Client", e);
+ }
+ // Prevent on any reconnection retry
+ retry = false;
+ // Wake up the retry thread after the false on "retry" has been set
+ if(retryThread != null && retryThread.isAlive()){
+ retryThread.interrupt();
+ }
+ }
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
new file mode 100644
index 00000000..7936749b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
@@ -0,0 +1,155 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.binding.mqtt.util.ResponseRegistry;
+import org.eclipse.om2m.binding.mqtt.util.ResponseSemaphore;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+public class MqttRestClient implements RestClientService {
+
+ private static final Log LOGGER = LogFactory.getLog(MqttRestClient.class);
+
+ @Override
+ public ResponsePrimitive sendRequest(RequestPrimitive requestPrimitive) {
+ if(requestPrimitive.getContent() != null){
+ PrimitiveContent pc = new PrimitiveContent();
+ switch(requestPrimitive.getRequestContentType()){
+ case MimeMediaType.XML:
+ pc.getAny().add(DataMapperRegistry.get(MimeMediaType.XML).stringToObj((String)requestPrimitive.getContent()));
+ break;
+ case MimeMediaType.JSON:
+ pc.getAny().add(DataMapperRegistry.get(MimeMediaType.JSON).stringToObj((String)requestPrimitive.getContent()));
+ break;
+ case MimeMediaType.OBJ: case MimeMediaType.TEXT_PLAIN:
+ pc.getAny().add(requestPrimitive.getContent());
+ break;
+ default:
+ break;
+ }
+ if(!pc.getAny().isEmpty()){
+ requestPrimitive.setPrimitiveContent(pc);
+ }
+ }
+
+ ResponsePrimitive responsePrimitive = new ResponsePrimitive(requestPrimitive);
+
+ if(requestPrimitive.getMqttTopic() == null || requestPrimitive.getMqttUri() == null){
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);
+ return responsePrimitive;
+ }
+
+ if(requestPrimitive.getRequestIdentifier() == null){
+ requestPrimitive.setRequestIdentifier(UUID.randomUUID().toString());
+ }
+
+ String uri = requestPrimitive.getMqttUri();
+ if(uri.startsWith("mqtt://")){
+ uri = uri.replaceFirst("mqtt://", "tcp://");
+ }
+
+ if(requestPrimitive.getTo().startsWith("mqtt://")){
+ Pattern mqttUriPatter = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
+ Matcher matcher = mqttUriPatter.matcher(requestPrimitive.getTo());
+ if(matcher.matches()){
+ requestPrimitive.setTo(matcher.group(3));
+ }
+ }
+
+ String topic = requestPrimitive.getMqttTopic();
+ String payload = null;
+ String format = null;
+ if (topic.endsWith("/json")){
+ payload = DataMapperRegistry.get(MimeMediaType.JSON).objToString(requestPrimitive);
+ format = "json";
+ } else {
+ // Case of XML and default
+ payload = DataMapperRegistry.get(MimeMediaType.XML).objToString(requestPrimitive);
+ format = "xml";
+ }
+
+ try {
+ MqttClient mqttClient = new MqttClient(uri, requestPrimitive.getRequestIdentifier(), new MemoryPersistence());
+ mqttClient.connect();
+ LOGGER.debug("Sending request on topic: " + topic + " with payload:\n" + payload);
+ ResponseSemaphore responseSemaphore = null;
+ if(requestPrimitive.isMqttResponseExpected()){
+ Matcher matcher = MqttConstants.REQUEST_PATTERN_OUT.matcher(topic);
+ if(matcher.matches()){
+ String responseTopic = "/oneM2M/resp/" + matcher.group(1) + "/"+ Constants.CSE_ID + "/" + format;
+ responseSemaphore = ResponseRegistry.createSemaphore(requestPrimitive.getRequestIdentifier(), mqttClient, responseTopic);
+ } else {
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ }
+ } else {
+ mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.OK);
+ }
+ if(responseSemaphore != null){
+ QueueSender.queue(mqttClient, topic, payload.getBytes());
+ LOGGER.debug("Waiting for response... (" + MqttConstants.TIME_OUT_DURATION + "s)");
+ boolean released = responseSemaphore.getSemaphore().tryAcquire(1, MqttConstants.TIME_OUT_DURATION, TimeUnit.SECONDS);
+ if(released){
+ responsePrimitive = responseSemaphore.getResponsePrimitive();
+ fillAndConvertContent(requestPrimitive, responsePrimitive);
+ LOGGER.debug("Response received: " + responsePrimitive);
+ } else {
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ }
+ }
+ mqttClient.disconnect();
+ mqttClient.close();
+ } catch (MqttException e) {
+ LOGGER.warn("Cannot connect to: " + requestPrimitive.getMqttUri());
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ return responsePrimitive;
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted exception caught in MqttRestClient: " + e.getMessage());
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ return responsePrimitive;
+ }
+
+ return responsePrimitive;
+ }
+
+ private void fillAndConvertContent(RequestPrimitive requestPrimitive,
+ ResponsePrimitive responsePrimitive) {
+ if(responsePrimitive.getPrimitiveContent() != null &&
+ !responsePrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ responsePrimitive.getContent() == null){
+ if(requestPrimitive.getReturnContentType().equals(MimeMediaType.OBJ)){
+ responsePrimitive.setContent(responsePrimitive.getPrimitiveContent().getAny().get(0));
+ } else {
+ DataMapperService dms = DataMapperRegistry.get(requestPrimitive.getReturnContentType());
+ String content = dms.objToString(responsePrimitive.getPrimitiveContent().getAny().get(0));
+ responsePrimitive.setContent(content);
+ responsePrimitive.setContentType(requestPrimitive.getReturnContentType());
+ }
+ }
+ }
+
+ @Override
+ public String getProtocol() {
+ return MqttConstants.PROTOCOL;
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java
new file mode 100644
index 00000000..18127ab3
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java
@@ -0,0 +1,75 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+
+/**
+ * This class is used to store instances of {@link DataMapperService} classes.
+ *
+ */
+public class DataMapperRegistry {
+
+ /** Private constructor to avoid creation */
+ private DataMapperRegistry(){}
+
+ /**
+ * Service registry classified by data type handled.
+ */
+ private static Map<String, DataMapperService> serviceRegistery = new HashMap<String, DataMapperService>();
+
+ /**
+ * Add a new {@link DataMapperService} to the registery.
+ * @param dms the service to register
+ */
+ public static void register(DataMapperService dms){
+ if(dms != null && dms.getServiceDataType() != null){
+ serviceRegistery.put(dms.getServiceDataType(), dms);
+ }
+ }
+
+ /**
+ * Retrieve a {@link DataMapperService} from a data type.
+ * @param dataType the
+ * @return the {@link DataMapperService} that handle the data type or null if none
+ */
+ public static DataMapperService get(String dataType){
+ return serviceRegistery.get(dataType);
+ }
+
+ /**
+ * Remove the {@link DataMapperService} from the registry
+ * @param dataType the data type of the service to remove
+ */
+ public static void remove(String dataType){
+ serviceRegistery.remove(dataType);
+ }
+
+ /**
+ * Remove the {@link DataMapperService} from the registry
+ * @param dms the service to remove from the registry
+ */
+ public static void remove(DataMapperService dms){
+ remove(dms.getServiceDataType());
+ }
+
+ /**
+ * Retrieve the {@link DataMapperService} from the registry
+ * from the MQTT format String
+ * @param format the format of the DMS
+ * @return the DMS with the specified format or null
+ */
+ public static DataMapperService getFromMqttFormat(String format){
+ switch (format) {
+ case "xml":
+ return DataMapperRegistry.get(MimeMediaType.XML);
+ case "json":
+ return DataMapperRegistry.get(MimeMediaType.JSON);
+ default:
+ return null;
+ }
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java
new file mode 100644
index 00000000..24fceff6
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java
@@ -0,0 +1,48 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.regex.Pattern;
+
+import org.eclipse.om2m.commons.constants.Constants;
+
+/**
+ * A set of MQTT constants retrieved from the System.getProperty method.
+ *
+ */
+public final class MqttConstants {
+
+ private MqttConstants(){}
+
+ /** Hostname of the main broker */
+ public static final String MQTT_BROKER_HOSTNAME = System.getProperty("org.eclipse.om2m.mqtt.ip", "127.0.0.1");
+
+ /** IP of the main broker */
+ public static final int MQTT_BROKER_PORT = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.port", "1883"));
+
+ /** Username to connect to broker */
+ public static final String MQTT_BROKER_USERNAME = System.getProperty("org.eclipse.om2m.mqtt.username");
+
+ /** Password to connect to broker */
+ public static final String MQTT_BROKER_PASSWORD = System.getProperty("org.eclipse.om2m.mqtt.password");
+
+ /** MQTT Protocol prefix */
+ public static final String PROTOCOL = "mqtt";
+
+ /** Size of the request sender queue */
+ public static final int MQTT_QUEUE_SENDER_SIZE = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.queue.size", "8"));
+
+ /** Request pattern to parse the request topic on message reception */
+ public static final Pattern REQUEST_PATTERN_IN = Pattern.compile("/oneM2M/req/([^/]+)/" + Constants.CSE_ID + "/(.*)");
+
+ /** Request pattern when sending a message. */
+ public static final Pattern REQUEST_PATTERN_OUT = Pattern.compile("/oneM2M/req/" + Constants.CSE_ID+ "/([^/]+)+/(.*)");
+
+ /** Time out duration when waiting for a response. Unit in second. */
+ public static final long TIME_OUT_DURATION = Long.valueOf(System.getProperty("org.eclipse.om2m.mqtt.timeout", "20"));
+
+ /** MQTT format for XML in topic */
+ public static final String MQTT_XML = "xml";
+
+ /** MQTT format for JSON in topic */
+ public static final String MQTT_JSON = "json";
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java
new file mode 100644
index 00000000..43be0ecb
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java
@@ -0,0 +1,60 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+public final class QueueSender {
+
+ private static final Log LOGGER = LogFactory.getLog(QueueSender.class);
+ private static ExecutorService threadPool;
+
+ static {
+ int queueSize = MqttConstants.MQTT_QUEUE_SENDER_SIZE <= 2 ? 2
+ : MqttConstants.MQTT_QUEUE_SENDER_SIZE;
+ threadPool = new ThreadPoolExecutor(2, queueSize, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>());
+ }
+
+ public static void queue(MqttClient mqttClient, String topic, byte[] payload){
+ LOGGER.debug("Sending MQTT message to " + mqttClient.getServerURI() + " topic: " + topic);
+ threadPool.execute(new MqttSender(mqttClient, topic, payload));
+ }
+
+ private static class MqttSender implements Runnable {
+
+ private MqttClient mqttClient;
+ private String topic;
+ private byte[] payload;
+
+ public MqttSender(MqttClient mqttClient, String topic, byte[] payload) {
+ super();
+ this.mqttClient = mqttClient;
+ this.topic = topic;
+ this.payload = payload;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.mqttClient.publish(topic, payload, 1, false);
+ } catch (MqttException e) {
+ LOGGER.warn("Error publishing on topic: " + this.topic
+ + " of broker " + this.mqttClient.getServerURI()
+ + ". Error: " + e.getMessage());
+ }
+ }
+
+ }
+
+ private QueueSender(){
+ // Empty and private constructor to avoid class creation
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java
new file mode 100644
index 00000000..20c7f246
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java
@@ -0,0 +1,99 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public final class ResponseRegistry {
+
+ private ResponseRegistry() {
+ // Empty and private constructor to avoid instantiation of this class
+ }
+
+ private static final Map<String, ResponseSemaphore> responseMap = new HashMap<String, ResponseSemaphore>();
+
+ public static ResponseSemaphore createSemaphore(String requestIdentifier, MqttClient mqttClient,
+ String responseTopic) throws MqttException{
+ synchronized (responseMap) {
+ mqttClient.setCallback(new ResponseCallback());
+ mqttClient.subscribe(responseTopic, 1);
+ if(!responseMap.containsKey(requestIdentifier)){
+ ResponseSemaphore respSemaphore = new ResponseSemaphore(new Semaphore(0));
+ responseMap.put(requestIdentifier, respSemaphore);
+ return respSemaphore;
+ }
+ return null;
+ }
+ }
+
+ private static void responseReceived(ResponsePrimitive responsePrimitive){
+ synchronized (responseMap) {
+ if(responseMap.containsKey(responsePrimitive.getRequestIdentifier())){
+ ResponseSemaphore responseSemanphore = responseMap.get(responsePrimitive.getRequestIdentifier());
+ responseSemanphore.setResponsePrimitive(responsePrimitive);
+ responseSemanphore.getSemaphore().release();
+ responseMap.remove(responsePrimitive.getRequestIdentifier());
+ }
+ }
+ }
+
+ private static class ResponseCallback implements MqttCallback {
+
+ private static Pattern responsePattern = Pattern.compile("/oneM2M/resp/([^/]+)/" + Constants.CSE_ID + "/(.*)");
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ // Ignore
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // Ignore
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message)
+ throws Exception {
+ Matcher matcher = responsePattern.matcher(topic);
+ if(!matcher.matches()){
+ return;
+ }
+ String format = matcher.group(2);
+ DataMapperService dms = null;
+ switch (format) {
+ case "xml":
+ dms = DataMapperRegistry.get(MimeMediaType.XML);
+ break;
+ case "json":
+ dms = DataMapperRegistry.get(MimeMediaType.JSON);
+ break;
+ default:
+ break;
+ }
+ if(dms == null){
+ // The format is not handled here
+ return;
+ }
+ String payload = new String(message.getPayload());
+ ResponsePrimitive resp = (ResponsePrimitive) dms.stringToObj(payload);
+ if(resp == null || resp.getRequestIdentifier() == null){
+ return;
+ }
+ responseReceived(resp);
+ }
+
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java
new file mode 100644
index 00000000..7588712d
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java
@@ -0,0 +1,32 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.concurrent.Semaphore;
+
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+
+public class ResponseSemaphore {
+
+ private Semaphore semaphore;
+ private ResponsePrimitive responsePrimitive;
+
+ public ResponseSemaphore(Semaphore semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public Semaphore getSemaphore() {
+ return semaphore;
+ }
+
+ public void setSemaphore(Semaphore semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public ResponsePrimitive getResponsePrimitive() {
+ return responsePrimitive;
+ }
+
+ public void setResponsePrimitive(ResponsePrimitive responsePrimitive) {
+ this.responsePrimitive = responsePrimitive;
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java
new file mode 100644
index 00000000..219fa66b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java
@@ -0,0 +1,19 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+
+public final class Utils {
+
+ public static void fillPrimitiveContent(){
+
+ }
+
+ public static void fillContent(ResponsePrimitive requestPrimitive){
+ if(requestPrimitive.getPrimitiveContent() != null &&
+ !requestPrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ requestPrimitive.getContent() == null){
+ requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0));
+ }
+ }
+
+}
diff --git a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java
index 7bb68b11..9e351993 100644
--- a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java
+++ b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java
@@ -135,7 +135,15 @@ public class RequestPrimitive {
protected String requestContentType;
@XmlTransient
protected Map<String,List<String>> queryStrings;
-
+ @XmlTransient
+ protected String targetId;
+ @XmlTransient
+ protected String mqttTopic;
+ @XmlTransient
+ protected String mqttUri;
+ @XmlTransient
+ protected boolean mqttResponseExpected = true;
+
/**
* @return the queryStrings
*/
@@ -558,7 +566,7 @@ public class RequestPrimitive {
* @return the targetId
*/
public String getTargetId() {
- return this.getTo();
+ return this.targetId;
}
/**
@@ -566,7 +574,7 @@ public class RequestPrimitive {
* the targetId to set
*/
public void setTargetId(String targetId) {
- this.setTo(targetId);
+ this.targetId= targetId;
}
/**
@@ -597,6 +605,38 @@ public class RequestPrimitive {
this.requestContentType = requestContentType;
}
+ public PrimitiveContent getPrimitiveContent() {
+ return primitiveContent;
+ }
+
+ public void setPrimitiveContent(PrimitiveContent primitiveContent) {
+ this.primitiveContent = primitiveContent;
+ }
+
+ public String getMqttTopic() {
+ return mqttTopic;
+ }
+
+ public void setMqttTopic(String mqttTopic) {
+ this.mqttTopic = mqttTopic;
+ }
+
+ public String getMqttUri() {
+ return mqttUri;
+ }
+
+ public void setMqttUri(String mqttUri) {
+ this.mqttUri = mqttUri;
+ }
+
+ public boolean isMqttResponseExpected() {
+ return mqttResponseExpected;
+ }
+
+ public void setMqttResponseExpected(boolean mqttResponseExpected) {
+ this.mqttResponseExpected = mqttResponseExpected;
+ }
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -675,5 +715,5 @@ public class RequestPrimitive {
result.to = this.to;
return result;
}
-
+
}
diff --git a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java
index 93b6b1b0..d6ad846b 100644
--- a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java
+++ b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java
@@ -331,7 +331,7 @@ public class ResponsePrimitive {
this.content = content;
}
- public PrimitiveContent getPritimitiveContent(){
+ public PrimitiveContent getPrimitiveContent(){
return content;
}
diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java
index 1fc142a4..424ea3fb 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java
@@ -158,7 +158,7 @@ public class FanOutPointController extends Controller {
public ResponsePrimitive call() throws Exception {
ResponsePrimitive resp = new Router().doRequest(request);
resp.setPrimitiveContent(new PrimitiveContent());
- resp.getPritimitiveContent().getAny().add(resp.getContent());
+ resp.getPrimitiveContent().getAny().add(resp.getContent());
return resp;
}
diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java
index 4ea0d702..191f26cb 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java
@@ -22,6 +22,8 @@ package org.eclipse.om2m.core.notifier;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -133,6 +135,25 @@ public class Notifier {
LOGGER.info("Sending notify request to: " + contact);
if(contact.matches(".*://.*")){
// Contact = protocol-dependent -> direct notification using the rest client.
+ // In case of MQTT, the URI of the broker and the Topic has to be handled separatly
+ if(contact.startsWith("mqtt://")){
+ Pattern mqttUriPattern = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
+ Matcher matcher = mqttUriPattern.matcher(contact);
+ if(matcher.matches()){
+ String uri = matcher.group(1);
+ String topic = matcher.group(3) == null ? "" : matcher.group(3).substring(1);
+ request.setMqttTopic(topic);
+ request.setMqttUri(uri);
+ // We do not want to wait for a response on AE topic
+ request.setMqttResponseExpected(false);
+ } else {
+ ResponsePrimitive resp = new ResponsePrimitive(request);
+ resp.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);
+ resp.setContent("Error in mqtt URI");
+ resp.setContentType(MimeMediaType.TEXT_PLAIN);
+ return resp;
+ }
+ }
request.setTo(contact);
return RestClient.sendRequest(request);
}else{
@@ -267,7 +288,7 @@ public class Notifier {
for(final String uri : sub.getNotificationURI()){
CoreExecutor.postThread(new Runnable(){
public void run() {
- Notifier.notify(request, uri);
+ Notifier.notify(request, uri);
};
});
}
diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java
index fd20c05b..b5132356 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java
@@ -19,6 +19,9 @@
*******************************************************************************/
package org.eclipse.om2m.core.redirector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.om2m.commons.constants.CSEType;
@@ -114,15 +117,32 @@ public class Redirector {
LOGGER.debug("Removing / at the end of poa: " + url);
url = url.substring(0, url.length() - 1);
}
+
- if(request.getTo().startsWith("//")){
- url += request.getTo().replaceFirst("//", "/_/");
- } else if(request.getTo().startsWith("/")){
- url += request.getTo().replaceFirst("/", "/~/");
+ if(url.startsWith("mqtt://")){
+ url += request.getTo();
+ Pattern mqttUriPattern = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
+ Matcher matcher = mqttUriPattern.matcher(url);
+ if(matcher.matches()){
+ // FIXME we need a response but not yet implemented in MQTT binding
+ request.setMqttResponseExpected(true);
+ // TODO Format type can be enhanced
+ request.setMqttTopic("/oneM2M/req/" + Constants.CSE_ID + "/" + csrEntity.getRemoteCseId().replaceAll("/", "") + "/xml");
+ request.setMqttUri(matcher.group(1));
+ } else {
+ LOGGER.warn("Incorrect MQTT URI specified in remoteCSE: " + url);
+ i++;
+ continue;
+ }
} else {
- url+= "/" + request.getTo();
+ if(request.getTo().startsWith("//")){
+ url += request.getTo().replaceFirst("//", "/_/");
+ } else if(request.getTo().startsWith("/")){
+ url += request.getTo().replaceFirst("/", "/~/");
+ } else {
+ url+= "/" + request.getTo();
+ }
}
-
request.setTo(url);
ResponsePrimitive response = RestClient.sendRequest(request);
if(!(response.getResponseStatusCode()
@@ -192,6 +212,26 @@ public class Redirector {
done = true;
} else {
request.setTo(poa);
+ if(poa.startsWith("mqtt://")){
+ Pattern mqttUriPattern = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)?");
+ Matcher matcher = mqttUriPattern.matcher(poa);
+ if(matcher.matches()){
+ String topic = matcher.group(3);
+ String aeId = ae.getAeid();
+ if(topic != null){
+ request.setMqttTopic(topic);
+ request.setMqttResponseExpected(false);
+ } else {
+ request.setMqttTopic("/oneM2M/req/" + Constants.CSE_ID + "/" + aeId + "/xml");
+ request.setMqttResponseExpected(true);
+ }
+ request.setMqttUri(matcher.group(1));
+ } else {
+ LOGGER.warn("POA is incorrect for MQTT: " + poa);
+ i++;
+ continue;
+ }
+ }
response = RestClient.sendRequest(request);
if(!response.getResponseStatusCode().equals(ResponseStatusCode.TARGET_NOT_REACHABLE)){
done = true;
diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java
index 44f4f5c2..44b27cb3 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java
@@ -126,6 +126,10 @@ public class Router implements CseService {
}
// URI Handling
+ if(request.getTo() == null && request.getTargetId() == null){
+ throw new BadRequestException("No To parameter provided provided");
+ }
+
if(request.getTargetId() == null){
request.setTargetId(request.getTo());
}
diff --git a/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml b/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml
index 74b8d2ab..583a3ee1 100644
--- a/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml
+++ b/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml
@@ -19,6 +19,19 @@
</java-attributes>
</java-type>
+ <!-- Request and Response Descriptions -->
+ <java-type name="RequestPrimitive">
+ <xml-root-element name="m2m:rqp"/>
+ </java-type>
+
+ <java-type name="PrimitiveContent">
+ <xml-root-element name="pc"/>
+ </java-type>
+
+ <java-type name="ResponsePrimitive">
+ <xml-root-element name="m2m:rsp"/>
+ </java-type>
+
<!-- CSE Descriptions -->
<java-type name="CSEBase">
<xml-root-element name="m2m:cb" />
diff --git a/org.eclipse.om2m.site.in-cse/om2m.product b/org.eclipse.om2m.site.in-cse/om2m.product
index 517b1d71..47305cb7 100644
--- a/org.eclipse.om2m.site.in-cse/om2m.product
+++ b/org.eclipse.om2m.site.in-cse/om2m.product
@@ -52,6 +52,7 @@
<plugin id="org.eclipse.jetty.util"/>
<plugin id="org.eclipse.om2m.binding.coap"/>
<plugin id="org.eclipse.om2m.binding.http"/>
+ <plugin id="org.eclipse.om2m.binding.mqtt"/>
<plugin id="org.eclipse.om2m.binding.service"/>
<plugin id="org.eclipse.om2m.commons"/>
<plugin id="org.eclipse.om2m.commons.logging" fragment="true"/>
diff --git a/pom.xml b/pom.xml
index 9ee48c01..e365d1c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@
<module>org.eclipse.om2m.commons.logging</module>
<module>org.eclipse.om2m.binding.http</module>
<module>org.eclipse.om2m.binding.coap</module>
+ <module>org.eclipse.om2m.binding.mqtt</module>
<module>org.eclipse.om2m.binding.service</module>
<module>org.eclipse.om2m.core.service</module>
<module>org.eclipse.om2m.datamapping.jaxb</module>

Back to the top