1 /*******************************************************************************
2 * Copyright (c) 2009, 2021 IBM Corp. and Ian Craggs
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs, Allan Stockdill-Mander - SSL updates
16 * Ian Craggs - MQTT 3.1.1 support
17 * Rong Xiang, Ian Craggs - C++ compatibility
18 * Ian Craggs - binary password and will payload
19 * Ian Craggs - MQTT 5.0 support
20 *******************************************************************************/
21
22 /**
23 * @file
24 * \brief functions to deal with reading and writing of MQTT packets from and to sockets
25 *
26 * Some other related functions are in the MQTTPacket module
27 */
28
29
30 #include "MQTTPacketOut.h"
31 #include "Log.h"
32 #include "StackTrace.h"
33
34 #include <string.h>
35 #include <stdlib.h>
36
37 #include "Heap.h"
38
39
40 /**
41 * Send an MQTT CONNECT packet down a socket for V5 or later
42 * @param client a structure from which to get all the required values
43 * @param MQTTVersion the MQTT version to connect with
44 * @param connectProperties MQTT V5 properties for the connect packet
45 * @param willProperties MQTT V5 properties for the will message, if any
46 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
47 */
MQTTPacket_send_connect(Clients * client,int MQTTVersion,MQTTProperties * connectProperties,MQTTProperties * willProperties)48 int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
49 MQTTProperties* connectProperties, MQTTProperties* willProperties)
50 {
51 char *buf, *ptr;
52 Connect packet;
53 int rc = SOCKET_ERROR, len;
54
55 FUNC_ENTRY;
56 packet.header.byte = 0;
57 packet.header.bits.type = CONNECT;
58
59 len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
60 if (client->will)
61 len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
62 if (client->username)
63 len += (int)strlen(client->username)+2;
64 if (client->password)
65 len += client->passwordlen+2;
66 if (MQTTVersion >= MQTTVERSION_5)
67 {
68 len += MQTTProperties_len(connectProperties);
69 if (client->will)
70 len += MQTTProperties_len(willProperties);
71 }
72
73 ptr = buf = malloc(len);
74 if (ptr == NULL)
75 goto exit_nofree;
76 if (MQTTVersion == MQTTVERSION_3_1)
77 {
78 writeUTF(&ptr, "MQIsdp");
79 writeChar(&ptr, (char)MQTTVERSION_3_1);
80 }
81 else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
82 {
83 writeUTF(&ptr, "MQTT");
84 writeChar(&ptr, (char)MQTTVersion);
85 }
86 else
87 goto exit;
88
89 packet.flags.all = 0;
90 if (MQTTVersion >= MQTTVERSION_5)
91 packet.flags.bits.cleanstart = client->cleanstart;
92 else
93 packet.flags.bits.cleanstart = client->cleansession;
94 packet.flags.bits.will = (client->will) ? 1 : 0;
95 if (packet.flags.bits.will)
96 {
97 packet.flags.bits.willQoS = client->will->qos;
98 packet.flags.bits.willRetain = client->will->retained;
99 }
100 if (client->username)
101 packet.flags.bits.username = 1;
102 if (client->password)
103 packet.flags.bits.password = 1;
104
105 writeChar(&ptr, packet.flags.all);
106 writeInt(&ptr, client->keepAliveInterval);
107 if (MQTTVersion >= MQTTVERSION_5)
108 MQTTProperties_write(&ptr, connectProperties);
109 writeUTF(&ptr, client->clientID);
110 if (client->will)
111 {
112 if (MQTTVersion >= MQTTVERSION_5)
113 MQTTProperties_write(&ptr, willProperties);
114 writeUTF(&ptr, client->will->topic);
115 writeData(&ptr, client->will->payload, client->will->payloadlen);
116 }
117 if (client->username)
118 writeUTF(&ptr, client->username);
119 if (client->password)
120 writeData(&ptr, client->password, client->passwordlen);
121
122 rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1, MQTTVersion);
123 Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID,
124 MQTTVersion, client->cleansession, rc);
125 exit:
126 if (rc != TCPSOCKET_INTERRUPTED)
127 free(buf);
128 exit_nofree:
129 FUNC_EXIT_RC(rc);
130 return rc;
131 }
132
133
134 /**
135 * Function used in the new packets table to create connack packets.
136 * @param MQTTVersion MQTT 5 or less?
137 * @param aHeader the MQTT header byte
138 * @param data the rest of the packet
139 * @param datalen the length of the rest of the packet
140 * @return pointer to the packet structure
141 */
MQTTPacket_connack(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)142 void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
143 {
144 Connack* pack = NULL;
145 char* curdata = data;
146 char* enddata = &data[datalen];
147
148 FUNC_ENTRY;
149 if ((pack = malloc(sizeof(Connack))) == NULL)
150 goto exit;
151 pack->MQTTVersion = MQTTVersion;
152 pack->header.byte = aHeader;
153 if (datalen < 2) /* enough data for connect flags and reason code? */
154 {
155 free(pack);
156 pack = NULL;
157 goto exit;
158 }
159 pack->flags.all = readChar(&curdata); /* connect flags */
160 pack->rc = readChar(&curdata); /* reason code */
161 if (MQTTVersion >= MQTTVERSION_5 && datalen > 2)
162 {
163 MQTTProperties props = MQTTProperties_initializer;
164 pack->properties = props;
165 if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
166 {
167 if (pack->properties.array)
168 free(pack->properties.array);
169 if (pack)
170 free(pack);
171 pack = NULL; /* signal protocol error */
172 goto exit;
173 }
174 }
175 exit:
176 FUNC_EXIT;
177 return pack;
178 }
179
180
181 /**
182 * Free allocated storage for a connack packet.
183 * @param pack pointer to the connack packet structure
184 */
MQTTPacket_freeConnack(Connack * pack)185 void MQTTPacket_freeConnack(Connack* pack)
186 {
187 FUNC_ENTRY;
188 if (pack->MQTTVersion >= MQTTVERSION_5)
189 MQTTProperties_free(&pack->properties);
190 free(pack);
191 FUNC_EXIT;
192 }
193
194
195 /**
196 * Send an MQTT PINGREQ packet down a socket.
197 * @param socket the open socket to send the data to
198 * @param clientID the string client identifier, only used for tracing
199 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
200 */
MQTTPacket_send_pingreq(networkHandles * net,const char * clientID)201 int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
202 {
203 Header header;
204 int rc = 0;
205
206 FUNC_ENTRY;
207 header.byte = 0;
208 header.bits.type = PINGREQ;
209 rc = MQTTPacket_send(net, header, NULL, 0, 0, MQTTVERSION_3_1_1);
210 Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
211 FUNC_EXIT_RC(rc);
212 return rc;
213 }
214
215
216 /**
217 * Send an MQTT subscribe packet down a socket.
218 * @param topics list of topics
219 * @param qoss list of corresponding QoSs
220 * @param msgid the MQTT message id to use
221 * @param dup boolean - whether to set the MQTT DUP flag
222 * @param socket the open socket to send the data to
223 * @param clientID the string client identifier, only used for tracing
224 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
225 */
MQTTPacket_send_subscribe(List * topics,List * qoss,MQTTSubscribe_options * opts,MQTTProperties * props,int msgid,int dup,Clients * client)226 int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
227 int msgid, int dup, Clients* client)
228 {
229 Header header;
230 char *data, *ptr;
231 int rc = -1;
232 ListElement *elem = NULL, *qosElem = NULL;
233 int datalen, i = 0;
234
235 FUNC_ENTRY;
236 header.bits.type = SUBSCRIBE;
237 header.bits.dup = dup;
238 header.bits.qos = 1;
239 header.bits.retain = 0;
240
241 datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
242 while (ListNextElement(topics, &elem))
243 datalen += (int)strlen((char*)(elem->content));
244 if (client->MQTTVersion >= MQTTVERSION_5)
245 datalen += MQTTProperties_len(props);
246
247 ptr = data = malloc(datalen);
248 if (ptr == NULL)
249 goto exit;
250 writeInt(&ptr, msgid);
251
252 if (client->MQTTVersion >= MQTTVERSION_5)
253 MQTTProperties_write(&ptr, props);
254
255 elem = NULL;
256 while (ListNextElement(topics, &elem))
257 {
258 char subopts = 0;
259
260 ListNextElement(qoss, &qosElem);
261 writeUTF(&ptr, (char*)(elem->content));
262 subopts = *(int*)(qosElem->content);
263 if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
264 {
265 subopts |= (opts[i].noLocal << 2); /* 1 bit */
266 subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
267 subopts |= (opts[i].retainHandling << 4); /* 2 bits */
268 }
269 writeChar(&ptr, subopts);
270 ++i;
271 }
272 rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
273 Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
274 if (rc != TCPSOCKET_INTERRUPTED)
275 free(data);
276 exit:
277 FUNC_EXIT_RC(rc);
278 return rc;
279 }
280
281
282 /**
283 * Function used in the new packets table to create suback packets.
284 * @param MQTTVersion the version of MQTT
285 * @param aHeader the MQTT header byte
286 * @param data the rest of the packet
287 * @param datalen the length of the rest of the packet
288 * @return pointer to the packet structure
289 */
MQTTPacket_suback(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)290 void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
291 {
292 Suback* pack = NULL;
293 char* curdata = data;
294 char* enddata = &data[datalen];
295 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
296 int ret = -1;
297 #endif
298
299 FUNC_ENTRY;
300 if ((pack = malloc(sizeof(Suback))) == NULL)
301 goto exit;
302 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
303 ret = memset_s(pack, sizeof(Suback), 0, sizeof(Suback));
304 if (ret != 0)
305 {
306 if (pack != NULL)
307 free(pack);
308 pack = NULL;
309 goto exit;
310 }
311 #endif
312 pack->MQTTVersion = MQTTVersion;
313 pack->header.byte = aHeader;
314 if (enddata - curdata < 2) /* Is there enough data to read the msgid? */
315 {
316 free(pack);
317 pack = NULL;
318 goto exit;
319 }
320
321 pack->msgId = readInt(&curdata);
322 if (MQTTVersion >= MQTTVERSION_5)
323 {
324 MQTTProperties props = MQTTProperties_initializer;
325 pack->properties = props;
326 if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
327 {
328 if (pack->properties.array)
329 free(pack->properties.array);
330 if (pack)
331 free(pack);
332 pack = NULL; /* signal protocol error */
333 goto exit;
334 }
335 }
336 pack->qoss = ListInitialize();
337 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
338 if (pack->qoss == NULL)
339 {
340 if (pack->properties.array != NULL)
341 free(pack->properties.array);
342 if (pack != NULL)
343 free(pack);
344 pack = NULL; /* signal protocol error */
345 goto exit;
346
347 }
348 #endif
349 while ((size_t)(curdata - data) < datalen)
350 {
351 unsigned int* newint;
352 newint = malloc(sizeof(unsigned int));
353 if (newint == NULL)
354 {
355 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
356 if (pack->qoss != NULL)
357 ListFree(pack->qoss);
358 #endif
359 if (pack->properties.array)
360 free(pack->properties.array);
361 if (pack)
362 free(pack);
363 pack = NULL; /* signal protocol error */
364 goto exit;
365 }
366 *newint = (unsigned int)readChar(&curdata);
367 ListAppend(pack->qoss, newint, sizeof(unsigned int));
368 }
369 if (pack->qoss->count == 0)
370 {
371 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
372 ListFree(pack->qoss);
373 if (pack->properties.array)
374 free(pack->properties.array);
375 if (pack)
376 free(pack);
377 #else
378 if (pack->properties.array)
379 free(pack->properties.array);
380 ListFree(pack->qoss);
381 free(pack);
382 #endif
383 pack = NULL;
384 }
385 exit:
386 FUNC_EXIT;
387 return pack;
388 }
389
390
391 /**
392 * Send an MQTT unsubscribe packet down a socket.
393 * @param topics list of topics
394 * @param msgid the MQTT message id to use
395 * @param dup boolean - whether to set the MQTT DUP flag
396 * @param socket the open socket to send the data to
397 * @param clientID the string client identifier, only used for tracing
398 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
399 */
MQTTPacket_send_unsubscribe(List * topics,MQTTProperties * props,int msgid,int dup,Clients * client)400 int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client)
401 {
402 Header header;
403 char *data, *ptr;
404 int rc = SOCKET_ERROR;
405 ListElement *elem = NULL;
406 int datalen;
407
408 FUNC_ENTRY;
409 header.bits.type = UNSUBSCRIBE;
410 header.bits.dup = dup;
411 header.bits.qos = 1;
412 header.bits.retain = 0;
413
414 datalen = 2 + topics->count * 2; /* utf length == 2 */
415 while (ListNextElement(topics, &elem))
416 datalen += (int)strlen((char*)(elem->content));
417 if (client->MQTTVersion >= MQTTVERSION_5)
418 datalen += MQTTProperties_len(props);
419 ptr = data = malloc(datalen);
420 if (ptr == NULL)
421 goto exit;
422
423 writeInt(&ptr, msgid);
424
425 if (client->MQTTVersion >= MQTTVERSION_5)
426 MQTTProperties_write(&ptr, props);
427
428 elem = NULL;
429 while (ListNextElement(topics, &elem))
430 writeUTF(&ptr, (char*)(elem->content));
431 rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
432 Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
433 if (rc != TCPSOCKET_INTERRUPTED)
434 free(data);
435 exit:
436 FUNC_EXIT_RC(rc);
437 return rc;
438 }
439
440
441 /**
442 * Function used in the new packets table to create unsuback packets.
443 * @param MQTTVersion the version of MQTT
444 * @param aHeader the MQTT header byte
445 * @param data the rest of the packet
446 * @param datalen the length of the rest of the packet
447 * @return pointer to the packet structure
448 */
MQTTPacket_unsuback(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)449 void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
450 {
451 Unsuback* pack = NULL;
452 char* curdata = data;
453 char* enddata = &data[datalen];
454
455 FUNC_ENTRY;
456 if ((pack = malloc(sizeof(Unsuback))) == NULL)
457 goto exit;
458 pack->MQTTVersion = MQTTVersion;
459 pack->header.byte = aHeader;
460 if (enddata - curdata < 2) /* Is there enough data? */
461 {
462 free(pack);
463 pack = NULL;
464 goto exit;
465 }
466 pack->msgId = readInt(&curdata);
467 pack->reasonCodes = NULL;
468 if (MQTTVersion >= MQTTVERSION_5)
469 {
470 MQTTProperties props = MQTTProperties_initializer;
471 pack->properties = props;
472 if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
473 {
474 if (pack->properties.array)
475 free(pack->properties.array);
476 if (pack)
477 free(pack);
478 pack = NULL; /* signal protocol error */
479 goto exit;
480 }
481 pack->reasonCodes = ListInitialize();
482 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
483 if (pack->reasonCodes == NULL)
484 {
485 if (pack->properties.array != NULL)
486 free(pack->properties.array);
487 if (pack != NULL)
488 free(pack);
489 pack = NULL; /* signal protocol error */
490 goto exit;
491 }
492 #endif
493 while ((size_t)(curdata - data) < datalen)
494 {
495 enum MQTTReasonCodes* newrc;
496 newrc = malloc(sizeof(enum MQTTReasonCodes));
497 if (newrc == NULL)
498 {
499 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
500 if (pack->reasonCodes != NULL)
501 ListFree(pack->reasonCodes);
502 #endif
503 if (pack->properties.array)
504 free(pack->properties.array);
505 if (pack)
506 free(pack);
507 pack = NULL; /* signal protocol error */
508 goto exit;
509 }
510 *newrc = (enum MQTTReasonCodes)readChar(&curdata);
511 ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
512 }
513 if (pack->reasonCodes->count == 0)
514 {
515 ListFree(pack->reasonCodes);
516 if (pack->properties.array)
517 free(pack->properties.array);
518 if (pack)
519 free(pack);
520 pack = NULL;
521 }
522 }
523 exit:
524 FUNC_EXIT;
525 return pack;
526 }
527