Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java')
-rw-r--r--org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java155
1 files changed, 155 insertions, 0 deletions
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
new file mode 100644
index 00000000..7936749b
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
@@ -0,0 +1,155 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.binding.mqtt.util.ResponseRegistry;
+import org.eclipse.om2m.binding.mqtt.util.ResponseSemaphore;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+public class MqttRestClient implements RestClientService {
+
+ private static final Log LOGGER = LogFactory.getLog(MqttRestClient.class);
+
+ @Override
+ public ResponsePrimitive sendRequest(RequestPrimitive requestPrimitive) {
+ if(requestPrimitive.getContent() != null){
+ PrimitiveContent pc = new PrimitiveContent();
+ switch(requestPrimitive.getRequestContentType()){
+ case MimeMediaType.XML:
+ pc.getAny().add(DataMapperRegistry.get(MimeMediaType.XML).stringToObj((String)requestPrimitive.getContent()));
+ break;
+ case MimeMediaType.JSON:
+ pc.getAny().add(DataMapperRegistry.get(MimeMediaType.JSON).stringToObj((String)requestPrimitive.getContent()));
+ break;
+ case MimeMediaType.OBJ: case MimeMediaType.TEXT_PLAIN:
+ pc.getAny().add(requestPrimitive.getContent());
+ break;
+ default:
+ break;
+ }
+ if(!pc.getAny().isEmpty()){
+ requestPrimitive.setPrimitiveContent(pc);
+ }
+ }
+
+ ResponsePrimitive responsePrimitive = new ResponsePrimitive(requestPrimitive);
+
+ if(requestPrimitive.getMqttTopic() == null || requestPrimitive.getMqttUri() == null){
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);
+ return responsePrimitive;
+ }
+
+ if(requestPrimitive.getRequestIdentifier() == null){
+ requestPrimitive.setRequestIdentifier(UUID.randomUUID().toString());
+ }
+
+ String uri = requestPrimitive.getMqttUri();
+ if(uri.startsWith("mqtt://")){
+ uri = uri.replaceFirst("mqtt://", "tcp://");
+ }
+
+ if(requestPrimitive.getTo().startsWith("mqtt://")){
+ Pattern mqttUriPatter = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
+ Matcher matcher = mqttUriPatter.matcher(requestPrimitive.getTo());
+ if(matcher.matches()){
+ requestPrimitive.setTo(matcher.group(3));
+ }
+ }
+
+ String topic = requestPrimitive.getMqttTopic();
+ String payload = null;
+ String format = null;
+ if (topic.endsWith("/json")){
+ payload = DataMapperRegistry.get(MimeMediaType.JSON).objToString(requestPrimitive);
+ format = "json";
+ } else {
+ // Case of XML and default
+ payload = DataMapperRegistry.get(MimeMediaType.XML).objToString(requestPrimitive);
+ format = "xml";
+ }
+
+ try {
+ MqttClient mqttClient = new MqttClient(uri, requestPrimitive.getRequestIdentifier(), new MemoryPersistence());
+ mqttClient.connect();
+ LOGGER.debug("Sending request on topic: " + topic + " with payload:\n" + payload);
+ ResponseSemaphore responseSemaphore = null;
+ if(requestPrimitive.isMqttResponseExpected()){
+ Matcher matcher = MqttConstants.REQUEST_PATTERN_OUT.matcher(topic);
+ if(matcher.matches()){
+ String responseTopic = "/oneM2M/resp/" + matcher.group(1) + "/"+ Constants.CSE_ID + "/" + format;
+ responseSemaphore = ResponseRegistry.createSemaphore(requestPrimitive.getRequestIdentifier(), mqttClient, responseTopic);
+ } else {
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ }
+ } else {
+ mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.OK);
+ }
+ if(responseSemaphore != null){
+ QueueSender.queue(mqttClient, topic, payload.getBytes());
+ LOGGER.debug("Waiting for response... (" + MqttConstants.TIME_OUT_DURATION + "s)");
+ boolean released = responseSemaphore.getSemaphore().tryAcquire(1, MqttConstants.TIME_OUT_DURATION, TimeUnit.SECONDS);
+ if(released){
+ responsePrimitive = responseSemaphore.getResponsePrimitive();
+ fillAndConvertContent(requestPrimitive, responsePrimitive);
+ LOGGER.debug("Response received: " + responsePrimitive);
+ } else {
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ }
+ }
+ mqttClient.disconnect();
+ mqttClient.close();
+ } catch (MqttException e) {
+ LOGGER.warn("Cannot connect to: " + requestPrimitive.getMqttUri());
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ return responsePrimitive;
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted exception caught in MqttRestClient: " + e.getMessage());
+ responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+ return responsePrimitive;
+ }
+
+ return responsePrimitive;
+ }
+
+ private void fillAndConvertContent(RequestPrimitive requestPrimitive,
+ ResponsePrimitive responsePrimitive) {
+ if(responsePrimitive.getPrimitiveContent() != null &&
+ !responsePrimitive.getPrimitiveContent().getAny().isEmpty() &&
+ responsePrimitive.getContent() == null){
+ if(requestPrimitive.getReturnContentType().equals(MimeMediaType.OBJ)){
+ responsePrimitive.setContent(responsePrimitive.getPrimitiveContent().getAny().get(0));
+ } else {
+ DataMapperService dms = DataMapperRegistry.get(requestPrimitive.getReturnContentType());
+ String content = dms.objToString(responsePrimitive.getPrimitiveContent().getAny().get(0));
+ responsePrimitive.setContent(content);
+ responsePrimitive.setContentType(requestPrimitive.getReturnContentType());
+ }
+ }
+ }
+
+ @Override
+ public String getProtocol() {
+ return MqttConstants.PROTOCOL;
+ }
+
+}

Back to the top