diff options
Diffstat (limited to 'org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse')
9 files changed, 894 insertions, 0 deletions
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java new file mode 100644 index 00000000..0a74e784 --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java @@ -0,0 +1,151 @@ +package org.eclipse.om2m.binding.mqtt; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry; +import org.eclipse.om2m.binding.service.RestClientService; +import org.eclipse.om2m.core.service.CseService; +import org.eclipse.om2m.datamapping.service.DataMapperService; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; + +public class Activator implements BundleActivator { + + private static BundleContext context; + + static BundleContext getContext() { + return context; + } + + private static final Log LOGGER = LogFactory.getLog(Activator.class); + + /** {@link DataMapperServiceTracker} reference */ + private static ServiceTracker<DataMapperService, DataMapperService> dataMapperServiceTracker; + /** {@link CseService} reference */ + private static ServiceTracker<CseService, CseService> cseServiceTracker; + + /** MQTT Request Handler that connects to the MQTT Broker */ + private static MqttRequestHandler mqttRequestHandler; + + public void start(BundleContext bundleContext) throws Exception { + Activator.context = bundleContext; + + // Listening on Cse Service + cseServiceTracker = new ServiceTracker<CseService, CseService>( + bundleContext, CseService.class, + new CseServiceTrackerCustomizer()); + cseServiceTracker.open(); + + // Listening on DataMapper Service + dataMapperServiceTracker = new ServiceTracker<DataMapperService, DataMapperService>( + bundleContext, DataMapperService.class, + new DataMapperServiceTracker()); + dataMapperServiceTracker.open(); + + // Registering RestClientService of MQTT + getContext().registerService(RestClientService.class, new MqttRestClient(), null); + } + + public void stop(BundleContext bundleContext) throws Exception { + Activator.context = null; + if (cseServiceTracker != null) { + cseServiceTracker.close(); + cseServiceTracker = null; + } + if (dataMapperServiceTracker != null) { + dataMapperServiceTracker.close(); + dataMapperServiceTracker = null; + } + if (mqttRequestHandler != null){ + mqttRequestHandler.close(); + mqttRequestHandler = null; + } + } + + private static class CseServiceTrackerCustomizer implements + ServiceTrackerCustomizer<CseService, CseService> { + + @Override + public CseService addingService(ServiceReference<CseService> reference) { + if (reference == null) { + return null; + } + Object service = Activator.getContext().getService(reference); + if (service != null && service instanceof CseService) { + LOGGER.debug("New CseService discovered"); + CseService cse = (CseService) service; + MqttRequestHandler.setCseService(cse); + if (mqttRequestHandler == null) { + new Thread() { + public void run() { + LOGGER.info("Creating MQTT Request Handler"); + mqttRequestHandler = new MqttRequestHandler(); + }; + }.start(); + } + return cse; + } + return null; + } + + @Override + public void modifiedService(ServiceReference<CseService> reference, + CseService service) { + if (service != null) { + LOGGER.info("CseService modified"); + MqttRequestHandler.setCseService(service); + } + } + + @Override + public void removedService(ServiceReference<CseService> reference, + CseService service) { + MqttRequestHandler.setCseService(null); + } + + } + + private static class DataMapperServiceTracker implements + ServiceTrackerCustomizer<DataMapperService, DataMapperService> { + + @Override + public DataMapperService addingService( + ServiceReference<DataMapperService> reference) { + if (reference == null) { + return null; + } + Object service = Activator.getContext().getService(reference); + if (service != null && service instanceof DataMapperService) { + DataMapperService dms = (DataMapperService) service; + LOGGER.debug("New DataMapper Service discovered: " + + dms.getServiceDataType()); + DataMapperRegistry.register(dms); + return dms; + } + return null; + } + + @Override + public void modifiedService( + ServiceReference<DataMapperService> reference, + DataMapperService service) { + if (service != null) { + DataMapperRegistry.register(service); + } + } + + @Override + public void removedService( + ServiceReference<DataMapperService> reference, + DataMapperService service) { + if (service != null) { + DataMapperRegistry.remove(service); + } + } + + } + +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java new file mode 100644 index 00000000..1105a89c --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java @@ -0,0 +1,255 @@ +package org.eclipse.om2m.binding.mqtt; + +import java.math.BigInteger; +import java.util.regex.Matcher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry; +import org.eclipse.om2m.binding.mqtt.util.MqttConstants; +import org.eclipse.om2m.binding.mqtt.util.QueueSender; +import org.eclipse.om2m.commons.constants.Constants; +import org.eclipse.om2m.commons.constants.MimeMediaType; +import org.eclipse.om2m.commons.constants.ResponseStatusCode; +import org.eclipse.om2m.commons.resource.PrimitiveContent; +import org.eclipse.om2m.commons.resource.RequestPrimitive; +import org.eclipse.om2m.commons.resource.ResponsePrimitive; +import org.eclipse.om2m.core.service.CseService; +import org.eclipse.om2m.datamapping.service.DataMapperService; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +/** + * MQTT Request Handler class that subscribe to oneM2M request topic. + * When a request is received in the request topic, it is de-serialized and send + * to the CseService implementation available. Then the response from the service + * is serialized and sent to the oneM2M response topic. + */ +public class MqttRequestHandler implements MqttCallback { + + // Static attributes of the class + + /** MQTT Client ID */ + private static final String CLIENT_ID = Constants.CSE_ID; + /** Logger reference */ + private static final Log LOGGER = LogFactory.getLog(MqttRequestHandler.class); + /** MQTT Request Topic */ + private static final String REQUEST_TOPIC = "/oneM2M/req/+/" + Constants.CSE_ID + "/+"; + + + /** Reference to the current cseService implementation*/ + private static CseService cseService; + + /** + * Set the current CseService used when a request is + * received on the oneM2M request topic. + * @param cse the CseService implementation to use + */ + public static void setCseService(CseService cse) { + cseService = cse; + } + + // Private attributes + + /** MQTT Client from the Paho library */ + private MqttClient mainMqttClient; + + /** The MQTT connection options to use */ + private MqttConnectOptions connOpts; + + /** Tell the thread to keep retrying or not */ + private boolean retry = true; + /** Connection retry thread */ + private Thread retryThread; + + /** + * Default constructor of the Request Handler + */ + public MqttRequestHandler() { + MemoryPersistence persistence = new MemoryPersistence(); + String url = "tcp://" + MqttConstants.MQTT_BROKER_HOSTNAME + ":" + + MqttConstants.MQTT_BROKER_PORT; + this.connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + if(MqttConstants.MQTT_BROKER_USERNAME != null && MqttConstants.MQTT_BROKER_PASSWORD != null){ + connOpts.setUserName(MqttConstants.MQTT_BROKER_USERNAME); + connOpts.setPassword(MqttConstants.MQTT_BROKER_PASSWORD.toCharArray()); + } + try { + LOGGER.debug("Connecting MQTT client to: " + url); + this.mainMqttClient = new MqttClient(url, CLIENT_ID, persistence); + this.mainMqttClient.setCallback(MqttRequestHandler.this); + this.connect(this.connOpts); + } catch (MqttException e) { + LOGGER.error("Error in MQTT Client creation", e); + } + } + + /** + * Connect and retry if the connection fails + * @param connOpts + */ + private void connect(final MqttConnectOptions connOpts){ + if(retry && retryThread == null){ + retryThread = new Thread("mqtt-connection-retrier"){ + public void run() { + while(retry && !MqttRequestHandler.this.mainMqttClient.isConnected()){ + try { + MqttRequestHandler.this.mainMqttClient.connect(connOpts); + + LOGGER.info("Subscribing on MQTT topic: " + REQUEST_TOPIC); + MqttRequestHandler.this.mainMqttClient.subscribe(REQUEST_TOPIC, 2); + } catch (MqttException e) { + LOGGER.warn("Cannot connect to MQTT Broker, retrying in 10s. Cause: " + e.getMessage()); + } + if(!MqttRequestHandler.this.mainMqttClient.isConnected()){ + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // Ignore + } + } + } + MqttRequestHandler.this.retryThread = null; + }; + }; + } + retryThread.start(); + } + + @Override + public void connectionLost(Throwable throwable) { + LOGGER.warn("Connection lost on MQTT Broker at " + MqttConstants.MQTT_BROKER_HOSTNAME + ":" + MqttConstants.MQTT_BROKER_PORT); + this.connect(this.connOpts); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Empty + } + + @Override + public void messageArrived(String topic, MqttMessage message) + throws Exception { + Matcher matcher = MqttConstants.REQUEST_PATTERN_IN.matcher(topic); + if (matcher.matches()) { + String aeId = matcher.group(1); + String format = matcher.group(2); + String responseTopic = "/oneM2M/resp/" + Constants.CSE_ID + "/" + aeId + "/" + format; + + if (message.getPayload() == null) { + LOGGER.info("Null message received on " + topic); + sendErrorResponse("The message is null", responseTopic, aeId, format); + return; + } + String payload = new String(message.getPayload()); + LOGGER.debug("(" + topic + ") Message received (qos: " + message.getQos() + "):\n" + payload); + DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format); + + if (dms == null) { + LOGGER.warn("MQTT Request received with unhandled content type: " + format); + sendErrorResponse("The format type is not handled", + responseTopic.replace("/" + format, "/" + MqttConstants.MQTT_XML), + aeId, MqttConstants.MQTT_XML); + return; + } + Object objectPayload = dms.stringToObj(payload); + if(objectPayload == null || !(objectPayload instanceof RequestPrimitive)){ + LOGGER.info("Invalid content provided in MQTT request"); + sendErrorResponse("Invalid content provided in request primitive", responseTopic, aeId, format); + return; + } + RequestPrimitive requestPrimitive = (RequestPrimitive) objectPayload; + requestPrimitive.setRequestContentType(MimeMediaType.OBJ); + requestPrimitive.setReturnContentType(MimeMediaType.OBJ); + // Primitive content handling + if(requestPrimitive.getPrimitiveContent() != null && + !requestPrimitive.getPrimitiveContent().getAny().isEmpty() && + requestPrimitive.getContent() == null){ + requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0)); + } + + ResponsePrimitive responsePrimitive = null; + + if(cseService != null){ + // Sending the request to the CSE + responsePrimitive = cseService.doRequest(requestPrimitive); + + // Handling the custom "content" field and map it to PrimitiveContent for serialization + if(responsePrimitive.getContent() != null && + responsePrimitive.getPrimitiveContent() == null){ + PrimitiveContent pc = new PrimitiveContent(); + pc.getAny().add(responsePrimitive.getContent()); + responsePrimitive.setPrimitiveContent(pc); + } + + // Building and sending response + final String responsePayload = dms.objToString(responsePrimitive); + LOGGER.debug("Response to be sent on topic: " + responseTopic + ". Payload:\n" + responsePayload); + + // Sending the request in another thread otherwise it blocks the reception thread of Paho + QueueSender.queue(mainMqttClient, responseTopic, responsePayload.getBytes()); + } else { + sendErrorResponse("/" + Constants.CSE_ID + " is not available", responseTopic, aeId, format, ResponseStatusCode.SERVICE_UNAVAILABLE); + } + + } else { + LOGGER.debug("The topic is not well formed. (" + topic + ")"); + } + } + + /** + * Util method that send an error message to the client + * @param message the message to send + * @param responseTopic the response topic to reply on + * @param aeId the id of the client + * @param format the format of exchange + */ + private void sendErrorResponse(String message, String responseTopic, + String aeId, String format, BigInteger responseStatusCode) { + ResponsePrimitive responsePrimitive = new ResponsePrimitive(); + responsePrimitive.setTo(aeId); + responsePrimitive.setFrom("/" + Constants.CSE_ID); + responsePrimitive.setResponseStatusCode(responseStatusCode); + responsePrimitive.setPrimitiveContent(new PrimitiveContent()); + responsePrimitive.getPrimitiveContent().getAny().add(message); + DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format); + byte[] errorPayload = dms.objToString(responsePrimitive).getBytes(); + QueueSender.queue(mainMqttClient, responseTopic, errorPayload); + } + + /** + * Send a bad request response to the client + * @param message the message + * @param responseTopic the response topic + * @param aeId the id of the client + * @param format the format of exchange + */ + private void sendErrorResponse(String message, String responseTopic, + String aeId, String format){ + sendErrorResponse(message, responseTopic, aeId, format, ResponseStatusCode.BAD_REQUEST); + } + + /** + * Disconnecting and closing the MQTT Client. + */ + public void close(){ + // Disconnect the MQTT Client + try { + this.mainMqttClient.disconnect(); + } catch (MqttException e) { + LOGGER.debug("Error disconnecting the MQTT Client", e); + } + // Prevent on any reconnection retry + retry = false; + // Wake up the retry thread after the false on "retry" has been set + if(retryThread != null && retryThread.isAlive()){ + retryThread.interrupt(); + } + } +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java new file mode 100644 index 00000000..7936749b --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java @@ -0,0 +1,155 @@ +package org.eclipse.om2m.binding.mqtt; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry; +import org.eclipse.om2m.binding.mqtt.util.MqttConstants; +import org.eclipse.om2m.binding.mqtt.util.QueueSender; +import org.eclipse.om2m.binding.mqtt.util.ResponseRegistry; +import org.eclipse.om2m.binding.mqtt.util.ResponseSemaphore; +import org.eclipse.om2m.binding.service.RestClientService; +import org.eclipse.om2m.commons.constants.Constants; +import org.eclipse.om2m.commons.constants.MimeMediaType; +import org.eclipse.om2m.commons.constants.ResponseStatusCode; +import org.eclipse.om2m.commons.resource.PrimitiveContent; +import org.eclipse.om2m.commons.resource.RequestPrimitive; +import org.eclipse.om2m.commons.resource.ResponsePrimitive; +import org.eclipse.om2m.datamapping.service.DataMapperService; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +public class MqttRestClient implements RestClientService { + + private static final Log LOGGER = LogFactory.getLog(MqttRestClient.class); + + @Override + public ResponsePrimitive sendRequest(RequestPrimitive requestPrimitive) { + if(requestPrimitive.getContent() != null){ + PrimitiveContent pc = new PrimitiveContent(); + switch(requestPrimitive.getRequestContentType()){ + case MimeMediaType.XML: + pc.getAny().add(DataMapperRegistry.get(MimeMediaType.XML).stringToObj((String)requestPrimitive.getContent())); + break; + case MimeMediaType.JSON: + pc.getAny().add(DataMapperRegistry.get(MimeMediaType.JSON).stringToObj((String)requestPrimitive.getContent())); + break; + case MimeMediaType.OBJ: case MimeMediaType.TEXT_PLAIN: + pc.getAny().add(requestPrimitive.getContent()); + break; + default: + break; + } + if(!pc.getAny().isEmpty()){ + requestPrimitive.setPrimitiveContent(pc); + } + } + + ResponsePrimitive responsePrimitive = new ResponsePrimitive(requestPrimitive); + + if(requestPrimitive.getMqttTopic() == null || requestPrimitive.getMqttUri() == null){ + responsePrimitive.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST); + return responsePrimitive; + } + + if(requestPrimitive.getRequestIdentifier() == null){ + requestPrimitive.setRequestIdentifier(UUID.randomUUID().toString()); + } + + String uri = requestPrimitive.getMqttUri(); + if(uri.startsWith("mqtt://")){ + uri = uri.replaceFirst("mqtt://", "tcp://"); + } + + if(requestPrimitive.getTo().startsWith("mqtt://")){ + Pattern mqttUriPatter = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)"); + Matcher matcher = mqttUriPatter.matcher(requestPrimitive.getTo()); + if(matcher.matches()){ + requestPrimitive.setTo(matcher.group(3)); + } + } + + String topic = requestPrimitive.getMqttTopic(); + String payload = null; + String format = null; + if (topic.endsWith("/json")){ + payload = DataMapperRegistry.get(MimeMediaType.JSON).objToString(requestPrimitive); + format = "json"; + } else { + // Case of XML and default + payload = DataMapperRegistry.get(MimeMediaType.XML).objToString(requestPrimitive); + format = "xml"; + } + + try { + MqttClient mqttClient = new MqttClient(uri, requestPrimitive.getRequestIdentifier(), new MemoryPersistence()); + mqttClient.connect(); + LOGGER.debug("Sending request on topic: " + topic + " with payload:\n" + payload); + ResponseSemaphore responseSemaphore = null; + if(requestPrimitive.isMqttResponseExpected()){ + Matcher matcher = MqttConstants.REQUEST_PATTERN_OUT.matcher(topic); + if(matcher.matches()){ + String responseTopic = "/oneM2M/resp/" + matcher.group(1) + "/"+ Constants.CSE_ID + "/" + format; + responseSemaphore = ResponseRegistry.createSemaphore(requestPrimitive.getRequestIdentifier(), mqttClient, responseTopic); + } else { + responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE); + } + } else { + mqttClient.publish(topic, new MqttMessage(payload.getBytes())); + responsePrimitive.setResponseStatusCode(ResponseStatusCode.OK); + } + if(responseSemaphore != null){ + QueueSender.queue(mqttClient, topic, payload.getBytes()); + LOGGER.debug("Waiting for response... (" + MqttConstants.TIME_OUT_DURATION + "s)"); + boolean released = responseSemaphore.getSemaphore().tryAcquire(1, MqttConstants.TIME_OUT_DURATION, TimeUnit.SECONDS); + if(released){ + responsePrimitive = responseSemaphore.getResponsePrimitive(); + fillAndConvertContent(requestPrimitive, responsePrimitive); + LOGGER.debug("Response received: " + responsePrimitive); + } else { + responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE); + } + } + mqttClient.disconnect(); + mqttClient.close(); + } catch (MqttException e) { + LOGGER.warn("Cannot connect to: " + requestPrimitive.getMqttUri()); + responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE); + return responsePrimitive; + } catch (InterruptedException e) { + LOGGER.error("Interrupted exception caught in MqttRestClient: " + e.getMessage()); + responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE); + return responsePrimitive; + } + + return responsePrimitive; + } + + private void fillAndConvertContent(RequestPrimitive requestPrimitive, + ResponsePrimitive responsePrimitive) { + if(responsePrimitive.getPrimitiveContent() != null && + !responsePrimitive.getPrimitiveContent().getAny().isEmpty() && + responsePrimitive.getContent() == null){ + if(requestPrimitive.getReturnContentType().equals(MimeMediaType.OBJ)){ + responsePrimitive.setContent(responsePrimitive.getPrimitiveContent().getAny().get(0)); + } else { + DataMapperService dms = DataMapperRegistry.get(requestPrimitive.getReturnContentType()); + String content = dms.objToString(responsePrimitive.getPrimitiveContent().getAny().get(0)); + responsePrimitive.setContent(content); + responsePrimitive.setContentType(requestPrimitive.getReturnContentType()); + } + } + } + + @Override + public String getProtocol() { + return MqttConstants.PROTOCOL; + } + +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java new file mode 100644 index 00000000..18127ab3 --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java @@ -0,0 +1,75 @@ +package org.eclipse.om2m.binding.mqtt.util; + +import java.util.HashMap; +import java.util.Map; + +import org.eclipse.om2m.commons.constants.MimeMediaType; +import org.eclipse.om2m.datamapping.service.DataMapperService; + +/** + * This class is used to store instances of {@link DataMapperService} classes. + * + */ +public class DataMapperRegistry { + + /** Private constructor to avoid creation */ + private DataMapperRegistry(){} + + /** + * Service registry classified by data type handled. + */ + private static Map<String, DataMapperService> serviceRegistery = new HashMap<String, DataMapperService>(); + + /** + * Add a new {@link DataMapperService} to the registery. + * @param dms the service to register + */ + public static void register(DataMapperService dms){ + if(dms != null && dms.getServiceDataType() != null){ + serviceRegistery.put(dms.getServiceDataType(), dms); + } + } + + /** + * Retrieve a {@link DataMapperService} from a data type. + * @param dataType the + * @return the {@link DataMapperService} that handle the data type or null if none + */ + public static DataMapperService get(String dataType){ + return serviceRegistery.get(dataType); + } + + /** + * Remove the {@link DataMapperService} from the registry + * @param dataType the data type of the service to remove + */ + public static void remove(String dataType){ + serviceRegistery.remove(dataType); + } + + /** + * Remove the {@link DataMapperService} from the registry + * @param dms the service to remove from the registry + */ + public static void remove(DataMapperService dms){ + remove(dms.getServiceDataType()); + } + + /** + * Retrieve the {@link DataMapperService} from the registry + * from the MQTT format String + * @param format the format of the DMS + * @return the DMS with the specified format or null + */ + public static DataMapperService getFromMqttFormat(String format){ + switch (format) { + case "xml": + return DataMapperRegistry.get(MimeMediaType.XML); + case "json": + return DataMapperRegistry.get(MimeMediaType.JSON); + default: + return null; + } + } + +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java new file mode 100644 index 00000000..24fceff6 --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java @@ -0,0 +1,48 @@ +package org.eclipse.om2m.binding.mqtt.util; + +import java.util.regex.Pattern; + +import org.eclipse.om2m.commons.constants.Constants; + +/** + * A set of MQTT constants retrieved from the System.getProperty method. + * + */ +public final class MqttConstants { + + private MqttConstants(){} + + /** Hostname of the main broker */ + public static final String MQTT_BROKER_HOSTNAME = System.getProperty("org.eclipse.om2m.mqtt.ip", "127.0.0.1"); + + /** IP of the main broker */ + public static final int MQTT_BROKER_PORT = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.port", "1883")); + + /** Username to connect to broker */ + public static final String MQTT_BROKER_USERNAME = System.getProperty("org.eclipse.om2m.mqtt.username"); + + /** Password to connect to broker */ + public static final String MQTT_BROKER_PASSWORD = System.getProperty("org.eclipse.om2m.mqtt.password"); + + /** MQTT Protocol prefix */ + public static final String PROTOCOL = "mqtt"; + + /** Size of the request sender queue */ + public static final int MQTT_QUEUE_SENDER_SIZE = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.queue.size", "8")); + + /** Request pattern to parse the request topic on message reception */ + public static final Pattern REQUEST_PATTERN_IN = Pattern.compile("/oneM2M/req/([^/]+)/" + Constants.CSE_ID + "/(.*)"); + + /** Request pattern when sending a message. */ + public static final Pattern REQUEST_PATTERN_OUT = Pattern.compile("/oneM2M/req/" + Constants.CSE_ID+ "/([^/]+)+/(.*)"); + + /** Time out duration when waiting for a response. Unit in second. */ + public static final long TIME_OUT_DURATION = Long.valueOf(System.getProperty("org.eclipse.om2m.mqtt.timeout", "20")); + + /** MQTT format for XML in topic */ + public static final String MQTT_XML = "xml"; + + /** MQTT format for JSON in topic */ + public static final String MQTT_JSON = "json"; + +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java new file mode 100644 index 00000000..43be0ecb --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java @@ -0,0 +1,60 @@ +package org.eclipse.om2m.binding.mqtt.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; + +public final class QueueSender { + + private static final Log LOGGER = LogFactory.getLog(QueueSender.class); + private static ExecutorService threadPool; + + static { + int queueSize = MqttConstants.MQTT_QUEUE_SENDER_SIZE <= 2 ? 2 + : MqttConstants.MQTT_QUEUE_SENDER_SIZE; + threadPool = new ThreadPoolExecutor(2, queueSize, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>()); + } + + public static void queue(MqttClient mqttClient, String topic, byte[] payload){ + LOGGER.debug("Sending MQTT message to " + mqttClient.getServerURI() + " topic: " + topic); + threadPool.execute(new MqttSender(mqttClient, topic, payload)); + } + + private static class MqttSender implements Runnable { + + private MqttClient mqttClient; + private String topic; + private byte[] payload; + + public MqttSender(MqttClient mqttClient, String topic, byte[] payload) { + super(); + this.mqttClient = mqttClient; + this.topic = topic; + this.payload = payload; + } + + @Override + public void run() { + try { + this.mqttClient.publish(topic, payload, 1, false); + } catch (MqttException e) { + LOGGER.warn("Error publishing on topic: " + this.topic + + " of broker " + this.mqttClient.getServerURI() + + ". Error: " + e.getMessage()); + } + } + + } + + private QueueSender(){ + // Empty and private constructor to avoid class creation + } + +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java new file mode 100644 index 00000000..20c7f246 --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java @@ -0,0 +1,99 @@ +package org.eclipse.om2m.binding.mqtt.util; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.eclipse.om2m.commons.constants.Constants; +import org.eclipse.om2m.commons.constants.MimeMediaType; +import org.eclipse.om2m.commons.resource.ResponsePrimitive; +import org.eclipse.om2m.datamapping.service.DataMapperService; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public final class ResponseRegistry { + + private ResponseRegistry() { + // Empty and private constructor to avoid instantiation of this class + } + + private static final Map<String, ResponseSemaphore> responseMap = new HashMap<String, ResponseSemaphore>(); + + public static ResponseSemaphore createSemaphore(String requestIdentifier, MqttClient mqttClient, + String responseTopic) throws MqttException{ + synchronized (responseMap) { + mqttClient.setCallback(new ResponseCallback()); + mqttClient.subscribe(responseTopic, 1); + if(!responseMap.containsKey(requestIdentifier)){ + ResponseSemaphore respSemaphore = new ResponseSemaphore(new Semaphore(0)); + responseMap.put(requestIdentifier, respSemaphore); + return respSemaphore; + } + return null; + } + } + + private static void responseReceived(ResponsePrimitive responsePrimitive){ + synchronized (responseMap) { + if(responseMap.containsKey(responsePrimitive.getRequestIdentifier())){ + ResponseSemaphore responseSemanphore = responseMap.get(responsePrimitive.getRequestIdentifier()); + responseSemanphore.setResponsePrimitive(responsePrimitive); + responseSemanphore.getSemaphore().release(); + responseMap.remove(responsePrimitive.getRequestIdentifier()); + } + } + } + + private static class ResponseCallback implements MqttCallback { + + private static Pattern responsePattern = Pattern.compile("/oneM2M/resp/([^/]+)/" + Constants.CSE_ID + "/(.*)"); + + @Override + public void connectionLost(Throwable cause) { + // Ignore + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Ignore + } + + @Override + public void messageArrived(String topic, MqttMessage message) + throws Exception { + Matcher matcher = responsePattern.matcher(topic); + if(!matcher.matches()){ + return; + } + String format = matcher.group(2); + DataMapperService dms = null; + switch (format) { + case "xml": + dms = DataMapperRegistry.get(MimeMediaType.XML); + break; + case "json": + dms = DataMapperRegistry.get(MimeMediaType.JSON); + break; + default: + break; + } + if(dms == null){ + // The format is not handled here + return; + } + String payload = new String(message.getPayload()); + ResponsePrimitive resp = (ResponsePrimitive) dms.stringToObj(payload); + if(resp == null || resp.getRequestIdentifier() == null){ + return; + } + responseReceived(resp); + } + + } + +} diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java new file mode 100644 index 00000000..7588712d --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java @@ -0,0 +1,32 @@ +package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.concurrent.Semaphore;
+
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+
+public class ResponseSemaphore {
+
+ private Semaphore semaphore;
+ private ResponsePrimitive responsePrimitive;
+
+ public ResponseSemaphore(Semaphore semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public Semaphore getSemaphore() {
+ return semaphore;
+ }
+
+ public void setSemaphore(Semaphore semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public ResponsePrimitive getResponsePrimitive() {
+ return responsePrimitive;
+ }
+
+ public void setResponsePrimitive(ResponsePrimitive responsePrimitive) {
+ this.responsePrimitive = responsePrimitive;
+ }
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java new file mode 100644 index 00000000..219fa66b --- /dev/null +++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java @@ -0,0 +1,19 @@ +package org.eclipse.om2m.binding.mqtt.util; + +import org.eclipse.om2m.commons.resource.ResponsePrimitive; + +public final class Utils { + + public static void fillPrimitiveContent(){ + + } + + public static void fillContent(ResponsePrimitive requestPrimitive){ + if(requestPrimitive.getPrimitiveContent() != null && + !requestPrimitive.getPrimitiveContent().getAny().isEmpty() && + requestPrimitive.getContent() == null){ + requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0)); + } + } + +} |