• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 HiSilicon (Shanghai) Technologies CO., LIMITED.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /*
17  * STEPS:
18  * 1, CONNECT TO THE IOT SERVER
19  * 2, SUBSCRIBE  THE DEFAULT TOPIC
20  * 3, WAIT FOR ANY MESSAGE COMES OR ANY MESSAGE TO SEND
21  */
22 
23 #include <securec.h>
24 #include <hi_task.h>
25 #include <string.h>
26 #include "iot_config.h"
27 #include "iot_log.h"
28 #include "cmsis_os2.h"
29 #include "iot_watchdog.h"
30 #include "MQTTClient.h"
31 #include "iot_errno.h"
32 #include "iot_main.h"
33 
/* ---- MQTT broker and session configuration ---- */
#define CN_IOT_SERVER    "tcp://121.36.42.100:1883" /* server URI handed to MQTTClient_create */
#define CONFIG_COMMAND_TIMEOUT    10000L /* timeout argument of MQTTClient_disconnect */
#define CN_KEEPALIVE_TIME    50 /* assigned to the connect option keepAliveInterval */
#define CN_CLEANSESSION    1 /* assigned to the connect option cleansession */

/* ---- Credentials ---- */
/* Size of the password buffer: presumably 64 hex characters of a 32-byte SHA-256 digest plus '\0'. */
#define CN_HMAC_PWD_LEN   65
#define CN_EVENT_TIME    "1970000100" /* time stamp used in the client ID and passed to HmacGeneratePwd */
#define CN_CLIENTID_FMT    "%s_0_0_%s" /* client ID format: deviceID_0_0_TIME */

/* ---- Message queue between the MQTT callback, senders and the IoT task ---- */
#define CN_QUEUE_WAITTIMEOUT    1000 /* timeout used for queue put/get */
#define CN_QUEUE_MSGNUM    16 /* number of entries in the queue */
#define CN_QUEUE_MSGSIZE    (sizeof(hi_pvoid)) /* each entry is one pointer */

/* ---- IoT task attributes ---- */
#define CN_TASK_PRIOR    28
#define CN_TASK_STACKSIZE    0x2000
#define CN_TASK_NAME    "IoTMain"
50 
/* Kind of work item travelling through the IoT message queue. */
typedef enum {
    EN_IOT_MSG_PUBLISH = 0, /* outgoing message that has to be published */
    EN_IOT_MSG_RECV,        /* incoming message that has to be dispatched to the application callback */
} EnIotMsgT;

/*
 * Queue work item. Producers in this file allocate it together with its strings in one heap
 * block, so topic and payload point just behind the structure and are released with it.
 */
typedef struct {
    EnIotMsgT type;      /* publish or receive */
    int qos;             /* MQTT quality of service of the message */
    const char *topic;   /* NUL-terminated topic name */
    const char *payload; /* NUL-terminated payload */
} IoTMsgT;
62 
63 typedef struct {
64     hi_bool  stop;
65     hi_u32 conLost;
66     hi_u32 queueID;
67     hi_u32 iotTaskID;
68     FnMsgCallBack msgCallBack;
69     MQTTClient_deliveryToken tocken;
70 }IotAppCbT;
71 static IotAppCbT g_ioTAppCb;
72 
73 static const char *g_defaultSubscribeTopic[] = {
74     "$oc/devices/"CONFIG_DEVICE_ID"/sys/messages/down",
75     "$oc/devices/"CONFIG_DEVICE_ID"/sys/properties/set/#",
76     "$oc/devices/"CONFIG_DEVICE_ID"/sys/properties/get/#",
77     "$oc/devices/"CONFIG_DEVICE_ID"/sys/shadow/get/response/#",
78     "$oc/devices/"CONFIG_DEVICE_ID"/sys/events/down",
79     "$oc/devices/"CONFIG_DEVICE_ID"/sys/commands/#"
80 };
81 
82 #define CN_TOPIC_SUBSCRIBE_NUM    (sizeof(g_defaultSubscribeTopic) / sizeof(const char *))
83 
/*
 * "Message arrived" callback registered with the MQTT client.
 *
 * Copies the topic and the payload into one heap block laid out as
 * [IoTMsgT][topic]['\0'][payload]['\0'] and posts the pointer to that block to
 * g_ioTAppCb.queueID; the consumer of the queue releases the block with hi_free().
 *
 * context:  unused.
 * topic:    topic the message arrived on; released with MQTTClient_free() before returning.
 * topicLen: topic length given by the library; when it is 0 the length is taken with strlen().
 * message:  received message; released with MQTTClient_freeMessage() before returning.
 *
 * Returns 1 on every path. A message that cannot be allocated, copied or queued is dropped,
 * and in that case the heap block, the library message and the topic are all still released.
 */
static int MsgRcvCallBack(unsigned char *context, char *topic, int topicLen, MQTTClient_message *message)
{
    IoTMsgT *msg = NULL;
    char *cursor = NULL;
    hi_u32 topicSize;
    hi_u32 payloadSize;
    hi_u32 remain;
    int copied = 1;

    (void)context;
    topicSize = (topicLen == 0) ? (hi_u32)strlen(topic) : (hi_u32)topicLen;
    /* A non-positive payload length is treated as an empty payload. */
    payloadSize = (message->payloadlen > 0) ? (hi_u32)message->payloadlen : 0;
    remain = topicSize + 1 + payloadSize + 1;

    msg = hi_malloc(0, sizeof(IoTMsgT) + remain);
    if (msg != NULL) {
        msg->qos = message->qos;
        msg->type = EN_IOT_MSG_RECV;
        cursor = (char *)msg + sizeof(IoTMsgT);

        /* Zero-length copies are skipped so a NULL source pointer is never handed to memcpy_s. */
        if ((topicSize > 0) && (memcpy_s(cursor, remain, topic, topicSize) != EOK)) {
            copied = 0;
        }
        if (copied) {
            cursor[topicSize] = '\0';
            msg->topic = cursor;
            cursor += topicSize + 1;
            remain -= (topicSize + 1);
            if ((payloadSize > 0) && (memcpy_s(cursor, remain, message->payload, payloadSize) != EOK)) {
                copied = 0;
            }
        }

        if (!copied) {
            IOT_LOG_ERROR("Copy the received message failed\r\n");
            hi_free(0, msg);
        } else {
            cursor[payloadSize] = '\0';
            msg->payload = cursor;
            IOT_LOG_DEBUG("RCVMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
            if (IOT_SUCCESS != osMessageQueuePut(g_ioTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT)) {
                IOT_LOG_ERROR("Write queue failed\r\n");
                hi_free(0, msg);
            }
        }
    }

    /* The library buffers are released on every path, including the failure paths above. */
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topic);
    return 1;
}
128 
129 // when the connect lost and this callback will be called
ConnLostCallBack(unsigned char * context,char * cause)130 static void ConnLostCallBack(unsigned char *context, char *cause)
131 {
132     IOT_LOG_DEBUG("Connection lost:caused by:%s\r\n", cause == NULL ? "Unknown" : cause);
133     return;
134 }
135 
/*
 * Handles one work item taken from the IoT message queue.
 *
 * msg:    work item; the caller keeps ownership and frees it afterwards.
 * pubmsg: template for outgoing messages (passed by value; payload, payloadlen, qos and
 *         retained are overwritten here).
 * client: MQTT client used for publishing.
 *
 * EN_IOT_MSG_PUBLISH items are published on msg->topic; EN_IOT_MSG_RECV items are forwarded to
 * the callback set with IoTSetMsgCallback(), if any. Other types and a NULL msg are ignored.
 */
void IoTMsgProcess(IoTMsgT *msg, MQTTClient_message pubmsg, MQTTClient client)
{
    int rc; /* MQTTClient_publishMessage returns int */

    if (msg == NULL) {
        return;
    }

    switch (msg->type) {
        case EN_IOT_MSG_PUBLISH:
            if ((msg->topic == NULL) || (msg->payload == NULL)) {
                IOT_LOG_ERROR("MSGSEND:invalid message\r\n");
                break;
            }
            pubmsg.payload = (void *)msg->payload;
            pubmsg.payloadlen = (int)strlen(msg->payload);
            pubmsg.qos = msg->qos;
            pubmsg.retained = 0;
            rc = MQTTClient_publishMessage(client, msg->topic, &pubmsg, &g_ioTAppCb.tocken);
            if (rc != MQTTCLIENT_SUCCESS) {
                IOT_LOG_ERROR("MSGSEND:failed\r\n");
            } else {
                /* Success is reported only when the publish call really succeeded. */
                IOT_LOG_DEBUG("MSGSEND:SUCCESS\r\n");
            }
            /* Unconditional increment kept from the original code. */
            g_ioTAppCb.tocken++;
            break;
        case EN_IOT_MSG_RECV:
            if (g_ioTAppCb.msgCallBack != NULL) {
                g_ioTAppCb.msgCallBack(msg->qos, msg->topic, msg->payload);
            }
            break;
        default:
            break;
    }
}
162 
163 // use this function to deal all the coming message
ProcessQueueMsg(MQTTClient client)164 static int ProcessQueueMsg(MQTTClient client)
165 {
166     printf("ProcessQueueMsg\r\n");
167     hi_u32     ret;
168     hi_u32     msgSize;
169     IoTMsgT    *msg;
170     hi_u32     timeout;
171     MQTTClient_message pubmsg = MQTTClient_message_initializer;
172 
173     timeout = CN_QUEUE_WAITTIMEOUT;
174     do {
175         msg = NULL;
176         msgSize = sizeof(hi_pvoid);
177         ret = osMessageQueueGet(g_ioTAppCb.queueID, &msg, &msgSize, timeout);
178         if (ret != MQTTCLIENT_SUCCESS) {
179             return HI_ERR_FAILURE;
180         }
181         if (msg != NULL) {
182             IoTMsgProcess(msg, pubmsg, client);
183             hi_free(0, msg);
184         }
185         timeout = 0;  // continue to deal the message without wait here
186     } while (ret == HI_ERR_SUCCESS);
187 
188     return 0;
189 }
190 
/*
 * Runs one MQTT session: creates the client, registers the callbacks, connects, subscribes to
 * the default topics and then services the message queue while the client stays connected.
 *
 * client:   handle value from the caller; it is overwritten by MQTTClient_create(). It is
 *           passed by value, so the caller's variable is not updated.
 * clientID: heap-allocated client identifier. This function takes ownership and frees it.
 * userPwd:  heap-allocated password, or NULL. This function takes ownership and frees it.
 * connOpts: connect options handed to MQTTClient_connect().
 * subQos:   requested QoS values, one per entry of g_defaultSubscribeTopic.
 *
 * Every exit path destroys the client that was created (after disconnecting it when a
 * connection had been established) and releases clientID and userPwd. The caller must not
 * use or free either buffer after this call.
 */
void MqttProcess(MQTTClient client, char *clientID, char *userPwd, MQTTClient_connectOptions connOpts, int subQos[])
{
    int rc = MQTTClient_create(&client, CN_IOT_SERVER, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        IOT_LOG_ERROR("Create Client failed,Please check the parameters--%d\r\n", rc);
        goto RELEASE; /* no client exists, never continue with an invalid handle */
    }

    rc = MQTTClient_setCallbacks(client, NULL, ConnLostCallBack, MsgRcvCallBack, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        IOT_LOG_ERROR("Set the callback failed,Please check the callback paras\r\n");
        goto DESTROY;
    }

    rc = MQTTClient_connect(client, &connOpts);
    if (rc != MQTTCLIENT_SUCCESS) {
        IOT_LOG_ERROR("Connect IoT server failed,please check the network and parameters:%d\r\n", rc);
        goto DESTROY;
    }
    IOT_LOG_DEBUG("Connect success\r\n");

    rc = MQTTClient_subscribeMany(client, CN_TOPIC_SUBSCRIBE_NUM, (char* const*)g_defaultSubscribeTopic,
                                  (int *)&subQos[0]);
    if (rc != MQTTCLIENT_SUCCESS) {
        IOT_LOG_ERROR("Subscribe the default topic failed,Please check the parameters\r\n");
        goto DISCONNECT;
    }
    IOT_LOG_DEBUG("Subscribe success\r\n");

    while (MQTTClient_isConnected(client)) {
        /* HI_ERR_SUCCESS from the queue handler ends the session; clean-up still runs below. */
        if (ProcessQueueMsg(client) == HI_ERR_SUCCESS) {
            break;
        }
        MQTTClient_yield(); // make the keepalive done
    }

DISCONNECT:
    (void)MQTTClient_disconnect(client, CONFIG_COMMAND_TIMEOUT);
DESTROY:
    MQTTClient_destroy(&client);
RELEASE:
    if (clientID != NULL) {
        hi_free(0, clientID);
    }
    if (userPwd != NULL) {
        hi_free(0, userPwd);
    }
}
235 
MainEntryProcess(hi_void)236 static hi_void MainEntryProcess(hi_void)
237 {
238     int subQos[CN_TOPIC_SUBSCRIBE_NUM] = {1};
239     char *clientID = NULL;
240     char *userID = NULL;
241     char *userPwd = NULL;
242     int ret = 0;
243 
244     MQTTClient client = NULL;
245     MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
246     // make the clientID userID userPwd
247     clientID = hi_malloc(0, strlen(CN_CLIENTID_FMT) + strlen(CONFIG_DEVICE_ID) + strlen(CN_EVENT_TIME) + 1);
248     if (clientID == NULL) {
249         return;
250     }
251     ret = snprintf_s(clientID, strlen(CN_CLIENTID_FMT) + strlen(CONFIG_DEVICE_ID) + strlen(CN_EVENT_TIME) +
252                      CN_QUEUE_MSGNUM, strlen(CN_CLIENTID_FMT) + strlen(CONFIG_DEVICE_ID) + strlen(CN_EVENT_TIME) + 1,
253                      CN_CLIENTID_FMT, CONFIG_DEVICE_ID, CN_EVENT_TIME);
254     if (ret < 0) {
255         printf("string is null\r\n");
256     }
257     userID = CONFIG_DEVICE_ID;
258     if (CONFIG_DEVICE_PWD != DEVICE_PWD_DEFAULT) {
259         userPwd = hi_malloc(0, CN_HMAC_PWD_LEN);
260         if (userPwd == NULL) {
261             hi_free(0, clientID);
262             return;
263         }
264         (void)HmacGeneratePwd((const unsigned char *)CONFIG_DEVICE_PWD, strlen(CONFIG_DEVICE_PWD),
265                               (const unsigned char *)CN_EVENT_TIME, strlen(CN_EVENT_TIME),
266                               (unsigned char *)userPwd);
267     }
268 
269     conn_opts.keepAliveInterval = CN_KEEPALIVE_TIME;
270     conn_opts.cleansession = CN_CLEANSESSION;
271     conn_opts.username = userID;
272     conn_opts.password = userPwd;
273     conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
274     // wait for the wifi connect ok
275     IOT_LOG_DEBUG("IOTSERVER:%s\r\n", CN_IOT_SERVER);
276     MqttProcess(client, clientID, userPwd, conn_opts, subQos);
277     return;
278 }
279 
MainEntry(hi_void * arg)280 static hi_void *MainEntry(hi_void *arg)
281 {
282     (void)arg;
283     while (g_ioTAppCb.stop == HI_FALSE) {
284         MainEntryProcess();
285         IOT_LOG_DEBUG("The connection lost and we will try another connect\r\n");
286         hi_sleep(1000*5); /* 延时5*1000ms */
287     }
288     return NULL;
289 }
290 
IoTMain(void)291 int IoTMain(void)
292 {
293     hi_u32 ret;
294     hi_task_attr attr = {0};
295 
296     g_ioTAppCb.queueID = osMessageQueueNew(CN_QUEUE_MSGNUM, CN_QUEUE_MSGSIZE, NULL);
297     attr.stack_size = CN_TASK_STACKSIZE;
298     attr.task_prio = CN_TASK_PRIOR;
299     attr.task_name = CN_TASK_NAME;
300     ret = hi_task_create(&g_ioTAppCb.iotTaskID, &attr, MainEntry, NULL);
301     if (ret != HI_ERR_SUCCESS) {
302         IOT_LOG_ERROR("Create the Main Entry Failed\r\n");
303     }
304 
305     return 0;
306 }
307 
/*
 * Stores the handler that IoTMsgProcess() invokes for received messages.
 * msgCallback may be NULL, in which case received messages are not forwarded.
 * Always returns 0.
 */
int IoTSetMsgCallback(FnMsgCallBack msgCallback)
{
    IotAppCbT *app = &g_ioTAppCb;

    app->msgCallBack = msgCallback;
    return 0;
}
313 
IotSendMsg(int qos,const char * topic,const char * payload)314 int IotSendMsg(int qos, const char *topic, const char *payload)
315 {
316     int rc = -1;
317     IoTMsgT *msg;
318     char *buf;
319     hi_u32 bufSize;
320     int ret = 0;
321 
322     bufSize = strlen(topic) + 1 + strlen(payload) + 1 + sizeof(IoTMsgT);
323     buf = hi_malloc(0, bufSize);
324     if (buf != NULL) {
325         msg = (IoTMsgT *)buf;
326         buf += sizeof(IoTMsgT);
327         bufSize -= sizeof(IoTMsgT);
328         msg->qos = qos;
329         msg->type = EN_IOT_MSG_PUBLISH;
330         ret = memcpy_s(buf, bufSize, topic, strlen(topic));
331         if (ret != EOK) {
332             return;
333         }
334         buf[strlen(topic)] = '\0';
335         msg->topic = buf;
336         buf += strlen(topic) + 1;
337         bufSize -= (strlen(topic) + 1);
338         ret = memcpy_s(buf, bufSize, payload, strlen(payload));
339         if (ret != EOK) {
340             return;
341         }
342         buf[strlen(payload)] = '\0';
343         msg->payload = buf;
344         IOT_LOG_DEBUG("SNDMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
345         if (IOT_SUCCESS != osMessageQueuePut(g_ioTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT)) {
346             IOT_LOG_ERROR("Write queue failed\r\n");
347             hi_free(0, msg);
348         } else {
349             rc = 0;
350         }
351     }
352     return rc;
353 }