• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 HiSilicon (Shanghai) Technologies CO., LIMITED.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
/**
 * STEPS:
 * 1. CONNECT TO THE IOT SERVER
 * 2. SUBSCRIBE TO THE DEFAULT TOPICS
 * 3. WAIT FOR AN INCOMING MESSAGE OR FOR A MESSAGE TO SEND
 */
22 
23 #include <securec.h>
24 #include <hi_task.h>
25 #include <hi_msg.h>
26 #include <hi_mem.h>
27 #include <string.h>
28 #include <stdbool.h>
29 #include "iot_config.h"
30 #include "iot_log.h"
31 #include "iot_hmac.h"
32 #include "ohos_init.h"
33 #include "cmsis_os2.h"
34 #include "iot_watchdog.h"
35 #include "MQTTClient.h"
36 #include "iot_errno.h"
37 #include "iot_main.h"
38 
39 // < this is the configuration head
40 #define CN_IOT_SERVER           "tcp://106.55.124.154:1883" // Tencent iot cloud address
41 #define CONFIG_COMMAND_TIMEOUT     10000L
42 #define CN_KEEPALIVE_TIME 50
43 #define CN_CLEANSESSION   1
44 #define CN_HMAC_PWD_LEN   65     // < SHA256 IS 32 BYTES AND END APPEND '\0'
45 #define CN_EVENT_TIME     "1970000100"
46 #define CN_CLIENTID_FMT   "%s_0_0_%s"      // < This is the cient ID format, deviceID_0_0_TIME
47 #define CN_QUEUE_WAITTIMEOUT   1000
48 #define CN_QUEUE_MSGNUM 16
49 #define CN_QUEUE_MSGSIZE (sizeof(hi_pvoid))
50 
51 #define CN_TASK_PRIOR 28
52 #define CN_TASK_STACKSIZE 0X2000
53 #define CN_TASK_NAME "IoTMain"
54 
55 typedef enum {
56     EN_IOT_MSG_PUBLISH = 0,
57     EN_IOT_MSG_RECV,
58 }EnIotMsg;
59 
60 typedef struct {
61     EnIotMsg type;
62     int qos;
63     const char *topic;
64     const char *payload;
65 }IoTMsg_t;
66 
67 typedef struct {
68     bool  stop;
69     unsigned  int conLost;
70     unsigned  int queueID;
71     unsigned  int iotTaskID;
72     fnMsgCallBack msgCallBack;
73     MQTTClient_deliveryToken tocken;
74 }IotAppCb_t;
75 static IotAppCb_t   gIoTAppCb;
76 
77 static const char *gDefaultSubscribeTopic[] = {
78     /* Tencent iot cloud topic */
79     "19VUBHD786/mqtt/data",
80     "19VUBHD786/mqtt/control",
81     "19VUBHD786/mqtt/msg/len_on",
82 };
83 
84 #define CN_TOPIC_SUBSCRIBE_NUM     (sizeof(gDefaultSubscribeTopic) / sizeof(const char *))
MsgRcvCallBack(char * context,char * topic,int topicLen,MQTTClient_message * message)85 static int MsgRcvCallBack(char *context, char *topic, int topicLen, MQTTClient_message *message)
86 {
87     IoTMsg_t  *msg;
88     char *buf;
89     unsigned  int bufSize;
90     int topiLen = topicLen;
91 
92     if (topiLen == 0) {
93         topiLen = strlen(topic);
94     }
95     bufSize = topiLen + 1  + message->payloadlen + 1 + sizeof(IoTMsg_t);
96     buf = hi_malloc(0, bufSize);
97     if (buf != NULL) {
98         msg = (IoTMsg_t *)buf;
99         buf += sizeof(IoTMsg_t);
100         bufSize -= sizeof(IoTMsg_t);
101         msg->qos = message->qos;
102         msg->type = EN_IOT_MSG_RECV;
103         (void)memcpy_s(buf, bufSize, topic, topiLen);
104         buf[topiLen] = '\0';
105         msg->topic = buf;
106         buf += topiLen + 1;
107         bufSize -= (topiLen + 1);
108         (void)memcpy_s(buf, bufSize, message->payload, message->payloadlen);
109         buf[message->payloadlen] = '\0';
110         msg->payload = buf;
111         IOT_LOG_DEBUG("RCVMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
112         if (IOT_SUCCESS != osMessageQueuePut(gIoTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT)) {
113             IOT_LOG_ERROR("Write queue failed\r\n");
114             hi_free(0, msg);
115         }
116     }
117 
118     MQTTClient_freeMessage(&message);
119     MQTTClient_free(topic);
120     return 1;
121 }
122 
123 // < when the connect lost and this callback will be called
ConnLostCallBack(char * context,char * cause)124 static void ConnLostCallBack(char *context, char *cause)
125 {
126     IOT_LOG_DEBUG("Connection lost:caused by:%s\r\n", cause == NULL? "Unknown" : cause);
127     return;
128 }
129 
MqttProcessQueueMsg(MQTTClient client,IoTMsg_t * msg,MQTTClient_message pubMsg)130 static int MqttProcessQueueMsg(MQTTClient client, IoTMsg_t  *msg, MQTTClient_message pubMsg)
131 {
132     int ret = 0;
133 
134     switch (msg->type) {
135         case EN_IOT_MSG_PUBLISH:
136             pubMsg.payload = (void *)msg->payload;
137             pubMsg.payloadlen = (int)strlen(msg->payload);
138             pubMsg.qos = msg->qos;
139             pubMsg.retained = 0;
140             ret = MQTTClient_publishMessage(client, msg->topic, &pubMsg, &gIoTAppCb.tocken);
141             if (ret != MQTTCLIENT_SUCCESS) {
142                 IOT_LOG_ERROR("MSGSEND:failed\r\n");
143             }
144             IOT_LOG_DEBUG("MSGSEND:SUCCESS\r\n");
145             gIoTAppCb.tocken++;
146             break;
147         case EN_IOT_MSG_RECV:
148             if (gIoTAppCb.msgCallBack != NULL) {
149                 gIoTAppCb.msgCallBack(msg->qos, msg->topic, msg->payload);
150             }
151             break;
152         default:
153             break;
154     }
155 }
156 
157 // <use this function to deal all the coming message
ProcessQueueMsg(MQTTClient client)158 static int ProcessQueueMsg(MQTTClient client)
159 {
160     unsigned  int     ret;
161     unsigned  int     msgSize;
162     IoTMsg_t  *msg;
163     unsigned  int     timeout;
164     MQTTClient_message pubmsg = MQTTClient_message_initializer;
165 
166     timeout = CN_QUEUE_WAITTIMEOUT;
167     do {
168         msg = NULL;
169         msgSize = sizeof(hi_pvoid);
170         ret = osMessageQueueGet(gIoTAppCb.queueID, &msg, &msgSize, timeout);
171         if (msg != NULL) {
172             IOT_LOG_DEBUG("QUEUEMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
173             MqttProcessQueueMsg(client, msg, pubmsg);
174             hi_free(0, msg);
175         }
176         timeout = 0;  // < continuos to deal the message without wait here
177     } while (ret == IOT_SUCCESS);
178 
179     return 0;
180 }
181 
MqttDestory(int ret,MQTTClient client)182 int MqttDestory(int ret, MQTTClient client)
183 {
184     if (ret != MQTTCLIENT_SUCCESS) {
185         MQTTClient_destroy(&client);
186         return -1;
187     }
188     return MQTTCLIENT_SUCCESS;
189 }
190 
MainEntryProcess(void)191 static void MainEntryProcess(void)
192 {
193     int rc = 0, subQos[CN_TOPIC_SUBSCRIBE_NUM] = {1};
194 
195     MQTTClient client = NULL;
196     MQTTClient_connectOptions connOpts = MQTTClient_connectOptions_initializer;
197     char *clientID = CN_CLIENTID;
198     char *userID = CONFIG_USER_ID;
199     char *userPwd = hi_malloc(0, CN_HMAC_PWD_LEN);
200     if (userPwd == NULL) {
201         hi_free(0, clientID);
202         return;
203     }
204     userPwd = CONFIG_USER_PWD;
205     connOpts.keepAliveInterval = CN_KEEPALIVE_TIME;
206     connOpts.cleansession = CN_CLEANSESSION;
207     connOpts.username = userID;
208     connOpts.password = userPwd;
209     connOpts.MQTTVersion = MQTTVERSION_3_1_1;
210     IOT_LOG_DEBUG("CLIENTID:%s USERID:%s USERPWD:%s, IOTSERVER:%s\r\n",
211         clientID, userID, userPwd==NULL?"NULL" : userPwd, CN_IOT_SERVER);
212     rc = MQTTClient_create(&client, CN_IOT_SERVER, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
213     if (rc != MQTTCLIENT_SUCCESS) {
214         if (userPwd != NULL) {
215             hi_free(0, userPwd);
216         }
217         return;
218     }
219     rc = MQTTClient_setCallbacks(client, NULL, ConnLostCallBack, MsgRcvCallBack, NULL);
220     if (MqttDestory(rc, client) != MQTTCLIENT_SUCCESS) {
221         return;
222     }
223     rc = MQTTClient_connect(client, &connOpts);
224     if (MqttDestory(rc, client) != MQTTCLIENT_SUCCESS) {
225         return;
226     }
227     for (int i = 0; i < CN_TOPIC_SUBSCRIBE_NUM; i++) {
228         rc = MQTTClient_subscribeMany(client, CN_TOPIC_SUBSCRIBE_NUM,
229             (char *const *)gDefaultSubscribeTopic, (int *)&subQos[0]);
230         if (MqttDestory(rc, client) != MQTTCLIENT_SUCCESS) {
231             return;
232         }
233     }
234     IOT_LOG_DEBUG ("Connect success and Subscribe success\r\n");
235     while (MQTTClient_isConnected(client)) {
236         ProcessQueueMsg(client); // < do the job here
237         MQTTClient_yield();  // < make the keepalive done
238     }
239     MQTTClient_disconnect(client, CONFIG_COMMAND_TIMEOUT);
240     return;
241 }
242 
243 /* MQTT processing entry */
MainEntry(char * arg)244 static hi_void *MainEntry(char *arg)
245 {
246     (void)arg;
247     while (gIoTAppCb.stop == false) {
248         MainEntryProcess();
249         IOT_LOG_DEBUG("The connection lost and we will try another connect\r\n");
250         hi_sleep(1000); /* 1000: cpu sleep 1000ms */
251     }
252     return NULL;
253 }
254 
IoTMain(void)255 int IoTMain(void)
256 {
257     unsigned  int ret = 0;
258     hi_task_attr attr = {0};
259 
260     gIoTAppCb.queueID = osMessageQueueNew(CN_QUEUE_MSGNUM, CN_QUEUE_MSGSIZE, NULL);
261     if (ret != IOT_SUCCESS) {
262         IOT_LOG_ERROR("Create the msg queue Failed\r\n");
263     }
264 
265     attr.stack_size = CN_TASK_STACKSIZE;
266     attr.task_prio = CN_TASK_PRIOR;
267     attr.task_name = CN_TASK_NAME;
268     ret = hi_task_create(&gIoTAppCb.iotTaskID, &attr, MainEntry, NULL);
269     if (ret != IOT_SUCCESS) {
270         IOT_LOG_ERROR("Create the Main Entry Failed\r\n");
271     }
272 
273     return 0;
274 }
275 
IoTSetMsgCallback(fnMsgCallBack msgCallback)276 int IoTSetMsgCallback(fnMsgCallBack msgCallback)
277 {
278     gIoTAppCb.msgCallBack = msgCallback;
279     return 0;
280 }
281 
IotSendMsg(int qos,const char * topic,const char * payload)282 int IotSendMsg(int qos, const char *topic, const char *payload)
283 {
284     int rc = -1;
285     IoTMsg_t  *msg;
286     char *buf;
287     unsigned  int bufSize;
288 
289     bufSize = strlen(topic) + 1 +strlen(payload) + 1 + sizeof(IoTMsg_t);
290     buf = hi_malloc(0, bufSize);
291     if (buf != NULL) {
292         msg = (IoTMsg_t *)buf;
293         buf += sizeof(IoTMsg_t);
294         bufSize -= sizeof(IoTMsg_t);
295         msg->qos = qos;
296         msg->type = EN_IOT_MSG_PUBLISH;
297         (void)memcpy_s(buf, bufSize, topic, strlen(topic));
298         buf[strlen(topic)] = '\0';
299         msg->topic = buf;
300         buf += strlen(topic) + 1;
301         bufSize -= (strlen(topic) + 1);
302         (void)memcpy_s(buf, bufSize, payload, strlen(payload));
303         buf[strlen(payload)] = '\0';
304         msg->payload = buf;
305         IOT_LOG_DEBUG("SNDMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
306         if (IOT_SUCCESS != osMessageQueuePut(gIoTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT)) {
307             IOT_LOG_ERROR("Write queue failed\r\n");
308             hi_free(0, msg);
309         } else {
310             rc = 0;
311         }
312     }
313     return rc;
314 }