Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'apps/MQTTSN-C-Client/src/mqttsn/mqtts-core.c')
-rw-r--r--apps/MQTTSN-C-Client/src/mqttsn/mqtts-core.c1407
1 files changed, 1407 insertions, 0 deletions
diff --git a/apps/MQTTSN-C-Client/src/mqttsn/mqtts-core.c b/apps/MQTTSN-C-Client/src/mqttsn/mqtts-core.c
new file mode 100644
index 0000000..dc1594c
--- /dev/null
+++ b/apps/MQTTSN-C-Client/src/mqttsn/mqtts-core.c
@@ -0,0 +1,1407 @@
+/*******************************************************************************
+ * Copyright (c) 2008, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+/**
+ *
+ * Description : MQTT-SN client core implementation
+ *
+ *
+ */
+
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "mqtts_api.h"
+#include "gp_api.h"
+
+#include "mqtts-timer.h"
+
+
+
+/**
+ * MQTT-S Message types */
+#define ADVERTISE 0x00
+#define SEARCHGW 0x01
+#define GWINFO 0x02
+
+#define CONNECT 0x04
+#define CONNACK 0x05
+#define WILLTOPICREQ 0x06
+#define WILLTOPIC 0x07
+#define WILLMSGREQ 0x08
+#define WILLMSG 0x09
+#define REGISTER 0x0A
+#define REGACK 0x0B
+#define PUBLISH 0x0C
+#define PUBACK 0x0D
+#define PUBCOMP 0x0E
+#define PUBREC 0x0F
+#define PUBREL 0x10
+
+#define SUBSCRIBE 0x12
+#define SUBACK 0x13
+#define UNSUBSCRIBE 0x14
+#define UNSUBACK 0x15
+#define PINGREQ 0x16
+#define PINGRESP 0x17
+#define DISCONNECT 0x18
+
+#define WILLTOPICUPD 0x1A
+#define WILLTOPICRESP 0x1B
+#define WILLMSGUPD 0x1C
+#define WILLMSGRESP 0x1D
+
+
+/**
+ * Flags binary masks */
+#define EMPTY_MASK 0x00 /* binary : 0000 0000 */
+#define DUP_FLAG_MASK 0x80 /* binary : 1000 0000 */
+#define QOS_MIN1_FLAG_MASK 0x60 /* binary : 0110 0000 */
+#define QOS2_FLAG_MASK 0x40 /* binary : 0100 0000 */
+#define QOS1_FLAG_MASK 0x20 /* binary : 0010 0000 */
+#define QOS0_FLAG_MASK 0x00 /* binary : 0000 0000 */
+#define RETAIN_FLAG_MASK 0x10 /* binary : 0001 0000 */
+#define WILL_FLAG_MASK 0x08 /* binary : 0000 1000 */
+#define CLEANSS_FLAG_MASK 0x04 /* binary : 0000 0100 */
+#define TOPICSHORT_FLAG_MASK 0x02 /* binary : 0000 0010 */
+#define TOPICIDPRE_FLAG_MASK 0x01 /* binary : 0000 0001 */
+#define TOPICID_FLAG_MASK 0x00 /* binary : 0000 0000 */
+
+#define GET_DUP_FLAG_MASK 0x80 /* binary : 1000 0000 */
+#define GET_QOS_FLAG_MASK 0x60 /* binary : 0110 0000 */
+
+
+/**
+ * mqtts client's state */
+static unsigned char mqtts_state = MQTTS_STATE_NOT_ACTIVE;
+/**
+ * MsgIg: incremented after having sent a message */
+static unsigned char msgId0=0;
+static unsigned char msgId1=0;
+
+static unsigned char myGwAddr[MQTTS_MAX_NETWORK_ADDRESS_LENGTH]={NULL};
+static unsigned char myGwAddrLength=0;
+static unsigned char myGwId=0;
+
+static unsigned char isConnected=0;
+static unsigned char lostGw=0;
+
+static unsigned char broadcastRadius;
+/* backupMsg: contains msg which will be retransmitted in case of ACK time-out */
+static unsigned char backupMsg[MQTTS_MAX_MSG_SIZE];
+
+static mqtts_CONNECT_Parms* conn_parms;
+
+#define HEADER_LENGTH 2
+/*
+ * macros
+ */
+#define msg_new(size) unsigned char msg[(size)]
+#define msg_set_length(len) *((msg)+0)=(len) /*msg[0]=len*/
+#define msg_get_length(msg) *((msg)+0)
+#define msg_set_type(type) *((msg)+1)=(type) /*msg[1]=type*/
+
+
+/*************************
+ * functions prototypes
+ *************************/
+static void mqtts_connecting(void);
+static void mqtts_willtopic(void);
+static void mqtts_willmsg(void);
+static void mqtts_regack(unsigned char topicId_1, unsigned char topicId_2,
+ unsigned char msgId1, unsigned char msgId2, unsigned char retCode );
+static void mqtts_puback(unsigned char topicID_1, unsigned char topicID_2,
+ unsigned char msgID_1, unsigned char msgID_2, unsigned char retCode );
+static void mqtts_pingresp(void);
+static void mqtts_pubrel(unsigned char msgID_1, unsigned char msgID_2 );
+static void gwinfo_received(unsigned char *msg, unsigned char *sender,
+ unsigned char sender_len);
+
+/****************************************************
+ * stack internal functions
+ ****************************************************/
+
+/**
+ * called when we lose a gw, e.g. after multiple ack time-outs */
+void lost_gw(void) {
+ myGwAddrLength=0;
+ mqtts_timer_stop_keep_alive();
+
+ /* if we have sent a DISC to the gw then we could go state WAITING_CONNECT
+ * otherwise we have start searching for gw */
+ if (mqtts_state == MQTTS_STATE_DISCONNECTING) {
+ mqtts_state = MQTTS_STATE_WAITING_CONNECT;
+ } else {
+ lostGw= 1;
+ mqtts_state = MQTTS_STATE_SEARCHING_GW; /* start searching for a new gw */
+ mqtts_timer_start_wait_searchgw(); /* delay sending out SEARCHGW */
+ }
+ mqttscb_disconnected(MQTTS_LOST_GATEWAY);
+}
+
+/**
+ * ack received in state WAITING_ACK */
+void ack_rx(void) {
+ mqtts_timer_stop_ack();
+ mqtts_state=MQTTS_STATE_READY;
+ if (lostGw == 1) {
+ mqttscb_connected();
+ lostGw= 0;
+ }
+}
+
+/**
+ * backup msg for later retransmission */
+void backup_msg(unsigned char *msg) {
+ unsigned char i;
+ for (i=0;i<(msg_get_length(msg));i++) {
+ backupMsg[i]= msg[i];
+ }
+}
+/**
+ * send backup message */
+void send_backupMsg(void) {
+ mqtts_timer_start_keep_alive();
+ mqtts_timer_start_ack();
+ gp_network_msg_send(backupMsg,myGwAddr,myGwAddrLength);
+}
+
+/* Set the MsgId field of the message. The MsgId is simply incremented for any
+ * message sent
+ *
+ * Input:
+ * msgId pointer to first byte of the MsgId field in the msg
+ * Returns:
+ * none
+ **/
+static void set_msg_id(unsigned char* msgId) {
+ /* msgId0 is msb, and msgId1 is lsb */
+ if (msgId1==255) {
+ msgId1=0;
+ if (msgId0==255) msgId0=0;
+ else msgId0++;
+ } else msgId1++;
+ msgId[0]= msgId0;
+ msgId[1]= msgId1;
+}
+
+
+/*************************************************************************
+ * Handle messages received
+ * to avoid race conditions and simplify the client implementation
+ * the client will respond to all gw's requests without making any check!
+ *************************************************************************/
+
+static void handleSEARCHGW(void) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_SEARCHING_GW:
+ /* we are searching for a gw and receive a SEARCHGW before
+ * we send SEARCHGW => we re-schedule the sending of our SEARCHGW */
+ mqtts_timer_start_wait_searchgw();
+ break;
+ /*only reply with GWINFO if we are connected to a gw*/
+ case MQTTS_STATE_READY:
+ /* delay before sending GWINFO to give priority to gw */
+ mqtts_timer_start_wait_gwinfo();
+ break;
+ default:
+ break;
+ }
+}
+
+static void handleCONNACK(unsigned char* msg) {
+ switch (mqtts_state) {
+ /* only accept CNNACK if we are in CONNECTING_TO_GW*/
+ case MQTTS_STATE_CONNECTING_TO_GW:
+ if (gp_byte_get(msg,3) == MQTTS_RET_ACCEPTED) { /* check return code */
+ mqtts_timer_stop_ack();
+ mqtts_state=MQTTS_STATE_READY;
+ isConnected= 1;
+ lostGw= 0;
+ mqttscb_connected();
+ /* set flagCleanSession and flagWill to 0 so that in case we
+ * lose the gw we can re-connect without requesting the app
+ * to re-do the subscriptions and the will stuffs */
+ //conn_parms->flagCleanSession=0;
+ /* TODO the new Will feature is not yet supported by the broker */
+ //conn_parms->flagWill=0;
+ } else {
+ /* conn is rejected => either we look for another gw, or we give
+ * the rejection reason to the app and let it decide what to do
+ * current decision: let app decide */
+ mqtts_state = MQTTS_STATE_WAITING_CONNECT;
+ mqtts_timer_stop_ack();
+ mqtts_timer_stop_keep_alive();
+ mqttscb_disconnected(gp_byte_get(msg,3));
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+
+static void handleWILLTOPICREQ(void) {
+ mqtts_timer_stop_ack();
+ mqtts_willtopic();
+}
+
+static void handleWILLMSGREQ(void) {
+ mqtts_timer_stop_ack();
+ mqtts_willmsg();
+}
+
+static void handleREGISTER(unsigned char* msg) {
+ unsigned char i;
+ unsigned char topic[MQTTS_MAX_MSG_SIZE];
+ unsigned char topic_len;
+ unsigned char msg_len;
+
+ msg_len = gp_byte_get(msg,1);
+ topic_len = 0;
+
+ /* ignore message if not sent by our gw */
+ /* gw = mqtts_conn_get_gw();
+ if (memcmp(sender,gw->address, gw->address_len)!=0) return; */
+
+ /* copy topic */
+ for (i=0;i<(msg_len-6);i++) {
+ topic[i] = gp_byte_get(msg,7+i);
+ topic_len++;
+ }
+
+ /* send regack to gw */
+ mqtts_regack(
+ gp_byte_get(msg,3), gp_byte_get(msg,4), /* topic id*/
+ gp_byte_get(msg,5), gp_byte_get(msg,6), /* msg id*/
+ MQTTS_RET_ACCEPTED); /*return code*/
+ /* inform app */
+ mqttscb_register_received(
+ gp_byte_get(msg,3), gp_byte_get(msg,4),
+ topic, topic_len);
+}
+
+static void handleREGACK(unsigned char* msg) {
+
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /* check MsgId */
+ if ((gp_byte_get(msg,5)!=msgId0) || (gp_byte_get(msg,6)!=msgId1)) return;
+
+ ack_rx();
+ mqttscb_regack_received(
+ gp_byte_get(msg,3), gp_byte_get(msg,4), /* Topic ID */
+ gp_byte_get(msg,7)); /* Return code */
+
+ break;
+ default:
+ break;
+ }
+}
+
+static void handlePUBLISH(unsigned char* msg) {
+ unsigned char i;
+ unsigned char data[MQTTS_MAX_MSG_SIZE];
+ unsigned char data_len= 0;
+ unsigned char msg_len= gp_byte_get(msg,1);
+
+ /* ignore message if not sent by our gw */
+ /*gw = mqtts_conn_get_gw(); */
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+
+ /* copy data*/
+ for (i=0;i<(msg_len-7);i++) {
+ data[i]=gp_byte_get(msg,8+i);
+ data_len++;
+ }
+ /* i is now return code from app */
+ i= mqttscb_publish_received(
+ ((gp_byte_get(msg,3) & GET_DUP_FLAG_MASK) >> 7),
+ ((gp_byte_get(msg,3) & GET_QOS_FLAG_MASK) >> 5),
+ gp_byte_get(msg,4), gp_byte_get(msg,5), /* Topic id */
+ data, data_len);
+ /* send puback if QoS = 1 or retCode = MQTTS_RET_INVALID_TOPIC_ID */
+ if (((gp_byte_get(msg,3) & GET_QOS_FLAG_MASK)>> 5)==1 ||
+ (i == MQTTS_RET_INVALID_TOPIC_ID)) {
+ mqtts_puback(gp_byte_get(msg,4), gp_byte_get(msg,5), /* Topic ID */
+ gp_byte_get(msg,6), gp_byte_get(msg,7), /* Msg Id */
+ i); /* return code */
+ }
+ /* TODO QoS=2 not implemented yet */
+
+}
+
+static void handlePUBACK(unsigned char* msg) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+
+ // check msgId
+ if ((gp_byte_get(msg,5)!=msgId0) || (gp_byte_get(msg,6)!=msgId1)) return;
+
+ ack_rx();
+ // inform app
+ mqttscb_puback_received(gp_byte_get(msg,3), gp_byte_get(msg,4), /* topicId */
+ gp_byte_get(msg,7)); /* Return code */
+ break;
+ default:
+ /*inform app in any case if return code != MQTTS_RET_ACCEPTED*/
+ if (gp_byte_get(msg,7)!=MQTTS_RET_ACCEPTED){
+ mqttscb_puback_received(gp_byte_get(msg,3), gp_byte_get(msg,4), /* topicId */
+ gp_byte_get(msg,7)); /* Return code */
+ }
+ break;
+ }
+}
+
+static void handleSUBACK(unsigned char* msg) {
+
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+ if ((gp_byte_get(msg,6)!=msgId0) || (gp_byte_get(msg,7)!=msgId1)) return;
+
+ ack_rx();
+ mqttscb_suback_received(
+ ((gp_byte_get(msg,3) & GET_QOS_FLAG_MASK) >> 5),
+ gp_byte_get(msg,4), gp_byte_get(msg,5), /* Topic Id */
+ gp_byte_get(msg,6)); /* Return code */
+ break;
+ default:
+ break;
+ }
+}
+
+static void handleUNSUBACK(unsigned char* msg) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+
+ if ((gp_byte_get(msg,3)!=msgId0) || (gp_byte_get(msg,4)!=msgId1)) return;
+
+ ack_rx();
+ mqttscb_unsuback_received();
+ break;
+ default:
+ break;
+ }
+}
+
+/* received a PINGREQ from gw; answer it with PINGRESP */
+static void handlePINGREQ(void) {
+ mqtts_pingresp();
+}
+
+static void handlePINGRESP(void) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+ ack_rx();
+ break;
+ default:
+ break;
+ }
+}
+
+static void handleDISCONNECT(void) {
+ mqtts_timer_stop_ack();
+ mqtts_timer_stop_keep_alive();
+ isConnected= 0;
+ if (mqtts_state == MQTTS_STATE_DISCONNECTING) {
+ mqtts_state = MQTTS_STATE_WAITING_CONNECT;
+ mqttscb_disconnected(MQTTS_OK);
+ } else {
+ /* ops , gw does not know me any more, try to re-CONNECT */
+ mqtts_state = MQTTS_STATE_CONNECTING_TO_GW;
+ mqtts_connecting();
+ }
+}
+
+static void handlePUBREC(unsigned char* msg) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();
+ if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+ mqtts_timer_stop_ack();
+ mqtts_pubrel(gp_byte_get(msg,3), gp_byte_get(msg,4)); /* Msg id */
+ /* WARNING: stack is not ready here because pubrel waits for pubcomp */
+ break;
+ default:
+ break;
+ }
+}
+
+
+static void handlePUBCOMP(void) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ ack_rx();
+ mqttscb_pubcomp_received();
+ break;
+ default:
+ break;
+ }
+}
+
+static void handleWILLTOPICRESP(void) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+ ack_rx();
+ mqttscb_willtopicresp_received();
+ break;
+ default:
+ break;
+ }
+}
+
+static void handleWILLMSGRESP(void) {
+ switch (mqtts_state) {
+ case MQTTS_STATE_WAITING_ACK:
+ /*gw = mqtts_conn_get_gw();*/
+ /*if (memcmp(sender,gw->address, gw->address_len)!=0) return;*/
+ ack_rx();
+ mqttscb_willmsgresp_received();
+ break;
+ default:
+ break;
+ }
+}
+
+static void gwFound(void) {
+ mqtts_timer_stop_wait_searchgw();
+ if (isConnected == 0) {
+ mqtts_state = MQTTS_STATE_CONNECTING_TO_GW;
+ mqtts_connecting();
+ } else {
+ /* we have lost the gw and find now one again
+ * we resend the last message without reconnecting */
+ mqtts_state = MQTTS_STATE_WAITING_ACK;
+ send_backupMsg();
+ }
+}
+
+static void handleADVERTISE(unsigned char* msg, unsigned char* sender,
+ unsigned char sender_len) {
+
+ mqtts_timer_stop_wait_gwinfo(); /* cancel send gwinfo */
+ switch (mqtts_state) {
+ case MQTTS_STATE_SEARCHING_GW:
+#if MQTTS_DEBUG
+ gp_debug((unsigned char*)"adv ",4);
+#endif
+ memcpy(myGwAddr,sender,sender_len);
+ myGwAddrLength= sender_len;
+ myGwId=gp_byte_get(msg,3);
+ gwFound();
+ break;
+ default:
+ /* ignore ADVERTISE if we are not searching for GW */
+ break;
+ }
+}
+
+static void handleGWINFO(unsigned char* msg, unsigned char* sender,
+ unsigned char sender_len) {
+
+ /* we receive a GWINFO before sending our GWINFO */
+ /* => we don't send our GWINFO */
+ mqtts_timer_stop_wait_gwinfo();
+
+ switch (mqtts_state) {
+ case MQTTS_STATE_SEARCHING_GW:
+#if MQTTS_DEBUG
+ gp_debug((unsigned char*)"gwi ",4);
+#endif
+ gwinfo_received(msg,sender,sender_len);
+ gwFound();
+ break;
+ default:
+ /* ignore GWINFO if we are not searching for GW */
+ break;
+ }
+}
+
+/********************************************************************/
+
+/***
+ * send a SEARCHGW message */
+void mqtts_searchgw(void) {
+ unsigned char msg[3]= {3,SEARCHGW,MQTTS_SEARCHGW_BROADCAST_RADIUS};
+
+ /* broadcast message and restart timer */
+ mqtts_timer_start_wait_searchgw();
+ gp_network_msg_broadcast(msg,MQTTS_SEARCHGW_BROADCAST_RADIUS);
+#if MQTTS_DEBUG
+ gp_debug((unsigned char*)"s_gw ",5);
+#endif
+}
+
+/***
+ * send a GWINFO message */
+void mqtts_gwinfo(void) {
+ unsigned char msg[MQTTS_MAX_NETWORK_ADDRESS_LENGTH + HEADER_LENGTH + 1];
+ unsigned char i;
+
+ msg_set_type(GWINFO);
+ msg[HEADER_LENGTH]= myGwId;
+ msg_set_length (HEADER_LENGTH + 1 + myGwAddrLength);
+ for (i=0;i<myGwAddrLength;i++) {
+ msg[HEADER_LENGTH+1+i]=myGwAddr[i];
+ }
+
+ /* broadcast message  */
+ gp_network_msg_broadcast(msg,broadcastRadius);
+ /* Note: GWINFO is broadcasted with the same radius as received in SEARCHGW
+ * while SEARCHGW itself is broacasted with radius=MQTTS_SEARCHGW_BROADCAST_RADIUS
+ */
+}
+
+
+
+/***
+ * send a PUBACK message */
+static void mqtts_puback(unsigned char topicID_1, unsigned char topicID_2,
+ unsigned char msgID_1, unsigned char msgID_2, unsigned char retCode ) {
+
+ unsigned char msg[HEADER_LENGTH+5];
+
+ msg_set_type(PUBACK);
+ /* (Header size) + 2 (Topic ID) + 2 (Message ID) + 1 (Return code) */
+ msg_set_length(HEADER_LENGTH+5);
+
+ /* Fill fixed length parameters */
+ msg[HEADER_LENGTH]=topicID_1;
+ msg[HEADER_LENGTH+1]=topicID_2;
+ msg[HEADER_LENGTH+2]=msgID_1;
+ msg[HEADER_LENGTH+3]=msgID_2;
+ msg[HEADER_LENGTH+4]=retCode;
+
+ /* start keep alive timer and send the message*/
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+/***
+ * send a REGACK message */
+static void mqtts_regack(unsigned char topicId_1, unsigned char topicId_2,
+ unsigned char msgId1, unsigned char msgId2, unsigned char retCode ) {
+
+ unsigned char msg[HEADER_LENGTH+5];
+
+ msg_set_type(REGACK);
+ /* (Header size) + 2 (topic id) + 2 (Message ID) + 1 (Return code) */
+ msg_set_length(HEADER_LENGTH+5);
+
+ /* Fill fixed length parameters */
+ msg[HEADER_LENGTH]=topicId_1;
+ msg[HEADER_LENGTH+1]=topicId_2;
+ msg[HEADER_LENGTH+2]=msgId1;
+ msg[HEADER_LENGTH+3]=msgId2;
+ msg[HEADER_LENGTH+4]=retCode;
+
+ /* start keep alive timer and send the message*/
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+
+/***
+ * send a PUBREL message */
+static void mqtts_pubrel(unsigned char msgID_1, unsigned char msgID_2 ) {
+
+ unsigned char msg[HEADER_LENGTH+2];
+
+ /* Fill the header */
+ msg_set_type(PUBREL);
+ /*(Header size) + 2 (Message ID) */
+ msg_set_length(HEADER_LENGTH+2);
+
+ /* Fill fixed length parameters */
+ msg[HEADER_LENGTH]=msgID_1;
+ msg[HEADER_LENGTH+1]=msgID_2;
+
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+
+ /* we are waiting for PUBCOMP */
+ /* TODO What happens if we do not rx a PUBCOMP ? Retransmit PUBREL? */
+ /* backup the message for the ack*/
+ backup_msg(msg);
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+
+
+/***
+ * send an PINGRESP message */
+static void mqtts_pingresp(void) {
+ unsigned char msg[2]={2,PINGRESP};
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+/**
+ * send an PINGREQ message */
+void mqtts_pingreq(void) {
+ unsigned char msg[2]={2,PINGREQ};
+ /* backup the message for the ack*/
+ backup_msg(msg);
+ /* start timers */
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+/**
+ send an WILLMSG message
+ */
+static void mqtts_willmsg(void) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ /* Fill the header */
+ msg_set_type(WILLMSG);
+
+ /* Length = Header size + WillMsg */
+ msg_set_length(HEADER_LENGTH + conn_parms->vlpWillMsg_length);
+
+ /* then the variable parameters */
+ for (i=0; i<conn_parms->vlpWillMsg_length; i++){
+ msg[HEADER_LENGTH+i]= conn_parms->vlpWillMsg[i];
+ }
+
+ /* backup the message for the ack*/
+ backup_msg(msg);
+
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+/**
+ send an WILLTOPIC message
+ */
+static void mqtts_willtopic(void) {
+
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ /* Fill the header */
+ msg_set_type(WILLTOPIC);
+ /* Length = Header size + Flags + WillTopic */
+ msg_set_length(HEADER_LENGTH + 1 + conn_parms->vlpWillTopic_length);
+
+ /* First the flags */
+ msg[HEADER_LENGTH] = EMPTY_MASK;
+
+ switch (conn_parms->flagWillQOS) {
+ case 2:
+ msg[HEADER_LENGTH] |= QOS2_FLAG_MASK;
+ break;
+
+ case 1:
+ msg[HEADER_LENGTH] |= QOS1_FLAG_MASK;
+ break;
+
+ default:
+ msg[HEADER_LENGTH] |= QOS0_FLAG_MASK;
+ break;
+ }
+
+ switch (conn_parms->flagWillRetain) {
+ case 1:
+ msg[HEADER_LENGTH] |= RETAIN_FLAG_MASK;
+ break;
+
+ default:
+ break;
+ }
+
+ /* then the variable parameters */
+ for (i=0; i<conn_parms->vlpWillTopic_length; i++)
+ {
+ msg[HEADER_LENGTH+1+i]=conn_parms->vlpWillTopic[i];
+ }
+
+ /* backup the message for the ack*/
+ backup_msg(msg);
+
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+}
+
+/**
+ send CONNECT message
+ */
+
+static void mqtts_connecting(void) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ /* set the value of the keep_alive timer */
+ mqtts_timer_set_keep_alive_time(conn_parms->flpDuration);
+
+ /* Fill the header */
+ msg_set_type(CONNECT);
+ msg_set_length(HEADER_LENGTH + 4 + conn_parms->vlpClientID_length);
+
+ /* Fill the message parameters */
+
+ /* First the flags */
+ msg[HEADER_LENGTH] = EMPTY_MASK;
+
+ switch (conn_parms->flagWill) {
+ case 1:
+ msg[HEADER_LENGTH] |= WILL_FLAG_MASK;
+ break;
+
+ default:
+ break;
+ }
+
+ switch (conn_parms->flagCleanSession)
+ {
+ case 1:
+ msg[HEADER_LENGTH] |= CLEANSS_FLAG_MASK;
+ break;
+
+ default:
+ break;
+ }
+
+ /* then the fixed parameters */
+ msg[HEADER_LENGTH+1] = conn_parms->flpProtocolID;
+ msg[HEADER_LENGTH+2] = conn_parms->flpDuration[0];
+ msg[HEADER_LENGTH+3] = conn_parms->flpDuration[1];
+
+
+ /* then the variable parameters */
+ for (i=0; i<conn_parms->vlpClientID_length; i++) {
+ msg[HEADER_LENGTH + 4 + i]= conn_parms->vlpClientID[i];
+ }
+
+
+ /* backup the message for the ack */
+ backup_msg(msg);
+
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+ /* inform app about CONNECT sent */
+ mqttscb_connect_sent();
+ return;
+}
+
+/*************************************************/
+/** **/
+/** application API Implementation **/
+/** **/
+/*************************************************/
+
+
+void mqtts_startStack (void) {
+ if (mqtts_state != MQTTS_STATE_NOT_ACTIVE) return;
+
+ mqtts_state = MQTTS_STATE_WAITING_CONNECT;
+ mqtts_timer_init();
+
+#if MQTTS_DEBUG
+ gp_debug((unsigned char *)"MQTT-S client v.",16);
+ gp_debug((unsigned char *)MQTTS_VERSION,4);
+ gp_debug((unsigned char *)", ",2);
+#endif
+}
+
+
+void mqtts_stopStack(void) {
+ /* reset all stack variables and stop/delete all timers */
+ mqtts_state = MQTTS_STATE_NOT_ACTIVE;
+ isConnected= 0;
+ lostGw= 0;
+ msgId0=msgId1= 0;
+ mqtts_timer_end();
+
+}
+
+
+unsigned char mqtts_connect(mqtts_CONNECT_Parms *pParms) {
+
+ if ((mqtts_state != MQTTS_STATE_WAITING_CONNECT)) {
+ return MQTTS_ERR_STACK_NOT_READY;
+ }
+
+ /* (Header size) + 4 ( 1 (Flags) + 1 (ProtocolId) + 2 (Duration)) */
+ if ( (pParms->vlpClientID_length) >
+ (MQTTS_MAX_MSG_SIZE - (HEADER_LENGTH + 4)) )
+ {
+ return MQTTS_ERR_DATA_TOO_LONG;
+ }
+
+ conn_parms = pParms;
+
+ if (myGwAddrLength == 0) { /* no gw available yet */
+ mqtts_state = MQTTS_STATE_SEARCHING_GW;
+ mqtts_timer_start_wait_searchgw();
+ } else {
+ mqtts_state = MQTTS_STATE_CONNECTING_TO_GW;
+ mqtts_connecting();
+ }
+
+ return MQTTS_OK;
+}
+
+
+unsigned char mqtts_disconnect(void) {
+ unsigned char msg[2]={2,DISCONNECT};
+
+ switch (mqtts_state) {
+ case MQTTS_STATE_NOT_ACTIVE:
+ case MQTTS_STATE_WAITING_CONNECT:
+ return MQTTS_ERR_STACK_NOT_READY;
+ case MQTTS_STATE_CONNECTING_TO_GW:
+ case MQTTS_STATE_SEARCHING_GW:
+ /* no need for sending a DISC since we are not connected */
+ mqtts_state = MQTTS_STATE_WAITING_CONNECT;
+ isConnected= 0;
+ mqtts_timer_stop_ack();
+ mqtts_timer_stop_keep_alive();
+ mqtts_timer_stop_wait_gwinfo();
+ mqtts_timer_stop_wait_searchgw();
+ mqttscb_disconnected(MQTTS_OK);
+ break;
+ case MQTTS_STATE_READY:
+ case MQTTS_STATE_WAITING_ACK:
+ /* send DISC to gw and wait for DISC */
+ backup_msg(msg);
+ mqtts_timer_start_ack();
+ mqtts_timer_stop_keep_alive();
+ mqtts_timer_stop_wait_gwinfo();
+ mqtts_timer_stop_wait_searchgw();
+ mqtts_state = MQTTS_STATE_DISCONNECTING;
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+ break;
+ default:
+ break;
+ }
+ return MQTTS_OK;
+}
+
+
+unsigned char mqtts_register(mqtts_REGISTER_Parms *pParms) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ if (mqtts_state != MQTTS_STATE_READY) return MQTTS_ERR_STACK_NOT_READY;
+
+ /* check topic length */
+ if ( (pParms->vlpTopic_length) >
+ (MQTTS_MAX_MSG_SIZE - (HEADER_LENGTH + 2)) )
+ {
+ return MQTTS_ERR_DATA_TOO_LONG;
+ }
+
+ msg_set_type(REGISTER);
+ msg_set_length(HEADER_LENGTH + 4 + pParms->vlpTopic_length);
+
+ /* TopicId field */
+ msg[HEADER_LENGTH] = 0x00;
+ msg[HEADER_LENGTH+1] = 0x00;
+ /* MsgId field */
+ set_msg_id(msg+HEADER_LENGTH+2);
+ /* TopicName Field */
+ for (i=0; i<pParms->vlpTopic_length; i++) {
+ msg[HEADER_LENGTH + 4 + i]=pParms->vlpTopic[i];
+ }
+
+ /* backup the message for the ack */
+ backup_msg(msg);
+ /* we waiting for REGACK */
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+
+ return MQTTS_OK;
+
+}
+
+unsigned char mqtts_publish(mqtts_PUBLISH_Parms *pParms) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ if (mqtts_state != MQTTS_STATE_READY) return MQTTS_ERR_STACK_NOT_READY;
+
+ /* Fill the header */
+ msg_set_type(PUBLISH);
+
+ /* Header size + 5 (1 (Flags) + 2 (Message ID) + 2 (Topic ID)) */
+
+ if ( (pParms->vlpData_length) > (MQTTS_MAX_MSG_SIZE - (HEADER_LENGTH+5)) )
+ {
+ return MQTTS_ERR_DATA_TOO_LONG;
+ }
+
+ msg_set_length(HEADER_LENGTH + 5 + pParms->vlpData_length);
+
+
+ /* Fill the message parameters */
+
+ /* First the flags */
+ msg[HEADER_LENGTH] = EMPTY_MASK;
+
+ switch (pParms->flagQOS) {
+ case 2:
+ msg[HEADER_LENGTH] |= QOS2_FLAG_MASK;
+ set_msg_id(msg+5);
+ break;
+
+ case 1:
+ msg[HEADER_LENGTH] |= QOS1_FLAG_MASK;
+ /* set the message id */
+ set_msg_id(msg+5);
+ break;
+
+ default:
+ msg[HEADER_LENGTH] |= QOS0_FLAG_MASK;
+ /* message id = 0x0000 in case of QoS level 0*/
+ msg[5]=0x00;
+ msg[6]=0x00;
+ break;
+ }
+
+ switch (pParms->flagRetain)
+ {
+ case 1:
+ msg[HEADER_LENGTH] |= RETAIN_FLAG_MASK;
+ break;
+
+ default:
+ break;
+ }
+
+ switch (pParms->flagTopicIdType)
+ {
+ case 2:
+ msg[HEADER_LENGTH] |= TOPICSHORT_FLAG_MASK;
+ break;
+
+ case 1:
+ msg[HEADER_LENGTH] |= TOPICIDPRE_FLAG_MASK;
+ break;
+
+ default:
+ break;
+ }
+
+ /* then the fixed parameters */
+ msg[HEADER_LENGTH+1] = pParms->flpTopicID[0];
+ msg[HEADER_LENGTH+2] = pParms->flpTopicID[1];
+
+ /* then the variable parameters */
+ for (i=1; i<=pParms->vlpData_length; i++)
+ {
+ msg[(msg_get_length(msg)-i)]=pParms->vlpData[pParms->vlpData_length-i];
+ }
+
+ if (pParms->flagQOS!=0)
+ {
+ /* save the flag and set the DUP flag for the backup */
+ i = msg[2];
+ msg[2] |= DUP_FLAG_MASK;
+ /* backup the message for the ack or pubrec*/
+ backup_msg(msg);
+ /* reload the saved flag */
+ msg[2] = i;
+ /* start the ack or pubrec timer */
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ }
+
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+
+ return MQTTS_OK;
+}
+
+
+unsigned char mqtts_subscribe(mqtts_SUBSCRIBE_Parms *pParms) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ if (mqtts_state != MQTTS_STATE_READY) return MQTTS_ERR_STACK_NOT_READY;
+
+ /* Fill the header */
+ msg_set_type(SUBSCRIBE);
+
+ /* Header size + 1 (Flags) + 2 (MsgId)*/
+
+ if ( (pParms->vlpTopic_length) > (MQTTS_MAX_MSG_SIZE - (HEADER_LENGTH+3)) )
+ {
+ return MQTTS_ERR_DATA_TOO_LONG;
+ }
+
+ msg_set_length((HEADER_LENGTH+3) + pParms->vlpTopic_length);
+
+ /* Fill the message parameters */
+ /* First the flags */
+ msg[HEADER_LENGTH] = EMPTY_MASK;
+
+ switch (pParms->flagQOS)
+ {
+ case 2:
+ msg[HEADER_LENGTH] |= QOS2_FLAG_MASK;
+ break;
+
+ case 1:
+ msg[HEADER_LENGTH] |= QOS1_FLAG_MASK;
+ break;
+
+ default:
+ msg[HEADER_LENGTH] |= QOS0_FLAG_MASK;
+ break;
+ }
+
+ switch (pParms->flagTopicIdType)
+ {
+ case 2:
+ msg[HEADER_LENGTH] |= TOPICSHORT_FLAG_MASK;
+ break;
+ case 1:
+ msg[HEADER_LENGTH] |= TOPICIDPRE_FLAG_MASK;
+ break;
+ default:
+ break;
+ }
+
+ /* MsgId field */
+ set_msg_id(msg+HEADER_LENGTH+1);
+
+ /* then the variable parameters */
+ for (i=0; i<pParms->vlpTopic_length; i++)
+ {
+ msg[HEADER_LENGTH + 3 + i]=pParms->vlpTopic[i];
+ }
+
+ /* save the flag and set the DUP flag for the backup */
+ i = msg[HEADER_LENGTH];
+ msg[HEADER_LENGTH] |= DUP_FLAG_MASK;
+ /* backup the message for the ack */
+ backup_msg(msg);
+ /* reload the saved flag */
+ msg[HEADER_LENGTH] = i;
+ /* we wait for SUBACK */
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+
+
+ return MQTTS_OK;
+}
+
+
+unsigned char mqtts_unsubscribe(mqtts_UNSUBSCRIBE_Parms *pParms) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ if (mqtts_state != MQTTS_STATE_READY) return MQTTS_ERR_STACK_NOT_READY;
+
+ /* Fill the header */
+ msg_set_type(UNSUBSCRIBE);
+ /* Header size + 3 (Flags+MsgId) + (TopicLength))*/
+ msg_set_length(HEADER_LENGTH + 3 + pParms->vlpTopic_length);
+
+ /* Fill the message parameters */
+
+ /* First the flags */
+ msg[HEADER_LENGTH] = EMPTY_MASK;
+
+ switch (pParms->flagTopicIdType)
+ {
+ case 2:
+ msg[HEADER_LENGTH] |= TOPICSHORT_FLAG_MASK;
+ break;
+ case 1:
+ msg[HEADER_LENGTH] |= TOPICIDPRE_FLAG_MASK;
+ break;
+ default:
+ break;
+ }
+
+ /* MsgId field */
+ set_msg_id(msg+HEADER_LENGTH+1);
+
+ /* then the variable parameters */
+ for (i=0; i<pParms->vlpTopic_length; i++)
+ {
+ msg[HEADER_LENGTH + 3 + i]=pParms->vlpTopic[i];
+ }
+
+ /* save the flag and set the DUP flag for the backup*/
+ i = msg[HEADER_LENGTH];
+ msg[HEADER_LENGTH] |= DUP_FLAG_MASK;
+ /* backup the message for the ack*/
+ backup_msg(msg);
+ /* reload the saved flag */
+ msg[HEADER_LENGTH] = i;
+ /* we wait for UNSUBACK */
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+
+
+ return MQTTS_OK;
+
+}
+
+
+unsigned char mqtts_willtopic_update(mqtts_WILLTOPICUPD_Parms *pParms) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ if (mqtts_state != MQTTS_STATE_READY) return MQTTS_ERR_STACK_NOT_READY;
+
+ /* Fill the header */
+ msg_set_type(WILLTOPICUPD);
+
+ if (pParms == NULL) { //send an empty WILLTOPICUPD to delete will topic and msg
+ msg_set_length(HEADER_LENGTH);
+ } else {
+ msg_set_length(HEADER_LENGTH + 1 + pParms->vlpWillTopic_length);
+ msg[HEADER_LENGTH] = EMPTY_MASK;
+ switch (pParms->flagWillQOS) {
+ case 2:
+ msg[HEADER_LENGTH] |= QOS2_FLAG_MASK;
+ break;
+ case 1:
+ msg[HEADER_LENGTH] |= QOS1_FLAG_MASK;
+ break;
+ default:
+ msg[HEADER_LENGTH] |= QOS0_FLAG_MASK;
+ break;
+ }
+ switch (pParms->flagWillRetain) {
+ case 1:
+ msg[HEADER_LENGTH] |= RETAIN_FLAG_MASK;
+ break;
+ default:
+ break;
+ }
+ for (i=0; i<pParms->vlpWillTopic_length; i++)
+ {
+ msg[HEADER_LENGTH+1+i]=pParms->vlpWillTopic[i];
+ }
+ }
+
+ /* backup the message for the ack*/
+ backup_msg(msg);
+ /* we wait for WILLTOPICRESP */
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+
+ return MQTTS_OK;
+}
+
+
+
+unsigned char mqtts_willmsg_update(mqtts_WILLMSGUPD_Parms *pParms) {
+ unsigned char msg[MQTTS_MAX_MSG_SIZE];
+ unsigned char i;
+
+ if (mqtts_state != MQTTS_STATE_READY) return MQTTS_ERR_STACK_NOT_READY;
+
+ /* Fill the header */
+ msg_set_type(WILLMSGUPD);
+
+ /* Length = Header size + WillMsg */
+ msg_set_length(HEADER_LENGTH + pParms->vlpWillMsg_length);
+
+ /* then the variable parameters */
+ for (i=0; i<pParms->vlpWillMsg_length; i++){
+ msg[HEADER_LENGTH+i]= pParms->vlpWillMsg[i];
+ }
+ /* backup the message for the ack*/
+ backup_msg(msg);
+ mqtts_state=MQTTS_STATE_WAITING_ACK;
+ mqtts_timer_start_ack();
+ mqtts_timer_start_keep_alive();
+
+ gp_network_msg_send(msg,myGwAddr,myGwAddrLength);
+
+ return MQTTS_OK;
+}
+
+
+
+
+static void gwinfo_received(unsigned char *msg, unsigned char *sender, unsigned char sender_len) {
+ unsigned char i;
+ unsigned char data[MQTTS_MAX_MSG_SIZE];
+ unsigned char data_len;
+ unsigned char msg_len;
+
+ msg_len = gp_byte_get(msg,1);
+ data_len = 0;
+
+ switch(msg_len) {
+ case 3:
+ /* GWINFO was sent by a gw */
+ memcpy(myGwAddr,sender,sender_len);
+ myGwAddrLength= sender_len;
+ myGwId= gp_byte_get(msg,3);
+ break;
+ default:
+ /* GWINFO was sent by a client */
+ for (i=0;i<(msg_len-3);i++) {
+ data[i]=gp_byte_get(msg,4+i);
+ data_len++;
+ }
+ memcpy(myGwAddr,data,data_len);
+ myGwAddrLength= data_len;
+ myGwId= gp_byte_get(msg,3);
+ break;
+ }
+
+}
+
+
+
+void gpcb_network_msg_received(unsigned char *msg,unsigned char *sender,unsigned char sender_len) {
+
+ unsigned char msg_type= gp_byte_get(msg,2);
+
+ if (mqtts_state == MQTTS_STATE_NOT_ACTIVE) {
+ /* stack not started, all msgs are ignored */
+ return;
+ }
+
+ switch (msg_type) {
+ case ADVERTISE:
+ handleADVERTISE(msg, sender, sender_len);
+ return;
+
+ case GWINFO:
+ handleGWINFO(msg, sender, sender_len);
+ return;
+
+ case SEARCHGW:
+ broadcastRadius= gp_byte_get(msg,3);
+ handleSEARCHGW();
+ return;
+
+ default:
+ /* ignore msg if not sent by my gw */
+ if (memcmp(sender,myGwAddr, myGwAddrLength)!=0) return;
+ }
+
+ switch (msg_type) {
+ case CONNACK:
+ handleCONNACK(msg);
+ break;
+
+ case WILLTOPICREQ:
+ handleWILLTOPICREQ();
+ break;
+
+ case WILLMSGREQ:
+ handleWILLMSGREQ();
+ break;
+
+ case REGISTER:
+ handleREGISTER(msg);
+ break;
+
+ case REGACK:
+ handleREGACK(msg);
+ break;
+
+ case PUBLISH:
+ handlePUBLISH(msg);
+ break;
+
+ case PUBACK:
+ handlePUBACK(msg);
+ break;
+
+ case PUBREC:
+ handlePUBREC(msg);
+ break;
+
+ case PUBCOMP:
+ handlePUBCOMP();
+ break;
+
+ case SUBACK:
+ handleSUBACK(msg);
+ break;
+
+ case UNSUBACK:
+ handleUNSUBACK(msg);
+ break;
+
+ case PINGREQ:
+ handlePINGREQ();
+ break;
+
+ case PINGRESP:
+ handlePINGRESP();
+ break;
+
+ case DISCONNECT:
+ handleDISCONNECT();
+ break;
+
+ case WILLTOPICRESP:
+ handleWILLTOPICRESP();
+ break;
+ case WILLMSGRESP:
+ handleWILLMSGRESP();
+ break;
+
+ default:
+ break;
+
+ } /* end switch (msg_type) */
+}
+
+unsigned char mqtts_get_state(void) {
+ return (mqtts_state);
+}
+
+
+/* END OF FILE */
+
+

Back to the top