1 /*
2 * Copyright (c) 2022 HiSilicon (Shanghai) Technologies CO., LIMITED.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 /* we use the mqtt to connect to the IoT platform */
17 /*
18 * STEPS:
19 * 1, CONNECT TO THE IOT SERVER
20 * 2, SUBSCRIBE THE DEFAULT TOPIC
21 * 3, WAIT FOR ANY MESSAGE COMES OR ANY MESSAGE TO SEND
22 */
23
24 #include <string.h>
25 #include <securec.h>
26 #include <hi_task.h>
27 #include "cmsis_os2.h"
28 #include "iot_watchdog.h"
29 #include "iot_errno.h"
30 #include "iot_config.h"
31 #include "iot_log.h"
32 #include "MQTTClient.h"
33 #include "iot_main.h"
34
35 // this is the configuration head
36 #define CN_IOT_SERVER "tcp://121.36.42.100:1883"
37
38 #define CONFIG_COMMAND_TIMEOUT 10000L
39 #define CN_KEEPALIVE_TIME 50
40 #define CN_CLEANSESSION 1
41 #define CN_HMAC_PWD_LEN 65 // SHA256 IS 32 BYTES AND END APPEND'\0'
42 #define CN_EVENT_TIME "1970000100"
43 #define CN_CLIENTID_FMT "%s_0_0_%s" // This is the cient ID format, deviceID_0_0_TIME
44 #define CN_QUEUE_WAITTIMEOUT 1000
45 #define CN_QUEUE_MSGNUM 16
46 #define CN_QUEUE_MSGSIZE (sizeof(hi_pvoid))
47
48 #define CN_TASK_PRIOR 28
49 #define CN_TASK_STACKSIZE 0X2000
50 #define CN_TASK_NAME "IoTMain"
51
52 typedef enum {
53 EN_IOT_MSG_PUBLISH = 0,
54 EN_IOT_MSG_RECV,
55 }EnIotMsgT;
56
57 typedef struct {
58 EnIotMsgT type;
59 int qos;
60 const char *topic;
61 const char *payload;
62 }IoTMsgT;
63
64 typedef struct {
65 hi_bool stop;
66 hi_u32 conLost;
67 hi_u32 queueID;
68 hi_u32 iotTaskID;
69 FnMsgCallBack msgCallBack;
70 MQTTClient_deliveryToken tocken;
71 }IotAppCbT;
72 static IotAppCbT g_ioTAppCb;
73
74 static const char *g_defaultSubscribeTopic[] = {
75 "$oc/devices/"CONFIG_DEVICE_ID"/sys/messages/down",
76 "$oc/devices/"CONFIG_DEVICE_ID"/sys/properties/set/#",
77 "$oc/devices/"CONFIG_DEVICE_ID"/sys/properties/get/#",
78 "$oc/devices/"CONFIG_DEVICE_ID"/sys/shadow/get/response/#",
79 "$oc/devices/"CONFIG_DEVICE_ID"/sys/events/down",
80 "$oc/devices/"CONFIG_DEVICE_ID"/sys/commands/#"
81 };
82
83 #define CN_TOPIC_SUBSCRIBE_NUM (sizeof(g_defaultSubscribeTopic) / sizeof(const char *))
84
MsgRcvCallBack(unsigned char * context,char * topic,int topicLen,MQTTClient_message * message)85 static int MsgRcvCallBack(unsigned char *context, char *topic, int topicLen, MQTTClient_message *message)
86 {
87 IoTMsgT *msg;
88 char *buf;
89 hi_u32 bufSize;
90 int topicLength = topicLen;
91
92 if (topicLength == 0) {
93 topicLength = strlen(topic);
94 }
95 bufSize = topicLength + 1 + message->payloadlen + 1 + sizeof(IoTMsgT);
96 buf = hi_malloc(0, bufSize);
97 if (buf != NULL) {
98 msg = (IoTMsgT *)buf;
99 buf += sizeof(IoTMsgT);
100 bufSize -= sizeof(IoTMsgT);
101 msg->qos = message->qos;
102 msg->type = EN_IOT_MSG_RECV;
103 (void)memcpy_s(buf, bufSize, topic, topicLength);
104 buf[topicLength] = '\0';
105 msg->topic = buf;
106 buf += topicLength + 1;
107 bufSize -= (topicLength + 1);
108 (void)memcpy_s(buf, bufSize, message->payload, message->payloadlen);
109 buf[message->payloadlen] = '\0';
110 msg->payload = buf;
111 IOT_LOG_DEBUG("RCVMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
112 if (IOT_SUCCESS != osMessageQueuePut(g_ioTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT)) {
113 IOT_LOG_ERROR("Write queue failed\r\n");
114 hi_free(0, msg);
115 }
116 }
117
118 MQTTClient_freeMessage(&message);
119 MQTTClient_free(topic);
120 return 1;
121 }
122
123 // when the connect lost and this callback will be called
ConnLostCallBack(unsigned char * context,char * cause)124 static void ConnLostCallBack(unsigned char *context, char *cause)
125 {
126 IOT_LOG_DEBUG("Connection lost:caused by:%s\r\n", cause == NULL ? "Unknown" : cause);
127 return;
128 }
129
IoTMsgProcess(IoTMsgT * msg,MQTTClient_message pubmsg,MQTTClient client)130 void IoTMsgProcess(IoTMsgT *msg, MQTTClient_message pubmsg, MQTTClient client)
131 {
132 hi_u32 ret;
133 switch (msg->type) {
134 case EN_IOT_MSG_PUBLISH:
135 pubmsg.payload = (void *)msg->payload;
136 pubmsg.payloadlen = (int)strlen(msg->payload);
137 pubmsg.qos = msg->qos;
138 pubmsg.retained = 0;
139 ret = MQTTClient_publishMessage(client, msg->topic, &pubmsg, &g_ioTAppCb.tocken);
140 if (ret != MQTTCLIENT_SUCCESS) {
141 IOT_LOG_ERROR("MSGSEND:failed\r\n");
142 }
143 IOT_LOG_DEBUG("MSGSEND:SUCCESS\r\n");
144 g_ioTAppCb.tocken++;
145 break;
146 case EN_IOT_MSG_RECV:
147 if (g_ioTAppCb.msgCallBack != NULL) {
148 g_ioTAppCb.msgCallBack(msg->qos, msg->topic, msg->payload);
149 }
150 break;
151 default:
152 break;
153 }
154 return;
155 }
156
157 // use this function to deal all the coming message
ProcessQueueMsg(MQTTClient client)158 static int ProcessQueueMsg(MQTTClient client)
159 {
160 printf("ProcessQueueMsg\r\n");
161 hi_u32 ret;
162 hi_u32 msgSize;
163 IoTMsgT *msg;
164 hi_u32 timeout;
165 MQTTClient_message pubmsg = MQTTClient_message_initializer;
166
167 timeout = CN_QUEUE_WAITTIMEOUT;
168 do {
169 msg = NULL;
170 msgSize = sizeof(hi_pvoid);
171 ret = osMessageQueueGet(g_ioTAppCb.queueID, &msg, &msgSize, timeout);
172 if (ret != MQTTCLIENT_SUCCESS) {
173 return HI_ERR_FAILURE;
174 }
175 if (msg != NULL) {
176 IoTMsgProcess(msg, pubmsg, client);
177 hi_free(0, msg);
178 }
179 timeout = 0; // continuos to deal the message without wait here
180 } while (ret == HI_ERR_SUCCESS);
181 return;
182 }
183
MqttProcess(MQTTClient client,char * clientID,char * userPwd,MQTTClient_connectOptions connOpts,int subQos[])184 void MqttProcess(MQTTClient client, char *clientID, char *userPwd, MQTTClient_connectOptions connOpts, int subQos[])
185 {
186 int rc = MQTTClient_create(&client, CN_IOT_SERVER, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
187 if (rc != MQTTCLIENT_SUCCESS) {
188 IOT_LOG_ERROR("Create Client failed,Please check the parameters--%d\r\n", rc);
189 if (userPwd != NULL) {
190 hi_free(0, userPwd);
191 return;
192 }
193 }
194
195 rc = MQTTClient_setCallbacks(client, NULL, ConnLostCallBack, MsgRcvCallBack, NULL);
196 if (rc != MQTTCLIENT_SUCCESS) {
197 IOT_LOG_ERROR("Set the callback failed,Please check the callback paras\r\n");
198 MQTTClient_destroy(&client);
199 return;
200 }
201
202 rc = MQTTClient_connect(client, &connOpts);
203 if (rc != MQTTCLIENT_SUCCESS) {
204 IOT_LOG_ERROR("Connect IoT server failed,please check the network and parameters:%d\r\n", rc);
205 MQTTClient_destroy(&client);
206 return;
207 }
208 IOT_LOG_DEBUG("Connect success\r\n");
209
210 rc = MQTTClient_subscribeMany(client, CN_TOPIC_SUBSCRIBE_NUM, (char* const*)g_defaultSubscribeTopic,
211 (int *)&subQos[0]);
212 if (rc != MQTTCLIENT_SUCCESS) {
213 IOT_LOG_ERROR("Subscribe the default topic failed,Please check the parameters\r\n");
214 MQTTClient_destroy(&client);
215 return;
216 }
217 IOT_LOG_DEBUG("Subscribe success\r\n");
218 while (MQTTClient_isConnected(client)) {
219 ProcessQueueMsg(client); // do the job here
220 int ret = ProcessQueueMsg(client); // do the job here
221 if (ret == HI_ERR_SUCCESS) {
222 return;
223 }
224 MQTTClient_yield(); // make the keepalive done
225 }
226 MQTTClient_disconnect(client, CONFIG_COMMAND_TIMEOUT);
227 return;
228 }
229
MainEntryProcess(hi_void)230 static hi_void MainEntryProcess(hi_void)
231 {
232 int subQos[CN_TOPIC_SUBSCRIBE_NUM] = {1};
233 char *clientID = NULL;
234 char *userID = NULL;
235 char *userPwd = NULL;
236
237 MQTTClient client = NULL;
238 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
239 // make the clientID userID userPwd
240 clientID = hi_malloc(0, strlen(CN_CLIENTID_FMT) + strlen(CONFIG_DEVICE_ID) + strlen(CN_EVENT_TIME) + 1);
241 if (clientID == NULL) {
242 return;
243 }
244 if (snprintf_s(clientID, strlen(CN_CLIENTID_FMT) + strlen(CONFIG_DEVICE_ID) + strlen(CN_EVENT_TIME) +
245 CN_QUEUE_MSGNUM, strlen(CN_CLIENTID_FMT) + strlen(CONFIG_DEVICE_ID) + strlen(CN_EVENT_TIME) + 1,
246 CN_CLIENTID_FMT, CONFIG_DEVICE_ID, CN_EVENT_TIME) < 0) {
247 return;
248 }
249 userID = CONFIG_DEVICE_ID;
250 userPwd = hi_malloc(0, CN_HMAC_PWD_LEN);
251 if (userPwd == NULL) {
252 hi_free(0, clientID);
253 return;
254 }
255 (void)HmacGeneratePwd((const unsigned char *)CONFIG_DEVICE_PWD, strlen(CONFIG_DEVICE_PWD),
256 (const unsigned char *)CN_EVENT_TIME, strlen(CN_EVENT_TIME),
257 (unsigned char *)userPwd, CN_HMAC_PWD_LEN);
258
259 conn_opts.keepAliveInterval = CN_KEEPALIVE_TIME;
260 conn_opts.cleansession = CN_CLEANSESSION;
261 conn_opts.username = userID;
262 conn_opts.password = userPwd;
263 conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
264 // wait for the wifi connect ok
265 IOT_LOG_DEBUG("IOTSERVER:%s\r\n", CN_IOT_SERVER);
266 MqttProcess(client, clientID, userPwd, conn_opts, subQos);
267 return;
268 }
269
MainEntry(hi_void * arg)270 static hi_void *MainEntry(hi_void *arg)
271 {
272 (void)arg;
273 while (g_ioTAppCb.stop == HI_FALSE) {
274 MainEntryProcess();
275 IOT_LOG_DEBUG("The connection lost and we will try another connect\r\n");
276 hi_sleep(1000*5); /* 延时5*1000ms */
277 }
278 return NULL;
279 }
280
IoTMain(void)281 int IoTMain(void)
282 {
283 hi_u32 ret;
284 hi_task_attr attr = {0};
285 g_ioTAppCb.queueID = osMessageQueueNew(CN_QUEUE_MSGNUM, CN_QUEUE_MSGSIZE, NULL);
286 attr.stack_size = CN_TASK_STACKSIZE;
287 attr.task_prio = CN_TASK_PRIOR;
288 attr.task_name = CN_TASK_NAME;
289 ret = hi_task_create(&g_ioTAppCb.iotTaskID, &attr, MainEntry, NULL);
290 if (ret != HI_ERR_SUCCESS) {
291 IOT_LOG_ERROR("Create the Main Entry Failed\r\n");
292 }
293
294 return 0;
295 }
296
IoTSetMsgCallback(FnMsgCallBack msgCallback)297 int IoTSetMsgCallback(FnMsgCallBack msgCallback)
298 {
299 g_ioTAppCb.msgCallBack = msgCallback;
300 return 0;
301 }
302
IotSendMsg(int qos,const char * topic,const char * payload)303 int IotSendMsg(int qos, const char *topic, const char *payload)
304 {
305 int rc = -1;
306 IoTMsgT *msg;
307 char *buf;
308 hi_u32 bufSize;
309
310 bufSize = strlen(topic) + 1 + strlen(payload) + 1 + sizeof(IoTMsgT);
311 buf = hi_malloc(0, bufSize);
312 if (buf != NULL) {
313 msg = (IoTMsgT *)buf;
314 buf += sizeof(IoTMsgT);
315 bufSize -= sizeof(IoTMsgT);
316 msg->qos = qos;
317 msg->type = EN_IOT_MSG_PUBLISH;
318 (void)memcpy_s(buf, bufSize, topic, strlen(topic));
319 buf[strlen(topic)] = '\0';
320 msg->topic = buf;
321 buf += strlen(topic) + 1;
322 bufSize -= (strlen(topic) + 1);
323 (void)memcpy_s(buf, bufSize, payload, strlen(payload));
324 buf[strlen(payload)] = '\0';
325 msg->payload = buf;
326 IOT_LOG_DEBUG("SNDMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
327 if (osMessageQueuePut(g_ioTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT) != IOT_SUCCESS) {
328 IOT_LOG_ERROR("Write queue failed\r\n");
329 hi_free(0, msg);
330 return;
331 } else {
332 rc = 0;
333 }
334 }
335 return rc;
336 }