1 /*******************************************************************************
2 * Copyright (c) 2009, 2022 IBM Corp. and Ian Craggs
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs, Allan Stockdill-Mander - SSL updates
16 * Ian Craggs - MQTT 3.1.1 support
17 * Ian Craggs - fix for issue 453
18 * Ian Craggs - MQTT 5.0 support
19 *******************************************************************************/
20
21 /**
22 * @file
23 * \brief functions to deal with reading and writing of MQTT packets from and to sockets
24 *
25 * Some other related functions are in the MQTTPacketOut module
26 */
27
28 #include "MQTTPacket.h"
29 #include "Log.h"
30 #if !defined(NO_PERSISTENCE)
31 #include "MQTTPersistence.h"
32 #endif
33 #include "Messages.h"
34 #include "StackTrace.h"
35 #include "WebSocket.h"
36 #include "MQTTTime.h"
37
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "Heap.h"
42
43 #if !defined(min)
44 #define min(A,B) ( (A) < (B) ? (A):(B))
45 #endif
46
47 /**
48 * List of the predefined MQTT v3/v5 packet names.
49 */
50 static const char *packet_names[] =
51 {
52 "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
53 "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
54 "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
55 };
56
57 const char** MQTTClient_packet_names = packet_names;
58
59
60 /**
61 * Converts an MQTT packet code into its name
62 * @param ptype packet code
63 * @return the corresponding string, or "UNKNOWN"
64 */
MQTTPacket_name(int ptype)65 const char* MQTTPacket_name(int ptype)
66 {
67 return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
68 }
69
70 /**
71 * Array of functions to build packets, indexed according to packet code
72 */
73 pf new_packets[] =
74 {
75 NULL, /**< reserved */
76 NULL, /**< MQTTPacket_connect*/
77 MQTTPacket_connack, /**< CONNACK */
78 MQTTPacket_publish, /**< PUBLISH */
79 MQTTPacket_ack, /**< PUBACK */
80 MQTTPacket_ack, /**< PUBREC */
81 MQTTPacket_ack, /**< PUBREL */
82 MQTTPacket_ack, /**< PUBCOMP */
83 NULL, /**< MQTTPacket_subscribe*/
84 MQTTPacket_suback, /**< SUBACK */
85 NULL, /**< MQTTPacket_unsubscribe*/
86 MQTTPacket_unsuback, /**< UNSUBACK */
87 MQTTPacket_header_only, /**< PINGREQ */
88 MQTTPacket_header_only, /**< PINGRESP */
89 MQTTPacket_ack, /**< DISCONNECT */
90 MQTTPacket_ack /**< AUTH */
91 };
92
93
94 static char* readUTFlen(char** pptr, char* enddata, int* len);
95 static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net);
96
97 /**
98 * Reads one MQTT packet from a socket.
99 * @param socket a socket from which to read an MQTT packet
100 * @param error pointer to the error code which is completed if no packet is returned
101 * @return the packet structure or NULL if there was an error
102 */
MQTTPacket_Factory(int MQTTVersion,networkHandles * net,int * error)103 void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
104 {
105 char* data = NULL;
106 static Header header;
107 size_t remaining_length;
108 int ptype;
109 void* pack = NULL;
110 size_t actual_len = 0;
111
112 FUNC_ENTRY;
113 *error = SOCKET_ERROR; /* indicate whether an error occurred, or not */
114
115 const size_t headerWsFramePos = WebSocket_framePos();
116
117 /* read the packet data from the socket */
118 *error = WebSocket_getch(net, &header.byte);
119 if (*error != TCPSOCKET_COMPLETE) /* first byte is the header byte */
120 goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
121
122 /* now read the remaining length, so we know how much more to read */
123 if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
124 goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
125
126 /* now read the rest, the variable header and payload */
127 data = WebSocket_getdata(net, remaining_length, &actual_len);
128 if (remaining_length && data == NULL)
129 {
130 *error = SOCKET_ERROR;
131 goto exit; /* socket error */
132 }
133
134 if (actual_len < remaining_length)
135 {
136 *error = TCPSOCKET_INTERRUPTED;
137 net->lastReceived = MQTTTime_now();
138 }
139 else
140 {
141 ptype = header.bits.type;
142 if (ptype < CONNECT || (MQTTVersion < MQTTVERSION_5 && ptype >= DISCONNECT) ||
143 (MQTTVersion >= MQTTVERSION_5 && ptype > AUTH) ||
144 new_packets[ptype] == NULL)
145 Log(TRACE_MIN, 2, NULL, ptype);
146 else
147 {
148 if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
149 {
150 *error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
151 Log(LOG_ERROR, -1, "Bad MQTT packet, type %d", ptype);
152 }
153 #if !defined(NO_PERSISTENCE)
154 else if (header.bits.type == PUBLISH && header.bits.qos == 2)
155 {
156 int buf0len;
157 char *buf = malloc(10);
158
159 if (buf == NULL)
160 {
161 *error = SOCKET_ERROR;
162 goto exit;
163 }
164 buf[0] = header.byte;
165 buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
166 *error = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1,
167 &data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1, MQTTVersion);
168 free(buf);
169 }
170 #endif
171 }
172 }
173 if (pack)
174 net->lastReceived = MQTTTime_now();
175 exit:
176 if (*error == TCPSOCKET_INTERRUPTED)
177 WebSocket_framePosSeekTo(headerWsFramePos);
178
179 FUNC_EXIT_RC(*error);
180 return pack;
181 }
182
183
184 /**
185 * Sends an MQTT packet in one system call write
186 * @param socket the socket to which to write the data
187 * @param header the one-byte MQTT header
188 * @param buffer the rest of the buffer to write (not including remaining length)
189 * @param buflen the length of the data in buffer to be written
190 * @param MQTTVersion the version of MQTT being used
191 * @return the completion code (TCPSOCKET_COMPLETE etc)
192 */
MQTTPacket_send(networkHandles * net,Header header,char * buffer,size_t buflen,int freeData,int MQTTVersion)193 int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData,
194 int MQTTVersion)
195 {
196 int rc = SOCKET_ERROR;
197 size_t buf0len;
198 char *buf;
199 PacketBuffers packetbufs;
200
201 FUNC_ENTRY;
202 buf0len = 1 + MQTTPacket_encode(NULL, buflen);
203 buf = malloc(buf0len);
204 if (buf == NULL)
205 {
206 rc = SOCKET_ERROR;
207 goto exit;
208 }
209 buf[0] = header.byte;
210 MQTTPacket_encode(&buf[1], buflen);
211
212 #if !defined(NO_PERSISTENCE)
213 if (header.bits.type == PUBREL)
214 {
215 char* ptraux = buffer;
216 int msgId = readInt(&ptraux);
217
218 rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1, &buffer, &buflen,
219 header.bits.type, msgId, 0, MQTTVersion);
220 }
221 #endif
222 packetbufs.count = 1;
223 packetbufs.buffers = &buffer;
224 packetbufs.buflens = &buflen;
225 packetbufs.frees = &freeData;
226 memset(packetbufs.mask, '\0', sizeof(packetbufs.mask));
227 rc = WebSocket_putdatas(net, &buf, &buf0len, &packetbufs);
228
229 if (rc == TCPSOCKET_COMPLETE)
230 net->lastSent = MQTTTime_now();
231
232 if (rc != TCPSOCKET_INTERRUPTED)
233 free(buf);
234
235 exit:
236 FUNC_EXIT_RC(rc);
237 return rc;
238 }
239
240
241
242
243 /**
244 * Sends an MQTT packet from multiple buffers in one system call write
245 * @param socket the socket to which to write the data
246 * @param header the one-byte MQTT header
247 * @param count the number of buffers
248 * @param buffers the rest of the buffers to write (not including remaining length)
249 * @param buflens the lengths of the data in the array of buffers to be written
250 * @param the MQTT version being used
251 * @return the completion code (TCPSOCKET_COMPLETE etc)
252 */
MQTTPacket_sends(networkHandles * net,Header header,PacketBuffers * bufs,int MQTTVersion)253 int MQTTPacket_sends(networkHandles* net, Header header, PacketBuffers* bufs, int MQTTVersion)
254 {
255 int i, rc = SOCKET_ERROR;
256 size_t buf0len, total = 0;
257 char *buf;
258
259 FUNC_ENTRY;
260 for (i = 0; i < bufs->count; i++)
261 total += bufs->buflens[i];
262 buf0len = 1 + MQTTPacket_encode(NULL, total);
263 buf = malloc(buf0len);
264 if (buf == NULL)
265 {
266 rc = SOCKET_ERROR;
267 goto exit;
268 }
269 buf[0] = header.byte;
270 MQTTPacket_encode(&buf[1], total);
271
272 #if !defined(NO_PERSISTENCE)
273 if (header.bits.type == PUBLISH && header.bits.qos != 0)
274 { /* persist PUBLISH QoS1 and Qo2 */
275 char *ptraux = bufs->buffers[2];
276 int msgId = readInt(&ptraux);
277 rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, bufs->count, bufs->buffers, bufs->buflens,
278 header.bits.type, msgId, 0, MQTTVersion);
279 }
280 #endif
281 rc = WebSocket_putdatas(net, &buf, &buf0len, bufs);
282
283 if (rc == TCPSOCKET_COMPLETE)
284 net->lastSent = MQTTTime_now();
285
286 if (rc != TCPSOCKET_INTERRUPTED)
287 free(buf);
288 exit:
289 FUNC_EXIT_RC(rc);
290 return rc;
291 }
292
293
294 /**
295 * Encodes the message length according to the MQTT algorithm
296 * @param buf the buffer into which the encoded data is written
297 * @param length the length to be encoded
298 * @return the number of bytes written to buffer
299 */
MQTTPacket_encode(char * buf,size_t length)300 int MQTTPacket_encode(char* buf, size_t length)
301 {
302 int rc = 0;
303
304 FUNC_ENTRY;
305 do
306 {
307 char d = length % 128;
308 length /= 128;
309 /* if there are more digits to encode, set the top bit of this digit */
310 if (length > 0)
311 d |= 0x80;
312 if (buf)
313 buf[rc++] = d;
314 else
315 rc++;
316 } while (length > 0);
317 FUNC_EXIT_RC(rc);
318 return rc;
319 }
320
321
322 /**
323 * Decodes the message length according to the MQTT algorithm
324 * @param socket the socket from which to read the bytes
325 * @param value the decoded length returned
326 * @return the number of bytes read from the socket
327 */
MQTTPacket_decode(networkHandles * net,size_t * value)328 int MQTTPacket_decode(networkHandles* net, size_t* value)
329 {
330 int rc = SOCKET_ERROR;
331 char c;
332 int multiplier = 1;
333 int len = 0;
334 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
335
336 FUNC_ENTRY;
337 *value = 0;
338 do
339 {
340 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
341 {
342 rc = SOCKET_ERROR; /* bad data */
343 goto exit;
344 }
345 rc = WebSocket_getch(net, &c);
346 if (rc != TCPSOCKET_COMPLETE)
347 goto exit;
348 *value += (c & 127) * multiplier;
349 multiplier *= 128;
350 } while ((c & 128) != 0);
351 exit:
352 FUNC_EXIT_RC(rc);
353 return rc;
354 }
355
356
357 /**
358 * Calculates an integer from two bytes read from the input buffer
359 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
360 * @return the integer value calculated
361 */
readInt(char ** pptr)362 int readInt(char** pptr)
363 {
364 char* ptr = *pptr;
365 int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
366 *pptr += 2;
367 return len;
368 }
369
370
371 /**
372 * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
373 * a length delimited string. So it reads the two byte length then the data according to
374 * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
375 * by an incorrect length.
376 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
377 * @param enddata pointer to the end of the buffer not to be read beyond
378 * @param len returns the calculcated value of the length bytes read
379 * @return an allocated C string holding the characters read, or NULL if the length read would
380 * have caused an overrun.
381 *
382 */
readUTFlen(char ** pptr,char * enddata,int * len)383 static char* readUTFlen(char** pptr, char* enddata, int* len)
384 {
385 char* string = NULL;
386
387 FUNC_ENTRY;
388 if (enddata - (*pptr) > 1) /* enough length to read the integer? */
389 {
390 *len = readInt(pptr);
391 if (&(*pptr)[*len] <= enddata)
392 {
393 if ((string = malloc(*len+1)) == NULL)
394 goto exit;
395 memcpy(string, *pptr, *len);
396 string[*len] = '\0';
397 *pptr += *len;
398 }
399 }
400 exit:
401 FUNC_EXIT;
402 return string;
403 }
404
405
406 /**
407 * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
408 * a length delimited string. So it reads the two byte length then the data according to
409 * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
410 * by an incorrect length.
411 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
412 * @param enddata pointer to the end of the buffer not to be read beyond
413 * @return an allocated C string holding the characters read, or NULL if the length read would
414 * have caused an overrun.
415 */
readUTF(char ** pptr,char * enddata)416 char* readUTF(char** pptr, char* enddata)
417 {
418 int len;
419 return readUTFlen(pptr, enddata, &len);
420 }
421
422
423 /**
424 * Reads one character from the input buffer.
425 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
426 * @return the character read
427 */
readChar(char ** pptr)428 unsigned char readChar(char** pptr)
429 {
430 unsigned char c = **pptr;
431 (*pptr)++;
432 return c;
433 }
434
435
436 /**
437 * Writes one character to an output buffer.
438 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
439 * @param c the character to write
440 */
writeChar(char ** pptr,char c)441 void writeChar(char** pptr, char c)
442 {
443 **pptr = c;
444 (*pptr)++;
445 }
446
447
448 /**
449 * Writes an integer as 2 bytes to an output buffer.
450 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
451 * @param anInt the integer to write
452 */
writeInt(char ** pptr,int anInt)453 void writeInt(char** pptr, int anInt)
454 {
455 **pptr = (char)(anInt / 256);
456 (*pptr)++;
457 **pptr = (char)(anInt % 256);
458 (*pptr)++;
459 }
460
461
462 /**
463 * Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
464 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
465 * @param string the C string to write
466 */
writeUTF(char ** pptr,const char * string)467 void writeUTF(char** pptr, const char* string)
468 {
469 size_t len = strlen(string);
470 writeInt(pptr, (int)len);
471 memcpy(*pptr, string, len);
472 *pptr += len;
473 }
474
475
476 /**
477 * Writes length delimited data to an output buffer
478 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
479 * @param data the data to write
480 * @param datalen the length of the data to write
481 */
writeData(char ** pptr,const void * data,int datalen)482 void writeData(char** pptr, const void* data, int datalen)
483 {
484 writeInt(pptr, datalen);
485 memcpy(*pptr, data, datalen);
486 *pptr += datalen;
487 }
488
489
490 /**
491 * Function used in the new packets table to create packets which have only a header.
492 * @param MQTTVersion the version of MQTT
493 * @param aHeader the MQTT header byte
494 * @param data the rest of the packet
495 * @param datalen the length of the rest of the packet
496 * @return pointer to the packet structure
497 */
MQTTPacket_header_only(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)498 void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
499 {
500 static unsigned char header = 0;
501 header = aHeader;
502 return &header;
503 }
504
505
506 /**
507 * Send an MQTT disconnect packet down a socket.
508 * @param socket the open socket to send the data to
509 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
510 */
MQTTPacket_send_disconnect(Clients * client,enum MQTTReasonCodes reason,MQTTProperties * props)511 int MQTTPacket_send_disconnect(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
512 {
513 Header header;
514 int rc = 0;
515
516 FUNC_ENTRY;
517 header.byte = 0;
518 header.bits.type = DISCONNECT;
519
520 if (client->MQTTVersion >= 5 && (props || reason != MQTTREASONCODE_SUCCESS))
521 {
522 size_t buflen = 1 + ((props == NULL) ? 0 : MQTTProperties_len(props));
523 char *buf = NULL;
524 char *ptr = NULL;
525
526 if ((buf = malloc(buflen)) == NULL)
527 {
528 rc = SOCKET_ERROR;
529 goto exit;
530 }
531 ptr = buf;
532 writeChar(&ptr, reason);
533 if (props)
534 MQTTProperties_write(&ptr, props);
535 if ((rc = MQTTPacket_send(&client->net, header, buf, buflen, 1,
536 client->MQTTVersion)) != TCPSOCKET_INTERRUPTED)
537 free(buf);
538 }
539 else
540 rc = MQTTPacket_send(&client->net, header, NULL, 0, 0, client->MQTTVersion);
541 exit:
542 Log(LOG_PROTOCOL, 28, NULL, client->net.socket, client->clientID, rc);
543 FUNC_EXIT_RC(rc);
544 return rc;
545 }
546
547
548 /**
549 * Function used in the new packets table to create publish packets.
550 * @param MQTTVersion
551 * @param aHeader the MQTT header byte
552 * @param data the rest of the packet
553 * @param datalen the length of the rest of the packet
554 * @return pointer to the packet structure
555 */
MQTTPacket_publish(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)556 void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
557 {
558 Publish* pack = NULL;
559 char* curdata = data;
560 char* enddata = &data[datalen];
561
562 FUNC_ENTRY;
563 if ((pack = malloc(sizeof(Publish))) == NULL)
564 goto exit;
565 memset(pack, '\0', sizeof(Publish));
566 pack->MQTTVersion = MQTTVersion;
567 pack->header.byte = aHeader;
568 if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
569 {
570 free(pack);
571 pack = NULL;
572 goto exit;
573 }
574 if (pack->header.bits.qos > 0) /* Msgid only exists for QoS 1 or 2 */
575 {
576 if (enddata - curdata < 2) /* Is there enough data for the msgid? */
577 {
578 free(pack);
579 pack = NULL;
580 goto exit;
581 }
582 pack->msgId = readInt(&curdata);
583 }
584 else
585 pack->msgId = 0;
586 if (MQTTVersion >= MQTTVERSION_5)
587 {
588 MQTTProperties props = MQTTProperties_initializer;
589 pack->properties = props;
590 if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
591 {
592 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
593 if (pack->topic != NULL)
594 free(pack->topic);
595 #endif
596 if (pack->properties.array)
597 free(pack->properties.array);
598 if (pack)
599 free(pack);
600 pack = NULL; /* signal protocol error */
601 goto exit;
602 }
603 }
604 pack->payload = curdata;
605 pack->payloadlen = (int)(datalen-(curdata-data));
606 exit:
607 FUNC_EXIT;
608 return pack;
609 }
610
611
612 /**
613 * Free allocated storage for a publish packet.
614 * @param pack pointer to the publish packet structure
615 */
MQTTPacket_freePublish(Publish * pack)616 void MQTTPacket_freePublish(Publish* pack)
617 {
618 FUNC_ENTRY;
619 if (pack->topic != NULL)
620 free(pack->topic);
621 if (pack->MQTTVersion >= MQTTVERSION_5)
622 MQTTProperties_free(&pack->properties);
623 free(pack);
624 FUNC_EXIT;
625 }
626
627
628 /**
629 * Free allocated storage for an ack packet.
630 * @param pack pointer to the publish packet structure
631 */
MQTTPacket_freeAck(Ack * pack)632 void MQTTPacket_freeAck(Ack* pack)
633 {
634 FUNC_ENTRY;
635 if (pack->MQTTVersion >= MQTTVERSION_5)
636 MQTTProperties_free(&pack->properties);
637 free(pack);
638 FUNC_EXIT;
639 }
640
641
642 /**
643 * Send an MQTT acknowledgement packet down a socket.
644 * @param MQTTVersion the version of MQTT being used
645 * @param type the MQTT packet type e.g. SUBACK
646 * @param msgid the MQTT message id to use
647 * @param dup boolean - whether to set the MQTT DUP flag
648 * @param net the network handle to send the data to
649 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
650 */
MQTTPacket_send_ack(int MQTTVersion,int type,int msgid,int dup,networkHandles * net)651 static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net)
652 {
653 Header header;
654 int rc = SOCKET_ERROR;
655 char *buf = NULL;
656 char *ptr = NULL;
657
658 FUNC_ENTRY;
659 if ((ptr = buf = malloc(2)) == NULL)
660 goto exit;
661 header.byte = 0;
662 header.bits.type = type;
663 header.bits.dup = dup;
664 if (type == PUBREL)
665 header.bits.qos = 1;
666 writeInt(&ptr, msgid);
667 if ((rc = MQTTPacket_send(net, header, buf, 2, 1, MQTTVersion)) != TCPSOCKET_INTERRUPTED)
668 free(buf);
669 exit:
670 FUNC_EXIT_RC(rc);
671 return rc;
672 }
673
674
675 /**
676 * Send an MQTT PUBACK packet down a socket.
677 * @param MQTTVersion the version of MQTT being used
678 * @param msgid the MQTT message id to use
679 * @param socket the open socket to send the data to
680 * @param clientID the string client identifier, only used for tracing
681 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
682 */
MQTTPacket_send_puback(int MQTTVersion,int msgid,networkHandles * net,const char * clientID)683 int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
684 {
685 int rc = 0;
686
687 FUNC_ENTRY;
688 rc = MQTTPacket_send_ack(MQTTVersion, PUBACK, msgid, 0, net);
689 Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
690 FUNC_EXIT_RC(rc);
691 return rc;
692 }
693
694
695 /**
696 * Free allocated storage for a suback packet.
697 * @param pack pointer to the suback packet structure
698 */
MQTTPacket_freeSuback(Suback * pack)699 void MQTTPacket_freeSuback(Suback* pack)
700 {
701 FUNC_ENTRY;
702 if (pack->MQTTVersion >= MQTTVERSION_5)
703 MQTTProperties_free(&pack->properties);
704 if (pack->qoss != NULL)
705 ListFree(pack->qoss);
706 free(pack);
707 FUNC_EXIT;
708 }
709
710
711 /**
712 * Free allocated storage for a suback packet.
713 * @param pack pointer to the suback packet structure
714 */
MQTTPacket_freeUnsuback(Unsuback * pack)715 void MQTTPacket_freeUnsuback(Unsuback* pack)
716 {
717 FUNC_ENTRY;
718 if (pack->MQTTVersion >= MQTTVERSION_5)
719 {
720 MQTTProperties_free(&pack->properties);
721 if (pack->reasonCodes != NULL)
722 ListFree(pack->reasonCodes);
723 }
724 free(pack);
725 FUNC_EXIT;
726 }
727
728
729 /**
730 * Send an MQTT PUBREC packet down a socket.
731 * @param MQTTVersion the version of MQTT being used
732 * @param msgid the MQTT message id to use
733 * @param socket the open socket to send the data to
734 * @param clientID the string client identifier, only used for tracing
735 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
736 */
MQTTPacket_send_pubrec(int MQTTVersion,int msgid,networkHandles * net,const char * clientID)737 int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
738 {
739 int rc = 0;
740
741 FUNC_ENTRY;
742 rc = MQTTPacket_send_ack(MQTTVersion, PUBREC, msgid, 0, net);
743 Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
744 FUNC_EXIT_RC(rc);
745 return rc;
746 }
747
748
749 /**
750 * Send an MQTT PUBREL packet down a socket.
751 * @param MQTTVersion the version of MQTT being used
752 * @param msgid the MQTT message id to use
753 * @param dup boolean - whether to set the MQTT DUP flag
754 * @param socket the open socket to send the data to
755 * @param clientID the string client identifier, only used for tracing
756 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
757 */
MQTTPacket_send_pubrel(int MQTTVersion,int msgid,int dup,networkHandles * net,const char * clientID)758 int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles* net, const char* clientID)
759 {
760 int rc = 0;
761
762 FUNC_ENTRY;
763 rc = MQTTPacket_send_ack(MQTTVersion, PUBREL, msgid, dup, net);
764 Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
765 FUNC_EXIT_RC(rc);
766 return rc;
767 }
768
769
770 /**
771 * Send an MQTT PUBCOMP packet down a socket.
772 * @param MQTTVersion the version of MQTT being used
773 * @param msgid the MQTT message id to use
774 * @param socket the open socket to send the data to
775 * @param clientID the string client identifier, only used for tracing
776 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
777 */
MQTTPacket_send_pubcomp(int MQTTVersion,int msgid,networkHandles * net,const char * clientID)778 int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
779 {
780 int rc = 0;
781
782 FUNC_ENTRY;
783 rc = MQTTPacket_send_ack(MQTTVersion, PUBCOMP, msgid, 0, net);
784 Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
785 FUNC_EXIT_RC(rc);
786 return rc;
787 }
788
789
790 /**
791 * Function used in the new packets table to create acknowledgement packets.
792 * @param MQTTVersion the version of MQTT being used
793 * @param aHeader the MQTT header byte
794 * @param data the rest of the packet
795 * @param datalen the length of the rest of the packet
796 * @return pointer to the packet structure
797 */
MQTTPacket_ack(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)798 void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
799 {
800 Ack* pack = NULL;
801 char* curdata = data;
802 char* enddata = &data[datalen];
803
804 FUNC_ENTRY;
805 if ((pack = malloc(sizeof(Ack))) == NULL)
806 goto exit;
807 pack->MQTTVersion = MQTTVersion;
808 pack->header.byte = aHeader;
809 if (pack->header.bits.type != DISCONNECT)
810 {
811 if (enddata - curdata < 2) /* Is there enough data for the msgid? */
812 {
813 free(pack);
814 pack = NULL;
815 goto exit;
816 }
817 pack->msgId = readInt(&curdata);
818 }
819 if (MQTTVersion >= MQTTVERSION_5)
820 {
821 MQTTProperties props = MQTTProperties_initializer;
822
823 pack->rc = MQTTREASONCODE_SUCCESS;
824 pack->properties = props;
825
826 /* disconnect has no msgid */
827 if (datalen > 2 || (pack->header.bits.type == DISCONNECT && datalen > 0))
828 pack->rc = readChar(&curdata); /* reason code */
829
830 if (datalen > 3 || (pack->header.bits.type == DISCONNECT && datalen > 1))
831 {
832 if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
833 {
834 if (pack->properties.array)
835 free(pack->properties.array);
836 if (pack)
837 free(pack);
838 pack = NULL; /* signal protocol error */
839 goto exit;
840 }
841 }
842 }
843 exit:
844 FUNC_EXIT;
845 return pack;
846 }
847
848
849 /**
850 * Send an MQTT PUBLISH packet down a socket.
851 * @param pack a structure from which to get some values to use, e.g topic, payload
852 * @param dup boolean - whether to set the MQTT DUP flag
853 * @param qos the value to use for the MQTT QoS setting
854 * @param retained boolean - whether to set the MQTT retained flag
855 * @param socket the open socket to send the data to
856 * @param clientID the string client identifier, only used for tracing
857 * @return the completion code (e.g. TCPSOCKET_COMPLETE)
858 */
MQTTPacket_send_publish(Publish * pack,int dup,int qos,int retained,networkHandles * net,const char * clientID)859 int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
860 {
861 Header header;
862 char *topiclen;
863 int rc = SOCKET_ERROR;
864
865 FUNC_ENTRY;
866 topiclen = malloc(2);
867 if (topiclen == NULL)
868 goto exit;
869
870 header.bits.type = PUBLISH;
871 header.bits.dup = dup;
872 header.bits.qos = qos;
873 header.bits.retain = retained;
874 if (qos > 0 || pack->MQTTVersion >= 5)
875 {
876 int buflen = ((qos > 0) ? 2 : 0) + ((pack->MQTTVersion >= 5) ? MQTTProperties_len(&pack->properties) : 0);
877 char *ptr = NULL;
878 char* bufs[4] = {topiclen, pack->topic, NULL, pack->payload};
879 size_t lens[4] = {2, strlen(pack->topic), buflen, pack->payloadlen};
880 int frees[4] = {1, 0, 1, 0};
881 PacketBuffers packetbufs = {4, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
882
883 bufs[2] = ptr = malloc(buflen);
884 if (ptr == NULL)
885 goto exit_free;
886 if (qos > 0)
887 writeInt(&ptr, pack->msgId);
888 if (pack->MQTTVersion >= 5)
889 MQTTProperties_write(&ptr, &pack->properties);
890
891 ptr = topiclen;
892 writeInt(&ptr, (int)lens[1]);
893 rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
894 if (rc != TCPSOCKET_INTERRUPTED)
895 free(bufs[2]);
896 memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
897 }
898 else
899 {
900 char* ptr = topiclen;
901 char* bufs[3] = {topiclen, pack->topic, pack->payload};
902 size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
903 int frees[3] = {1, 0, 0};
904 PacketBuffers packetbufs = {3, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
905
906 writeInt(&ptr, (int)lens[1]);
907 rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
908 memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
909 }
910 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
911 if (qos == 0)
912 Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, rc);
913 else
914 Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, rc);
915 #else
916 if (qos == 0)
917 Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc, pack->payloadlen,
918 min(20, pack->payloadlen), pack->payload);
919 else
920 Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc, pack->payloadlen,
921 min(20, pack->payloadlen), pack->payload);
922 #endif
923 exit_free:
924 if (rc != TCPSOCKET_INTERRUPTED)
925 free(topiclen);
926 exit:
927 FUNC_EXIT_RC(rc);
928 return rc;
929 }
930
931
932 /**
933 * Free allocated storage for a various packet tyoes
934 * @param pack pointer to the suback packet structure
935 */
MQTTPacket_free_packet(MQTTPacket * pack)936 void MQTTPacket_free_packet(MQTTPacket* pack)
937 {
938 FUNC_ENTRY;
939 if (pack->header.bits.type == PUBLISH)
940 MQTTPacket_freePublish((Publish*)pack);
941 /*else if (pack->header.type == SUBSCRIBE)
942 MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
943 else if (pack->header.type == UNSUBSCRIBE)
944 MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
945 else
946 free(pack);
947 FUNC_EXIT;
948 }
949
950
951 /**
952 * Writes an integer as 4 bytes to an output buffer.
953 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
954 * @param anInt the integer to write
955 */
writeInt4(char ** pptr,int anInt)956 void writeInt4(char** pptr, int anInt)
957 {
958 **pptr = (char)(anInt / 16777216);
959 (*pptr)++;
960 anInt %= 16777216;
961 **pptr = (char)(anInt / 65536);
962 (*pptr)++;
963 anInt %= 65536;
964 **pptr = (char)(anInt / 256);
965 (*pptr)++;
966 **pptr = (char)(anInt % 256);
967 (*pptr)++;
968 }
969
970
971 /**
972 * Calculates an integer from two bytes read from the input buffer
973 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
974 * @return the integer value calculated
975 */
readInt4(char ** pptr)976 int readInt4(char** pptr)
977 {
978 unsigned char* ptr = (unsigned char*)*pptr;
979 int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
980 *pptr += 4;
981 return value;
982 }
983
984
writeMQTTLenString(char ** pptr,MQTTLenString lenstring)985 void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
986 {
987 writeInt(pptr, lenstring.len);
988 memcpy(*pptr, lenstring.data, lenstring.len);
989 *pptr += lenstring.len;
990 }
991
992
MQTTLenStringRead(MQTTLenString * lenstring,char ** pptr,char * enddata)993 int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
994 {
995 int len = -1;
996
997 /* the first two bytes are the length of the string */
998 if (enddata - (*pptr) > 1) /* enough length to read the integer? */
999 {
1000 lenstring->len = readInt(pptr); /* increments pptr to point past length */
1001 if (&(*pptr)[lenstring->len] <= enddata)
1002 {
1003 lenstring->data = (char*)*pptr;
1004 *pptr += lenstring->len;
1005 len = 2 + lenstring->len;
1006 }
1007 }
1008 return len;
1009 }
1010
1011 /*
1012 if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
1013 len = 1;
1014 else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
1015 len = 2;
1016 else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
1017 len = 3;
1018 else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
1019 len = 4;
1020 */
MQTTPacket_VBIlen(int rem_len)1021 int MQTTPacket_VBIlen(int rem_len)
1022 {
1023 int rc = 0;
1024
1025 if (rem_len < 128)
1026 rc = 1;
1027 else if (rem_len < 16384)
1028 rc = 2;
1029 else if (rem_len < 2097152)
1030 rc = 3;
1031 else
1032 rc = 4;
1033 return rc;
1034 }
1035
1036
1037 /**
1038 * Decodes the message length according to the MQTT algorithm
1039 * @param getcharfn pointer to function to read the next character from the data source
1040 * @param value the decoded length returned
1041 * @return the number of bytes read from the socket
1042 */
MQTTPacket_VBIdecode(int (* getcharfn)(char *,int),unsigned int * value)1043 int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), unsigned int* value)
1044 {
1045 char c;
1046 int multiplier = 1;
1047 int len = 0;
1048 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
1049
1050 *value = 0;
1051 do
1052 {
1053 int rc = MQTTPACKET_READ_ERROR;
1054
1055 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
1056 {
1057 rc = MQTTPACKET_READ_ERROR; /* bad data */
1058 goto exit;
1059 }
1060 rc = (*getcharfn)(&c, 1);
1061 if (rc != 1)
1062 goto exit;
1063 *value += (c & 127) * multiplier;
1064 multiplier *= 128;
1065 } while ((c & 128) != 0);
1066 exit:
1067 return len;
1068 }
1069
1070
1071 static char* bufptr;
1072
bufchar(char * c,int count)1073 int bufchar(char* c, int count)
1074 {
1075 int i;
1076
1077 for (i = 0; i < count; ++i)
1078 *c = *bufptr++;
1079 return count;
1080 }
1081
1082
MQTTPacket_decodeBuf(char * buf,unsigned int * value)1083 int MQTTPacket_decodeBuf(char* buf, unsigned int* value)
1084 {
1085 bufptr = buf;
1086 return MQTTPacket_VBIdecode(bufchar, value);
1087 }
1088
1089