1 /*
2 * Copyright (c) 2022 HiSilicon (Shanghai) Technologies CO., LIMITED.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 /**
17 * STEPS:
18 * 1, CONNECT TO THE IOT SERVER
19 * 2, SUBSCRIBE THE DEFAULT TOPIC
 * 3, WAIT FOR ANY INCOMING MESSAGE OR ANY OUTGOING MESSAGE TO SEND
21 */
22
23 #include <securec.h>
24 #include <hi_task.h>
25 #include <hi_msg.h>
26 #include <hi_mem.h>
27 #include <string.h>
28 #include <stdbool.h>
29 #include "iot_config.h"
30 #include "iot_log.h"
31 #include "iot_hmac.h"
32 #include "ohos_init.h"
33 #include "cmsis_os2.h"
34 #include "iot_watchdog.h"
35 #include "MQTTClient.h"
36 #include "iot_errno.h"
37 #include "iot_main.h"
38
// < Build-time configuration for the MQTT main task starts here
#define CN_IOT_SERVER "tcp://106.55.124.154:1883" // Tencent IoT cloud address
#define CONFIG_COMMAND_TIMEOUT 10000L // timeout argument passed to MQTTClient_disconnect()
#define CN_KEEPALIVE_TIME 50 // value assigned to connOpts.keepAliveInterval
#define CN_CLEANSESSION 1 // value assigned to connOpts.cleansession
// < Password buffer length. Original note: SHA256 is 32 bytes, plus the trailing '\0'.
// < NOTE(review): 65 presumably means 64 hex characters + '\0' -- confirm.
#define CN_HMAC_PWD_LEN 65
#define CN_EVENT_TIME "1970000100" // NOTE(review): not referenced in the code visible here
#define CN_CLIENTID_FMT "%s_0_0_%s" // < Client ID format: deviceID_0_0_TIME
#define CN_QUEUE_WAITTIMEOUT 1000 // timeout argument for osMessageQueuePut()/osMessageQueueGet()
#define CN_QUEUE_MSGNUM 16 // number of entries requested from osMessageQueueNew()
#define CN_QUEUE_MSGSIZE (sizeof(hi_pvoid)) // each queue entry is a single pointer

#define CN_TASK_PRIOR 28 // task_prio of the MQTT task
#define CN_TASK_STACKSIZE 0X2000 // stack_size of the MQTT task
#define CN_TASK_NAME "IoTMain" // task_name of the MQTT task
54
/* Kind of work item carried through the message queue. */
typedef enum {
    EN_IOT_MSG_PUBLISH = 0, /* outgoing message, queued by IotSendMsg() */
    EN_IOT_MSG_RECV, /* incoming message, queued by MsgRcvCallBack() */
}EnIotMsg;
59
/*
 * One queued message. The producers in this file allocate the struct together
 * with the topic and payload strings in a single heap block, so `topic` and
 * `payload` point just behind the struct and one hi_free() releases all of it.
 */
typedef struct {
    EnIotMsg type; /* publish request or received message */
    int qos; /* MQTT QoS level of the message */
    const char *topic; /* NUL-terminated topic name */
    const char *payload; /* NUL-terminated payload text */
}IoTMsg_t;
66
/* Control block holding the state shared by the functions in this file. */
typedef struct {
    bool stop; /* MainEntry() keeps reconnecting while this is false */
    unsigned int conLost; /* NOTE(review): not accessed in the code visible here */
    unsigned int queueID; /* handle returned by osMessageQueueNew() */
    unsigned int iotTaskID; /* task id filled in by hi_task_create() */
    fnMsgCallBack msgCallBack; /* user callback for received messages; may be NULL */
    MQTTClient_deliveryToken tocken; /* delivery token written by MQTTClient_publishMessage() */
}IotAppCb_t;
/* The single instance of the control block. */
static IotAppCb_t gIoTAppCb;
76
/* Topics (Tencent IoT cloud) that are subscribed once the connection is up. */
static const char *gDefaultSubscribeTopic[] = {
    "19VUBHD786/mqtt/data",
    "19VUBHD786/mqtt/control",
    "19VUBHD786/mqtt/msg/len_on",
};

/* Number of entries in gDefaultSubscribeTopic. */
#define CN_TOPIC_SUBSCRIBE_NUM (sizeof(gDefaultSubscribeTopic) / sizeof(gDefaultSubscribeTopic[0]))
/**
 * Message-arrived callback registered with MQTTClient_setCallbacks().
 *
 * Copies the topic and the payload into one heap block laid out as
 * [IoTMsg_t][topic '\0'][payload '\0'] and posts a pointer to that block to
 * the application queue. The block is released here if the queue rejects it.
 * The Paho-owned message and topic are always released before returning.
 *
 * @param context  unused
 * @param topic    topic name supplied by the MQTT library
 * @param topicLen topic length, or 0 in which case strlen(topic) is used
 * @param message  received message supplied by the MQTT library
 * @return always 1
 */
static int MsgRcvCallBack(char *context, char *topic, int topicLen, MQTTClient_message *message)
{
    (void)context;

    int nameLen = (topicLen != 0) ? topicLen : (int)strlen(topic);
    unsigned int total = nameLen + 1 + message->payloadlen + 1 + sizeof(IoTMsg_t);
    char *raw = hi_malloc(0, total);

    if (raw != NULL) {
        IoTMsg_t *item = (IoTMsg_t *)raw;

        /* The topic string lives directly behind the header ... */
        char *topicCopy = raw + sizeof(IoTMsg_t);
        unsigned int topicRoom = total - sizeof(IoTMsg_t);

        /* ... and the payload string directly behind the topic's terminator. */
        char *payloadCopy = topicCopy + nameLen + 1;
        unsigned int payloadRoom = topicRoom - (nameLen + 1);

        item->type = EN_IOT_MSG_RECV;
        item->qos = message->qos;

        (void)memcpy_s(topicCopy, topicRoom, topic, nameLen);
        topicCopy[nameLen] = '\0';
        item->topic = topicCopy;

        (void)memcpy_s(payloadCopy, payloadRoom, message->payload, message->payloadlen);
        payloadCopy[message->payloadlen] = '\0';
        item->payload = payloadCopy;

        IOT_LOG_DEBUG("RCVMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", item->qos, item->topic, item->payload);

        /* The queue carries the pointer only; on failure nobody else will free it. */
        if (osMessageQueuePut(gIoTAppCb.queueID, &item, 0, CN_QUEUE_WAITTIMEOUT) != IOT_SUCCESS) {
            IOT_LOG_ERROR("Write queue failed\r\n");
            hi_free(0, item);
        }
    }

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topic);
    return 1;
}
122
/**
 * Connection-lost callback registered with MQTTClient_setCallbacks().
 * Only logs the reason; "Unknown" is printed when no cause is supplied.
 *
 * @param context unused
 * @param cause   reason text, may be NULL
 */
static void ConnLostCallBack(char *context, char *cause)
{
    (void)context;

    const char *reason = "Unknown";
    if (cause != NULL) {
        reason = cause;
    }
    IOT_LOG_DEBUG("Connection lost:caused by:%s\r\n", reason);
}
129
/**
 * Handles one message taken from the application queue.
 *
 * EN_IOT_MSG_PUBLISH: publishes msg->topic / msg->payload through `client`.
 * EN_IOT_MSG_RECV:    forwards the message to the registered user callback,
 *                     if there is one.
 *
 * @param client connected MQTT client handle
 * @param msg    queued message; not freed here, the caller keeps ownership
 * @param pubMsg publish template; passed by value, so the fields filled in
 *               here never leak back into the caller's copy
 * @return the MQTTClient_publishMessage() result for a publish request,
 *         0 for every other message type
 */
static int MqttProcessQueueMsg(MQTTClient client, IoTMsg_t *msg, MQTTClient_message pubMsg)
{
    int ret = 0;

    switch (msg->type) {
        case EN_IOT_MSG_PUBLISH:
            pubMsg.payload = (void *)msg->payload;
            pubMsg.payloadlen = (int)strlen(msg->payload);
            pubMsg.qos = msg->qos;
            pubMsg.retained = 0;
            ret = MQTTClient_publishMessage(client, msg->topic, &pubMsg, &gIoTAppCb.tocken);
            if (ret != MQTTCLIENT_SUCCESS) {
                IOT_LOG_ERROR("MSGSEND:failed\r\n");
            } else {
                /* Report success only when the publish call really succeeded. */
                IOT_LOG_DEBUG("MSGSEND:SUCCESS\r\n");
            }
            /*
             * NOTE(review): the token is an output of the publish call and is
             * overwritten by the next one, so this increment looks redundant.
             * Kept to leave the shared state exactly as before.
             */
            gIoTAppCb.tocken++;
            break;
        case EN_IOT_MSG_RECV:
            if (gIoTAppCb.msgCallBack != NULL) {
                gIoTAppCb.msgCallBack(msg->qos, msg->topic, msg->payload);
            }
            break;
        default:
            break;
    }

    /* The function is declared int; falling off the end left the result undefined. */
    return ret;
}
156
157 // <use this function to deal all the coming message
ProcessQueueMsg(MQTTClient client)158 static int ProcessQueueMsg(MQTTClient client)
159 {
160 unsigned int ret;
161 unsigned int msgSize;
162 IoTMsg_t *msg;
163 unsigned int timeout;
164 MQTTClient_message pubmsg = MQTTClient_message_initializer;
165
166 timeout = CN_QUEUE_WAITTIMEOUT;
167 do {
168 msg = NULL;
169 msgSize = sizeof(hi_pvoid);
170 ret = osMessageQueueGet(gIoTAppCb.queueID, &msg, &msgSize, timeout);
171 if (msg != NULL) {
172 IOT_LOG_DEBUG("QUEUEMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
173 MqttProcessQueueMsg(client, msg, pubmsg);
174 hi_free(0, msg);
175 }
176 timeout = 0; // < continuos to deal the message without wait here
177 } while (ret == IOT_SUCCESS);
178
179 return 0;
180 }
181
/**
 * Destroys `client` when `ret` signals a failed MQTT call.
 *
 * @param ret    result code of the preceding MQTT call
 * @param client client handle to destroy on failure
 * @return MQTTCLIENT_SUCCESS when `ret` was a success code (nothing is
 *         destroyed), -1 after the client has been destroyed
 */
int MqttDestory(int ret, MQTTClient client)
{
    if (ret == MQTTCLIENT_SUCCESS) {
        return MQTTCLIENT_SUCCESS;
    }

    MQTTClient_destroy(&client);
    return -1;
}
190
MainEntryProcess(void)191 static void MainEntryProcess(void)
192 {
193 int rc = 0, subQos[CN_TOPIC_SUBSCRIBE_NUM] = {1};
194
195 MQTTClient client = NULL;
196 MQTTClient_connectOptions connOpts = MQTTClient_connectOptions_initializer;
197 char *clientID = CN_CLIENTID;
198 char *userID = CONFIG_USER_ID;
199 char *userPwd = hi_malloc(0, CN_HMAC_PWD_LEN);
200 if (userPwd == NULL) {
201 hi_free(0, clientID);
202 return;
203 }
204 userPwd = CONFIG_USER_PWD;
205 connOpts.keepAliveInterval = CN_KEEPALIVE_TIME;
206 connOpts.cleansession = CN_CLEANSESSION;
207 connOpts.username = userID;
208 connOpts.password = userPwd;
209 connOpts.MQTTVersion = MQTTVERSION_3_1_1;
210 IOT_LOG_DEBUG("CLIENTID:%s USERID:%s USERPWD:%s, IOTSERVER:%s\r\n",
211 clientID, userID, userPwd==NULL?"NULL" : userPwd, CN_IOT_SERVER);
212 rc = MQTTClient_create(&client, CN_IOT_SERVER, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
213 if (rc != MQTTCLIENT_SUCCESS) {
214 if (userPwd != NULL) {
215 hi_free(0, userPwd);
216 }
217 return;
218 }
219 rc = MQTTClient_setCallbacks(client, NULL, ConnLostCallBack, MsgRcvCallBack, NULL);
220 if (MqttDestory(rc, client) != MQTTCLIENT_SUCCESS) {
221 return;
222 }
223 rc = MQTTClient_connect(client, &connOpts);
224 if (MqttDestory(rc, client) != MQTTCLIENT_SUCCESS) {
225 return;
226 }
227 for (int i = 0; i < CN_TOPIC_SUBSCRIBE_NUM; i++) {
228 rc = MQTTClient_subscribeMany(client, CN_TOPIC_SUBSCRIBE_NUM,
229 (char *const *)gDefaultSubscribeTopic, (int *)&subQos[0]);
230 if (MqttDestory(rc, client) != MQTTCLIENT_SUCCESS) {
231 return;
232 }
233 }
234 IOT_LOG_DEBUG ("Connect success and Subscribe success\r\n");
235 while (MQTTClient_isConnected(client)) {
236 ProcessQueueMsg(client); // < do the job here
237 MQTTClient_yield(); // < make the keepalive done
238 }
239 MQTTClient_disconnect(client, CONFIG_COMMAND_TIMEOUT);
240 return;
241 }
242
243 /* MQTT processing entry */
MainEntry(char * arg)244 static hi_void *MainEntry(char *arg)
245 {
246 (void)arg;
247 while (gIoTAppCb.stop == false) {
248 MainEntryProcess();
249 IOT_LOG_DEBUG("The connection lost and we will try another connect\r\n");
250 hi_sleep(1000); /* 1000: cpu sleep 1000ms */
251 }
252 return NULL;
253 }
254
IoTMain(void)255 int IoTMain(void)
256 {
257 unsigned int ret = 0;
258 hi_task_attr attr = {0};
259
260 gIoTAppCb.queueID = osMessageQueueNew(CN_QUEUE_MSGNUM, CN_QUEUE_MSGSIZE, NULL);
261 if (ret != IOT_SUCCESS) {
262 IOT_LOG_ERROR("Create the msg queue Failed\r\n");
263 }
264
265 attr.stack_size = CN_TASK_STACKSIZE;
266 attr.task_prio = CN_TASK_PRIOR;
267 attr.task_name = CN_TASK_NAME;
268 ret = hi_task_create(&gIoTAppCb.iotTaskID, &attr, MainEntry, NULL);
269 if (ret != IOT_SUCCESS) {
270 IOT_LOG_ERROR("Create the Main Entry Failed\r\n");
271 }
272
273 return 0;
274 }
275
/**
 * Registers the function that is called for every received MQTT message.
 * Any previously registered function is replaced.
 *
 * @param msgCallback callback to store in the control block
 * @return always 0
 */
int IoTSetMsgCallback(fnMsgCallBack msgCallback)
{
    IotAppCb_t *app = &gIoTAppCb;

    app->msgCallBack = msgCallback;
    return 0;
}
281
IotSendMsg(int qos,const char * topic,const char * payload)282 int IotSendMsg(int qos, const char *topic, const char *payload)
283 {
284 int rc = -1;
285 IoTMsg_t *msg;
286 char *buf;
287 unsigned int bufSize;
288
289 bufSize = strlen(topic) + 1 +strlen(payload) + 1 + sizeof(IoTMsg_t);
290 buf = hi_malloc(0, bufSize);
291 if (buf != NULL) {
292 msg = (IoTMsg_t *)buf;
293 buf += sizeof(IoTMsg_t);
294 bufSize -= sizeof(IoTMsg_t);
295 msg->qos = qos;
296 msg->type = EN_IOT_MSG_PUBLISH;
297 (void)memcpy_s(buf, bufSize, topic, strlen(topic));
298 buf[strlen(topic)] = '\0';
299 msg->topic = buf;
300 buf += strlen(topic) + 1;
301 bufSize -= (strlen(topic) + 1);
302 (void)memcpy_s(buf, bufSize, payload, strlen(payload));
303 buf[strlen(payload)] = '\0';
304 msg->payload = buf;
305 IOT_LOG_DEBUG("SNDMSG:QOS:%d TOPIC:%s PAYLOAD:%s\r\n", msg->qos, msg->topic, msg->payload);
306 if (IOT_SUCCESS != osMessageQueuePut(gIoTAppCb.queueID, &msg, 0, CN_QUEUE_WAITTIMEOUT)) {
307 IOT_LOG_ERROR("Write queue failed\r\n");
308 hi_free(0, msg);
309 } else {
310 rc = 0;
311 }
312 }
313 return rc;
314 }