diff options
Diffstat (limited to 'paho/mqtt.lua')
-rw-r--r-- | paho/mqtt.lua | 865 |
1 files changed, 865 insertions, 0 deletions
diff --git a/paho/mqtt.lua b/paho/mqtt.lua new file mode 100644 index 0000000..3c20c63 --- /dev/null +++ b/paho/mqtt.lua @@ -0,0 +1,865 @@ +--- +-- @module mqtt_library +-- ~~~~~~~~~~~~~~~~ +-- Version: 0.2 2012-06-01 +-- -------------------------------------------------------------------------- -- +-- Copyright (c) 2011-2012 Geekscape Pty. Ltd. +-- All rights reserved. This program and the accompanying materials +-- are made available under the terms of the Eclipse Public License v1.0 +-- which accompanies this distribution, and is available at +-- http://www.eclipse.org/legal/epl-v10.html +-- +-- Contributors: +-- Andy Gelme - Initial API and implementation +-- -------------------------------------------------------------------------- -- +-- +-- Documentation +-- ~~~~~~~~~~~~~ +-- Paho MQTT Lua website +-- http://eclipse.org/paho/ +-- +-- References +-- ~~~~~~~~~~ +-- MQTT Community +-- http://mqtt.org + +-- MQTT protocol specification 3.1 +-- https://www.ibm.com/developerworks/webservices/library/ws-mqtt +-- http://mqtt.org/wiki/doku.php/mqtt_protocol # Clarifications +-- +-- Notes +-- ~~~~~ +-- - Always assumes MQTT connection "clean session" enabled. +-- - Supports connection last will and testament message. +-- - Does not support connection username and password. +-- - Fixed message header byte 1, only implements the "message type". +-- - Only supports QOS level 0. +-- - Maximum payload length is 268,435,455 bytes (as per specification). +-- - Publish message doesn't support "message identifier". +-- - Subscribe acknowledgement messages don't check granted QOS level. +-- - Outstanding subscribe acknowledgement messages aren't escalated. +-- - Works on the Sony PlayStation Portable (aka Sony PSP) ... +-- See http://en.wikipedia.org/wiki/Lua_Player_HM +-- +-- ToDo +-- ~~~~ +-- * Consider when payload needs to be an array of bytes (not characters). +-- * Maintain both "last_activity_out" and "last_activity_in". +-- * - http://mqtt.org/wiki/doku.php/keepalive_for_the_client +-- * Update "last_activity_in" when messages are received. +-- * When a PINGREQ is sent, must check for a PINGRESP, within KEEP_ALIVE_TIME.. +-- * Otherwise, fail the connection. +-- * When connecting, wait for CONACK, until KEEP_ALIVE_TIME, before failing. +-- * Should MQTT.client:connect() be asynchronous with a callback ? +-- * Review all public APIs for asynchronous callback behaviour. +-- * Implement parse PUBACK message. +-- * Handle failed subscriptions, i.e no subscription acknowledgement received. +-- * Fix problem when KEEP_ALIVE_TIME is short, e.g. mqtt_publish -k 1 +-- MQTT.client:handler(): Message length mismatch +-- - On socket error, optionally try reconnection to MQTT server. +-- - Consider use of assert() and pcall() ? +-- - Only expose public API functions, don't expose internal API functions. +-- - Refactor "if self.connected()" to "self.checkConnected(error_message)". +-- - Maintain and publish messaging statistics. +-- - Memory heap/stack monitoring. +-- - When debugging, why isn't mosquitto sending back CONACK error code ? +-- - Subscription callbacks invoked by topic name (including wildcards). +-- - Implement asynchronous state machine, rather than single-thread waiting. +-- - After CONNECT, expect and wait for a CONACK. +-- - Implement complete MQTT broker (server). +-- - Consider using Copas http://keplerproject.github.com/copas/manual.html +-- ------------------------------------------------------------------------- -- + +function isPsp() return(Socket ~= nil) end + +if (not isPsp()) then + require("socket") + require("io") + require("ltn12") +--require("ssl") +end + +local MQTT = {} + +--- +-- @field [parent = #mqtt_library] utility#utility Utility +-- +MQTT.Utility = require "paho.utility" + +--- +-- @field [parent = #mqtt_library] #number VERSION +-- +MQTT.VERSION = 0x03 + +--- +-- @field [parent = #mqtt_library] #boolean ERROR_TERMINATE +-- +MQTT.ERROR_TERMINATE = false -- Message handler errors terminate process ? + +--- +-- @field [parent = #mqtt_library] #string DEFAULT_BROKER_HOSTNAME +-- +MQTT.DEFAULT_BROKER_HOSTNAME = "m2m.eclipse.org" + +--- +-- An MQTT client +-- @type client + +--- +-- @field [parent = #mqtt_library] #client client +-- +MQTT.client = {} +MQTT.client.__index = MQTT.client + +--- +-- @field [parent = #client] #number DEFAULT_PORT +-- +MQTT.client.DEFAULT_PORT = 1883 + +--- +-- @field [parent = #client] #number KEEP_ALIVE_TIME +-- +MQTT.client.KEEP_ALIVE_TIME = 60 -- seconds (maximum is 65535) + +--- +-- @field [parent = #client] #number MAX_PAYLOAD_LENGTH +-- +MQTT.client.MAX_PAYLOAD_LENGTH = 268435455 -- bytes + +-- MQTT 3.1 Specification: Section 2.1: Fixed header, Message type + +--- +-- @field [parent = #mqtt_library] message +-- +MQTT.message = {} +MQTT.message.TYPE_RESERVED = 0x00 +MQTT.message.TYPE_CONNECT = 0x01 +MQTT.message.TYPE_CONACK = 0x02 +MQTT.message.TYPE_PUBLISH = 0x03 +MQTT.message.TYPE_PUBACK = 0x04 +MQTT.message.TYPE_PUBREC = 0x05 +MQTT.message.TYPE_PUBREL = 0x06 +MQTT.message.TYPE_PUBCOMP = 0x07 +MQTT.message.TYPE_SUBSCRIBE = 0x08 +MQTT.message.TYPE_SUBACK = 0x09 +MQTT.message.TYPE_UNSUBSCRIBE = 0x0a +MQTT.message.TYPE_UNSUBACK = 0x0b +MQTT.message.TYPE_PINGREQ = 0x0c +MQTT.message.TYPE_PINGRESP = 0x0d +MQTT.message.TYPE_DISCONNECT = 0x0e +MQTT.message.TYPE_RESERVED = 0x0f + +-- MQTT 3.1 Specification: Section 3.2: CONACK acknowledge connection errors +-- http://mqtt.org/wiki/doku.php/extended_connack_codes + +MQTT.CONACK = {} +MQTT.CONACK.error_message = { -- CONACK return code used as the index + "Unacceptable protocol version", + "Identifer rejected", + "Server unavailable", + "Bad user name or password", + "Not authorized" +--"Invalid will topic" -- Proposed +} + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Create an MQTT client instance +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +--- +-- Create an MQTT client instance. +-- @param #string hostname Host name or address of the MQTT broker +-- @param #number port Port number of the MQTT broker (default: 1883) +-- @param #function callback Invoked when subscribed topic messages received +-- @function [parent = #client] create +-- @return #client created client +-- +function MQTT.client.create( -- Public API + hostname, -- string: Host name or address of the MQTT broker + port, -- integer: Port number of the MQTT broker (default: 1883) + callback) -- function: Invoked when subscribed topic messages received + -- return: mqtt_client table + + local mqtt_client = {} + + setmetatable(mqtt_client, MQTT.client) + + mqtt_client.callback = callback -- function(topic, payload) + mqtt_client.hostname = hostname + mqtt_client.port = port or MQTT.client.DEFAULT_PORT + + mqtt_client.connected = false + mqtt_client.destroyed = false + mqtt_client.last_activity = 0 + mqtt_client.message_id = 0 + mqtt_client.outstanding = {} + mqtt_client.socket_client = nil + + return(mqtt_client) +end + +-------------------------------------------------------------------------------- +-- Specify username and password before #client.connect +-- +-- If called with empty _username_ or _password_, connection flags will be set +-- but no string will be appended to payload. +-- +-- @function [parent = #client] auth +-- @param self +-- @param #string username Name of the user who is connecting. It is recommended +-- that user names are kept to 12 characters. +-- @param #string password Password corresponding to the user who is connecting. +function MQTT.client.auth(self, username, password) + -- When no string is provided, remember current call to set flags + self.username = username or true + self.password = password or true +end + +-------------------------------------------------------------------------------- +-- Transmit MQTT Client request a connection to an MQTT broker (server). +-- MQTT 3.1 Specification: Section 3.1: CONNECT +-- @param self +-- @param #string identifier MQTT client identifier (maximum 23 characters) +-- @param #string will_topic Last will and testament topic +-- @param #string will_qos Last will and testament Quality Of Service +-- @param #string will_retain Last will and testament retention status +-- @param #string will_message Last will and testament message +-- @function [parent = #client] connect +-- +function MQTT.client:connect( -- Public API + identifier, -- string: MQTT client identifier (maximum 23 characters) + will_topic, -- string: Last will and testament topic + will_qos, -- byte: Last will and testament Quality Of Service + will_retain, -- byte: Last will and testament retention status + will_message) -- string: Last will and testament message + -- return: nil or error message + + if (self.connected) then + return("MQTT.client:connect(): Already connected") + end + + MQTT.Utility.debug("MQTT.client:connect(): " .. identifier) + + self.socket_client = socket.connect(self.hostname, self.port) + + if (self.socket_client == nil) then + return("MQTT.client:connect(): Couldn't open MQTT broker connection") + end + + MQTT.Utility.socket_wait_connected(self.socket_client) + + self.connected = true + +-- Construct CONNECT variable header fields (bytes 1 through 9) +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + local payload + payload = MQTT.client.encode_utf8("MQIsdp") + payload = payload .. string.char(MQTT.VERSION) + +-- Connect flags (byte 10) +-- ~~~~~~~~~~~~~ +-- bit 7: Username flag = 0 -- recommended no more than 12 characters +-- bit 6: Password flag = 0 -- ditto +-- bit 5: Will retain = 0 +-- bits 4,3: Will QOS = 00 +-- bit 2: Will flag = 0 +-- bit 1: Clean session = 1 +-- bit 0: Unused = 0 + + local username = self.username and 0x80 or 0 + local password = self.password and 0x40 or 0 + local flags = username + password + + if (will_topic == nil) then + -- Clean session, no last will + flags = flags + 0x02 + else + flags = flags + MQTT.Utility.shift_left(will_retain, 5) + flags = flags + MQTT.Utility.shift_left(will_qos, 3) + -- Last will and clean session + flags = flags + 0x04 + 0x02 + end + payload = payload .. string.char(flags) + +-- Keep alive timer (bytes 11 LSB and 12 MSB, unit is seconds) +-- ~~~~~~~~~~~~~~~~~ + payload = payload .. string.char(math.floor(MQTT.client.KEEP_ALIVE_TIME / 256)) + payload = payload .. string.char(MQTT.client.KEEP_ALIVE_TIME % 256) + +-- Client identifier +-- ~~~~~~~~~~~~~~~~~ + payload = payload .. MQTT.client.encode_utf8(identifier) + +-- Last will and testament +-- ~~~~~~~~~~~~~~~~~~~~~~~ + if (will_topic ~= nil) then + payload = payload .. MQTT.client.encode_utf8(will_topic) + payload = payload .. MQTT.client.encode_utf8(will_message) + end + + -- Username and password + -- ~~~~~~~~~~~~~~~~~~~~~ + if type(self.username) == 'string' then + payload = payload .. MQTT.client.encode_utf8(self.username) + end + if type(self.password) == 'string' then + payload = payload .. MQTT.client.encode_utf8(self.password) + end + +-- Send MQTT message +-- ~~~~~~~~~~~~~~~~~ + return(self:message_write(MQTT.message.TYPE_CONNECT, payload)) +end + +--- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Destroy an MQTT client instance. +-- @param self +-- @function [parent = #client] destroy +-- +function MQTT.client:destroy() -- Public API + MQTT.Utility.debug("MQTT.client:destroy()") + + if (self.destroyed == false) then + self.destroyed = true -- Avoid recursion when message_write() fails + + if (self.connected) then self:disconnect() end + + self.callback = nil + self.outstanding = nil + end +end + +--- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Transmit MQTT Disconnect message. +-- MQTT 3.1 Specification: Section 3.14: Disconnect notification +-- bytes 1,2: Fixed message header, see MQTT.client:message_write() +-- @param self +-- @function [parent = #client] disconnect +-- +function MQTT.client:disconnect() -- Public API + MQTT.Utility.debug("MQTT.client:disconnect()") + + if (self.connected) then + self:message_write(MQTT.message.TYPE_DISCONNECT, nil) + self.socket_client:close() + self.connected = false + else + error("MQTT.client:disconnect(): Already disconnected") + end +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Encode a message string using UTF-8 (for variable header) +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 2.5: MQTT and UTF-8 +-- +-- byte 1: String length MSB +-- byte 2: String length LSB +-- bytes 3-n: String encoded as UTF-8 + +function MQTT.client.encode_utf8( -- Internal API + input) -- string + + local output + output = string.char(math.floor(#input / 256)) + output = output .. string.char(#input % 256) + output = output .. input + + return(output) +end + +--- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Handle received messages and maintain keep-alive PING messages. +-- This function must be invoked periodically (more often than the +-- `MQTT.client.KEEP_ALIVE_TIME`) which maintains the connection and +-- services the incoming subscribed topic messages +-- @param self +-- @function [parent = #client] handler +-- +function MQTT.client:handler() -- Public API + if (self.connected == false) then + error("MQTT.client:handler(): Not connected") + end + + MQTT.Utility.debug("MQTT.client:handler()") + +-- Transmit MQTT PING message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.13: PING request +-- +-- bytes 1,2: Fixed message header, see MQTT.client:message_write() + + local activity_timeout = self.last_activity + MQTT.client.KEEP_ALIVE_TIME + + if (MQTT.Utility.get_time() > activity_timeout) then + MQTT.Utility.debug("MQTT.client:handler(): PINGREQ") + + self:message_write(MQTT.message.TYPE_PINGREQ, nil) + end + +-- Check for available client socket data +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + local ready = MQTT.Utility.socket_ready(self.socket_client) + + if (ready) then + local error_message, buffer = + MQTT.Utility.socket_receive(self.socket_client) + + if (error_message ~= nil) then + self:destroy() + error_message = "socket_client:receive(): " .. error_message + MQTT.Utility.debug(error_message) + return(error_message) + end + + if (buffer ~= nil and #buffer > 0) then + local index = 1 + + -- Parse individual messages (each must be at least 2 bytes long) + -- Decode "remaining length" (MQTT v3.1 specification pages 6 and 7) + + while (index < #buffer) do + local message_type_flags = string.byte(buffer, index) + local multiplier = 1 + local remaining_length = 0 + + repeat + index = index + 1 + local digit = string.byte(buffer, index) + remaining_length = remaining_length + ((digit % 128) * multiplier) + multiplier = multiplier * 128 + until digit < 128 -- check continuation bit + + local message = string.sub(buffer, index + 1, index + remaining_length) + + if (#message == remaining_length) then + self:parse_message(message_type_flags, remaining_length, message) + else + MQTT.Utility.debug( + "MQTT.client:handler(): Incorrect remaining length: " .. + remaining_length .. " ~= message length: " .. #message + ) + end + + index = index + remaining_length + 1 + end + + -- Check for any left over bytes, i.e. partial message received + + if (index ~= (#buffer + 1)) then + local error_message = + "MQTT.client:handler(): Partial message received" .. + index .. " ~= " .. (#buffer + 1) + + if (MQTT.ERROR_TERMINATE) then -- TODO: Refactor duplicate code + self:destroy() + error(error_message) + else + MQTT.Utility.debug(error_message) + end + end + end + end + + return(nil) +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Transmit an MQTT message +-- ~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 2.1: Fixed header +-- +-- byte 1: Message type and flags (DUP, QOS level, and Retain) fields +-- bytes 2-5: Remaining length field (between one and four bytes long) +-- bytes m- : Optional variable header and payload + +function MQTT.client:message_write( -- Internal API + message_type, -- enumeration + payload) -- string + -- return: nil or error message + +-- TODO: Complete implementation of fixed header byte 1 + + local message = string.char(MQTT.Utility.shift_left(message_type, 4)) + + if (payload == nil) then + message = message .. string.char(0) -- Zero length, no payload + else + if (#payload > MQTT.client.MAX_PAYLOAD_LENGTH) then + return( + "MQTT.client:message_write(): Payload length = " .. #payload .. + " exceeds maximum of " .. MQTT.client.MAX_PAYLOAD_LENGTH + ) + end + + -- Encode "remaining length" (MQTT v3.1 specification pages 6 and 7) + + local remaining_length = #payload + + repeat + local digit = remaining_length % 128 + remaining_length = math.floor(remaining_length / 128) + if (remaining_length > 0) then digit = digit + 128 end -- continuation bit + message = message .. string.char(digit) + until remaining_length == 0 + + message = message .. payload + end + + local status, error_message = self.socket_client:send(message) + + if (status == nil) then + self:destroy() + return("MQTT.client:message_write(): " .. error_message) + end + + self.last_activity = MQTT.Utility.get_time() + return(nil) +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Parse MQTT message +-- ~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 2.1: Fixed header +-- +-- byte 1: Message type and flags (DUP, QOS level, and Retain) fields +-- bytes 2-5: Remaining length field (between one and four bytes long) +-- bytes m- : Optional variable header and payload +-- +-- The message type/flags and remaining length are already parsed and +-- removed from the message by the time this function is invoked. +-- Leaving just the optional variable header and payload. + +function MQTT.client:parse_message( -- Internal API + message_type_flags, -- byte + remaining_length, -- integer + message) -- string: Optional variable header and payload + + local message_type = MQTT.Utility.shift_right(message_type_flags, 4) + +-- TODO: MQTT.message.TYPE table should include "parser handler" function. +-- This would nicely collapse the if .. then .. elseif .. end. + + if (message_type == MQTT.message.TYPE_CONACK) then + self:parse_message_conack(message_type_flags, remaining_length, message) + + elseif (message_type == MQTT.message.TYPE_PUBLISH) then + self:parse_message_publish(message_type_flags, remaining_length, message) + + elseif (message_type == MQTT.message.TYPE_PUBACK) then + print("MQTT.client:parse_message(): PUBACK -- UNIMPLEMENTED --") -- TODO + + elseif (message_type == MQTT.message.TYPE_SUBACK) then + self:parse_message_suback(message_type_flags, remaining_length, message) + + elseif (message_type == MQTT.message.TYPE_UNSUBACK) then + self:parse_message_unsuback(message_type_flags, remaining_length, message) + + elseif (message_type == MQTT.message.TYPE_PINGREQ) then + self:ping_response() + + elseif (message_type == MQTT.message.TYPE_PINGRESP) then + self:parse_message_pingresp(message_type_flags, remaining_length, message) + + else + local error_message = + "MQTT.client:parse_message(): Unknown message type: " .. message_type + + if (MQTT.ERROR_TERMINATE) then -- TODO: Refactor duplicate code + self:destroy() + error(error_message) + else + MQTT.Utility.debug(error_message) + end + end +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Parse MQTT CONACK message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.2: CONACK Acknowledge connection +-- +-- byte 1: Reserved value +-- byte 2: Connect return code, see MQTT.CONACK.error_message[] + +function MQTT.client:parse_message_conack( -- Internal API + message_type_flags, -- byte + remaining_length, -- integer + message) -- string + + local me = "MQTT.client:parse_message_conack()" + MQTT.Utility.debug(me) + + if (remaining_length ~= 2) then + error(me .. ": Invalid remaining length") + end + + local return_code = string.byte(message, 2) + + if (return_code ~= 0) then + local error_message = "Unknown return code" + + if (return_code <= table.getn(MQTT.CONACK.error_message)) then + error_message = MQTT.CONACK.error_message[return_code] + end + + error(me .. ": Connection refused: " .. error_message) + end +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Parse MQTT PINGRESP message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.13: PING response + +function MQTT.client:parse_message_pingresp( -- Internal API + message_type_flags, -- byte + remaining_length, -- integer + message) -- string + + local me = "MQTT.client:parse_message_pingresp()" + MQTT.Utility.debug(me) + + if (remaining_length ~= 0) then + error(me .. ": Invalid remaining length") + end + +-- ToDo: self.ping_response_outstanding = false +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Parse MQTT PUBLISH message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.3: Publish message +-- +-- Variable header .. +-- bytes 1- : Topic name and optional Message Identifier (if QOS > 0) +-- bytes m- : Payload + +function MQTT.client:parse_message_publish( -- Internal API + message_type_flags, -- byte + remaining_length, -- integer + message) -- string + + local me = "MQTT.client:parse_message_publish()" + MQTT.Utility.debug(me) + + if (self.callback ~= nil) then + if (remaining_length < 3) then + error(me .. ": Invalid remaining length: " .. remaining_length) + end + + local topic_length = string.byte(message, 1) * 256 + topic_length = topic_length + string.byte(message, 2) + local topic = string.sub(message, 3, topic_length + 2) + local index = topic_length + 3 + +-- Handle optional Message Identifier, for QOS levels 1 and 2 +-- TODO: Enable Subscribe with QOS and deal with PUBACK, etc. + + local qos = MQTT.Utility.shift_right(message_type_flags, 1) % 3 + + if (qos > 0) then + local message_id = string.byte(message, index) * 256 + message_id = message_id + string.byte(message, index + 1) + index = index + 2 + end + + local payload_length = remaining_length - index + 1 + local payload = string.sub(message, index, index + payload_length - 1) + + self.callback(topic, payload) + end +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Parse MQTT SUBACK message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.9: SUBACK Subscription acknowledgement +-- +-- bytes 1,2: Message Identifier +-- bytes 3- : List of granted QOS for each subscribed topic + +function MQTT.client:parse_message_suback( -- Internal API + message_type_flags, -- byte + remaining_length, -- integer + message) -- string + + local me = "MQTT.client:parse_message_suback()" + MQTT.Utility.debug(me) + + if (remaining_length < 3) then + error(me .. ": Invalid remaining length: " .. remaining_length) + end + + local message_id = string.byte(message, 1) * 256 + string.byte(message, 2) + local outstanding = self.outstanding[message_id] + + if (outstanding == nil) then + error(me .. ": No outstanding message: " .. message_id) + end + + self.outstanding[message_id] = nil + + if (outstanding[1] ~= "subscribe") then + error(me .. ": Outstanding message wasn't SUBSCRIBE") + end + + local topic_count = table.getn(outstanding[2]) + + if (topic_count ~= remaining_length - 2) then + error(me .. ": Didn't received expected number of topics: " .. topic_count) + end +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Parse MQTT UNSUBACK message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.11: UNSUBACK Unsubscription acknowledgement +-- +-- bytes 1,2: Message Identifier + +function MQTT.client:parse_message_unsuback( -- Internal API + message_type_flags, -- byte + remaining_length, -- integer + message) -- string + + local me = "MQTT.client:parse_message_unsuback()" + MQTT.Utility.debug(me) + + if (remaining_length ~= 2) then + error(me .. ": Invalid remaining length") + end + + local message_id = string.byte(message, 1) * 256 + string.byte(message, 2) + + local outstanding = self.outstanding[message_id] + + if (outstanding == nil) then + error(me .. ": No outstanding message: " .. message_id) + end + + self.outstanding[message_id] = nil + + if (outstanding[1] ~= "unsubscribe") then + error(me .. ": Outstanding message wasn't UNSUBSCRIBE") + end +end + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Transmit MQTT Ping response message +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- MQTT 3.1 Specification: Section 3.13: PING response + +function MQTT.client:ping_response() -- Internal API + MQTT.Utility.debug("MQTT.client:ping_response()") + + if (self.connected == false) then + error("MQTT.client:ping_response(): Not connected") + end + + self:message_write(MQTT.message.TYPE_PINGRESP, nil) +end + +--- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Transmit MQTT Publish message. +-- MQTT 3.1 Specification: Section 3.3: Publish message +-- +-- * bytes 1,2: Fixed message header, see MQTT.client:message_write() +-- Variable header .. +-- * bytes 3- : Topic name and optional Message Identifier (if QOS > 0) +-- * bytes m- : Payload +-- @param self +-- @param #string topic +-- @param #string payload +-- @function [parent = #client] publish +-- +function MQTT.client:publish( -- Public API + topic, -- string + payload) -- string + + if (self.connected == false) then + error("MQTT.client:publish(): Not connected") + end + + MQTT.Utility.debug("MQTT.client:publish(): " .. topic) + + local message = MQTT.client.encode_utf8(topic) .. payload + + self:message_write(MQTT.message.TYPE_PUBLISH, message) +end + +--- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Transmit MQTT Subscribe message. +-- MQTT 3.1 Specification: Section 3.8: Subscribe to named topics +-- +-- * bytes 1,2: Fixed message header, see MQTT.client:message_write() +-- Variable header .. +-- * bytes 3,4: Message Identifier +-- * bytes 5- : List of topic names and their QOS level +-- @param self +-- @param #string topics table of strings +-- @function [parent = #client] subscribe +-- +function MQTT.client:subscribe( -- Public API + topics) -- table of strings + + if (self.connected == false) then + error("MQTT.client:subscribe(): Not connected") + end + + self.message_id = self.message_id + 1 + + local message + message = string.char(math.floor(self.message_id / 256)) + message = message .. string.char(self.message_id % 256) + + for index, topic in ipairs(topics) do + MQTT.Utility.debug("MQTT.client:subscribe(): " .. topic) + message = message .. MQTT.client.encode_utf8(topic) + message = message .. string.char(0) -- QOS level 0 + end + + self:message_write(MQTT.message.TYPE_SUBSCRIBE, message) + + self.outstanding[self.message_id] = { "subscribe", topics } +end + +--- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- +-- Transmit MQTT Unsubscribe message +-- MQTT 3.1 Specification: Section 3.10: Unsubscribe from named topics +-- +-- * bytes 1,2: Fixed message header, see MQTT.client:message_write() +-- Variable header .. +-- * bytes 3,4: Message Identifier +-- * bytes 5- : List of topic names +-- @param self +-- @param #string topics table of strings +-- @function [parent = #client] unsubscribe +-- +function MQTT.client:unsubscribe( -- Public API + topics) -- table of strings + + if (self.connected == false) then + error("MQTT.client:unsubscribe(): Not connected") + end + + self.message_id = self.message_id + 1 + + local message + message = string.char(math.floor(self.message_id / 256)) + message = message .. string.char(self.message_id % 256) + + for index, topic in ipairs(topics) do + MQTT.Utility.debug("MQTT.client:unsubscribe(): " .. topic) + message = message .. MQTT.client.encode_utf8(topic) + end + + self:message_write(MQTT.message.TYPE_UNSUBSCRIBE, message) + + self.outstanding[self.message_id] = { "unsubscribe", topics } +end + +-- For ... MQTT = require 'paho.mqtt' + +return(MQTT) |