• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp. and Ian Craggs
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs, Allan Stockdill-Mander - SSL updates
16  *    Ian Craggs - MQTT 3.1.1 support
17  *    Ian Craggs - fix for issue 453
18  *    Ian Craggs - MQTT 5.0 support
19  *******************************************************************************/
20 
21 /**
22  * @file
23  * \brief functions to deal with reading and writing of MQTT packets from and to sockets
24  *
25  * Some other related functions are in the MQTTPacketOut module
26  */
27 
28 #include "MQTTPacket.h"
29 #include "Log.h"
30 #if !defined(NO_PERSISTENCE)
31 	#include "MQTTPersistence.h"
32 #endif
33 #include "Messages.h"
34 #include "StackTrace.h"
35 #include "WebSocket.h"
36 #include "MQTTTime.h"
37 
38 #include <stdlib.h>
39 #include <string.h>
40 
41 #include "Heap.h"
42 
43 #if !defined(min)
44 #define min(A,B) ( (A) < (B) ? (A):(B))
45 #endif
46 
47 /**
48  * List of the predefined MQTT v3/v5 packet names.
49  */
50 static const char *packet_names[] =
51 {
52 	"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
53 	"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
54 	"PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
55 };
56 
57 const char** MQTTClient_packet_names = packet_names;
58 
59 
60 /**
61  * Converts an MQTT packet code into its name
62  * @param ptype packet code
63  * @return the corresponding string, or "UNKNOWN"
64  */
MQTTPacket_name(int ptype)65 const char* MQTTPacket_name(int ptype)
66 {
67 	return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
68 }
69 
70 /**
71  * Array of functions to build packets, indexed according to packet code
72  */
73 pf new_packets[] =
74 {
75 	NULL,	/**< reserved */
76 	NULL,	/**< MQTTPacket_connect*/
77 	MQTTPacket_connack, /**< CONNACK */
78 	MQTTPacket_publish,	/**< PUBLISH */
79 	MQTTPacket_ack, /**< PUBACK */
80 	MQTTPacket_ack, /**< PUBREC */
81 	MQTTPacket_ack, /**< PUBREL */
82 	MQTTPacket_ack, /**< PUBCOMP */
83 	NULL, /**< MQTTPacket_subscribe*/
84 	MQTTPacket_suback, /**< SUBACK */
85 	NULL, /**< MQTTPacket_unsubscribe*/
86 	MQTTPacket_unsuback, /**< UNSUBACK */
87 	MQTTPacket_header_only, /**< PINGREQ */
88 	MQTTPacket_header_only, /**< PINGRESP */
89 	MQTTPacket_ack,  /**< DISCONNECT */
90 	MQTTPacket_ack   /**< AUTH */
91 };
92 
93 
94 static char* readUTFlen(char** pptr, char* enddata, int* len);
95 static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net);
96 
97 /**
98  * Reads one MQTT packet from a socket.
99  * @param socket a socket from which to read an MQTT packet
100  * @param error pointer to the error code which is completed if no packet is returned
101  * @return the packet structure or NULL if there was an error
102  */
MQTTPacket_Factory(int MQTTVersion,networkHandles * net,int * error)103 void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
104 {
105 	char* data = NULL;
106 	static Header header;
107 	size_t remaining_length;
108 	int ptype;
109 	void* pack = NULL;
110 	size_t actual_len = 0;
111 
112 	FUNC_ENTRY;
113 	*error = SOCKET_ERROR;  /* indicate whether an error occurred, or not */
114 
115 	const size_t headerWsFramePos = WebSocket_framePos();
116 
117 	/* read the packet data from the socket */
118 	*error = WebSocket_getch(net, &header.byte);
119 	if (*error != TCPSOCKET_COMPLETE)   /* first byte is the header byte */
120 		goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
121 
122 	/* now read the remaining length, so we know how much more to read */
123 	if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
124 		goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
125 
126 	/* now read the rest, the variable header and payload */
127 	data = WebSocket_getdata(net, remaining_length, &actual_len);
128 	if (remaining_length && data == NULL)
129 	{
130 		*error = SOCKET_ERROR;
131 		goto exit; /* socket error */
132 	}
133 
134 	if (actual_len < remaining_length)
135 	{
136 		*error = TCPSOCKET_INTERRUPTED;
137 		net->lastReceived = MQTTTime_now();
138 	}
139 	else
140 	{
141 		ptype = header.bits.type;
142 		if (ptype < CONNECT || (MQTTVersion < MQTTVERSION_5 && ptype >= DISCONNECT) ||
143 				(MQTTVersion >= MQTTVERSION_5 && ptype > AUTH) ||
144 				new_packets[ptype] == NULL)
145 			Log(TRACE_MIN, 2, NULL, ptype);
146 		else
147 		{
148 			if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
149 			{
150 				*error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
151 				Log(LOG_ERROR, -1, "Bad MQTT packet, type %d", ptype);
152 			}
153 #if !defined(NO_PERSISTENCE)
154 			else if (header.bits.type == PUBLISH && header.bits.qos == 2)
155 			{
156 				int buf0len;
157 				char *buf = malloc(10);
158 
159 				if (buf == NULL)
160 				{
161 					*error = SOCKET_ERROR;
162 					goto exit;
163 				}
164 				buf[0] = header.byte;
165 				buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
166 				*error = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1,
167 					&data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1, MQTTVersion);
168 				free(buf);
169 			}
170 #endif
171 		}
172 	}
173 	if (pack)
174 		net->lastReceived = MQTTTime_now();
175 exit:
176 	if (*error == TCPSOCKET_INTERRUPTED)
177 		WebSocket_framePosSeekTo(headerWsFramePos);
178 
179 	FUNC_EXIT_RC(*error);
180 	return pack;
181 }
182 
183 
184 /**
185  * Sends an MQTT packet in one system call write
186  * @param socket the socket to which to write the data
187  * @param header the one-byte MQTT header
188  * @param buffer the rest of the buffer to write (not including remaining length)
189  * @param buflen the length of the data in buffer to be written
190  * @param MQTTVersion the version of MQTT being used
191  * @return the completion code (TCPSOCKET_COMPLETE etc)
192  */
MQTTPacket_send(networkHandles * net,Header header,char * buffer,size_t buflen,int freeData,int MQTTVersion)193 int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData,
194 		int MQTTVersion)
195 {
196 	int rc = SOCKET_ERROR;
197 	size_t buf0len;
198 	char *buf;
199 	PacketBuffers packetbufs;
200 
201 	FUNC_ENTRY;
202 	buf0len = 1 + MQTTPacket_encode(NULL, buflen);
203 	buf = malloc(buf0len);
204 	if (buf == NULL)
205 	{
206 		rc = SOCKET_ERROR;
207 		goto exit;
208 	}
209 	buf[0] = header.byte;
210 	MQTTPacket_encode(&buf[1], buflen);
211 
212 #if !defined(NO_PERSISTENCE)
213 	if (header.bits.type == PUBREL)
214 	{
215 		char* ptraux = buffer;
216 		int msgId = readInt(&ptraux);
217 
218 		rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1, &buffer, &buflen,
219 			header.bits.type, msgId, 0, MQTTVersion);
220 	}
221 #endif
222 	packetbufs.count = 1;
223 	packetbufs.buffers = &buffer;
224 	packetbufs.buflens = &buflen;
225 	packetbufs.frees = &freeData;
226 	memset(packetbufs.mask, '\0', sizeof(packetbufs.mask));
227 	rc = WebSocket_putdatas(net, &buf, &buf0len, &packetbufs);
228 
229 	if (rc == TCPSOCKET_COMPLETE)
230 		net->lastSent = MQTTTime_now();
231 
232 	if (rc != TCPSOCKET_INTERRUPTED)
233 	  free(buf);
234 
235 exit:
236 	FUNC_EXIT_RC(rc);
237 	return rc;
238 }
239 
240 
241 
242 
243 /**
244  * Sends an MQTT packet from multiple buffers in one system call write
245  * @param socket the socket to which to write the data
246  * @param header the one-byte MQTT header
247  * @param count the number of buffers
248  * @param buffers the rest of the buffers to write (not including remaining length)
249  * @param buflens the lengths of the data in the array of buffers to be written
250  * @param the MQTT version being used
251  * @return the completion code (TCPSOCKET_COMPLETE etc)
252  */
MQTTPacket_sends(networkHandles * net,Header header,PacketBuffers * bufs,int MQTTVersion)253 int MQTTPacket_sends(networkHandles* net, Header header, PacketBuffers* bufs, int MQTTVersion)
254 {
255 	int i, rc = SOCKET_ERROR;
256 	size_t buf0len, total = 0;
257 	char *buf;
258 
259 	FUNC_ENTRY;
260 	for (i = 0; i < bufs->count; i++)
261 		total += bufs->buflens[i];
262 	buf0len = 1 + MQTTPacket_encode(NULL, total);
263 	buf = malloc(buf0len);
264 	if (buf == NULL)
265 	{
266 		rc = SOCKET_ERROR;
267 		goto exit;
268 	}
269 	buf[0] = header.byte;
270 	MQTTPacket_encode(&buf[1], total);
271 
272 #if !defined(NO_PERSISTENCE)
273 	if (header.bits.type == PUBLISH && header.bits.qos != 0)
274 	{   /* persist PUBLISH QoS1 and Qo2 */
275 		char *ptraux = bufs->buffers[2];
276 		int msgId = readInt(&ptraux);
277 		rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, bufs->count, bufs->buffers, bufs->buflens,
278 			header.bits.type, msgId, 0, MQTTVersion);
279 	}
280 #endif
281 	rc = WebSocket_putdatas(net, &buf, &buf0len, bufs);
282 
283 	if (rc == TCPSOCKET_COMPLETE)
284 		net->lastSent = MQTTTime_now();
285 
286 	if (rc != TCPSOCKET_INTERRUPTED)
287 	  free(buf);
288 exit:
289 	FUNC_EXIT_RC(rc);
290 	return rc;
291 }
292 
293 
294 /**
295  * Encodes the message length according to the MQTT algorithm
296  * @param buf the buffer into which the encoded data is written
297  * @param length the length to be encoded
298  * @return the number of bytes written to buffer
299  */
MQTTPacket_encode(char * buf,size_t length)300 int MQTTPacket_encode(char* buf, size_t length)
301 {
302 	int rc = 0;
303 
304 	FUNC_ENTRY;
305 	do
306 	{
307 		char d = length % 128;
308 		length /= 128;
309 		/* if there are more digits to encode, set the top bit of this digit */
310 		if (length > 0)
311 			d |= 0x80;
312 		if (buf)
313 			buf[rc++] = d;
314 		else
315 			rc++;
316 	} while (length > 0);
317 	FUNC_EXIT_RC(rc);
318 	return rc;
319 }
320 
321 
322 /**
323  * Decodes the message length according to the MQTT algorithm
324  * @param socket the socket from which to read the bytes
325  * @param value the decoded length returned
326  * @return the number of bytes read from the socket
327  */
MQTTPacket_decode(networkHandles * net,size_t * value)328 int MQTTPacket_decode(networkHandles* net, size_t* value)
329 {
330 	int rc = SOCKET_ERROR;
331 	char c;
332 	int multiplier = 1;
333 	int len = 0;
334 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
335 
336 	FUNC_ENTRY;
337 	*value = 0;
338 	do
339 	{
340 		if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
341 		{
342 			rc = SOCKET_ERROR;	/* bad data */
343 			goto exit;
344 		}
345 		rc = WebSocket_getch(net, &c);
346 		if (rc != TCPSOCKET_COMPLETE)
347 				goto exit;
348 		*value += (c & 127) * multiplier;
349 		multiplier *= 128;
350 	} while ((c & 128) != 0);
351 exit:
352 	FUNC_EXIT_RC(rc);
353 	return rc;
354 }
355 
356 
357 /**
358  * Calculates an integer from two bytes read from the input buffer
359  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
360  * @return the integer value calculated
361  */
readInt(char ** pptr)362 int readInt(char** pptr)
363 {
364 	char* ptr = *pptr;
365 	int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
366 	*pptr += 2;
367 	return len;
368 }
369 
370 
371 /**
372  * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec which really means
373  * a length delimited string.  So it reads the two byte length then the data according to
374  * that length.  The end of the buffer is provided too, so we can prevent buffer overruns caused
375  * by an incorrect length.
376  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
377  * @param enddata pointer to the end of the buffer not to be read beyond
378  * @param len returns the calculcated value of the length bytes read
379  * @return an allocated C string holding the characters read, or NULL if the length read would
380  * have caused an overrun.
381  *
382  */
readUTFlen(char ** pptr,char * enddata,int * len)383 static char* readUTFlen(char** pptr, char* enddata, int* len)
384 {
385 	char* string = NULL;
386 
387 	FUNC_ENTRY;
388 	if (enddata - (*pptr) > 1) /* enough length to read the integer? */
389 	{
390 		*len = readInt(pptr);
391 		if (&(*pptr)[*len] <= enddata)
392 		{
393 			if ((string = malloc(*len+1)) == NULL)
394 				goto exit;
395 			memcpy(string, *pptr, *len);
396 			string[*len] = '\0';
397 			*pptr += *len;
398 		}
399 	}
400 exit:
401 	FUNC_EXIT;
402 	return string;
403 }
404 
405 
406 /**
407  * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec which really means
408  * a length delimited string.  So it reads the two byte length then the data according to
409  * that length.  The end of the buffer is provided too, so we can prevent buffer overruns caused
410  * by an incorrect length.
411  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
412  * @param enddata pointer to the end of the buffer not to be read beyond
413  * @return an allocated C string holding the characters read, or NULL if the length read would
414  * have caused an overrun.
415  */
readUTF(char ** pptr,char * enddata)416 char* readUTF(char** pptr, char* enddata)
417 {
418 	int len;
419 	return readUTFlen(pptr, enddata, &len);
420 }
421 
422 
423 /**
424  * Reads one character from the input buffer.
425  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
426  * @return the character read
427  */
readChar(char ** pptr)428 unsigned char readChar(char** pptr)
429 {
430 	unsigned char c = **pptr;
431 	(*pptr)++;
432 	return c;
433 }
434 
435 
436 /**
437  * Writes one character to an output buffer.
438  * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
439  * @param c the character to write
440  */
writeChar(char ** pptr,char c)441 void writeChar(char** pptr, char c)
442 {
443 	**pptr = c;
444 	(*pptr)++;
445 }
446 
447 
448 /**
449  * Writes an integer as 2 bytes to an output buffer.
450  * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
451  * @param anInt the integer to write
452  */
writeInt(char ** pptr,int anInt)453 void writeInt(char** pptr, int anInt)
454 {
455 	**pptr = (char)(anInt / 256);
456 	(*pptr)++;
457 	**pptr = (char)(anInt % 256);
458 	(*pptr)++;
459 }
460 
461 
462 /**
463  * Writes a "UTF" string to an output buffer.  Converts C string to length-delimited.
464  * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
465  * @param string the C string to write
466  */
writeUTF(char ** pptr,const char * string)467 void writeUTF(char** pptr, const char* string)
468 {
469 	size_t len = strlen(string);
470 	writeInt(pptr, (int)len);
471 	memcpy(*pptr, string, len);
472 	*pptr += len;
473 }
474 
475 
476 /**
477  * Writes length delimited data to an output buffer
478  * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
479  * @param data the data to write
480  * @param datalen the length of the data to write
481  */
writeData(char ** pptr,const void * data,int datalen)482 void writeData(char** pptr, const void* data, int datalen)
483 {
484 	writeInt(pptr, datalen);
485 	memcpy(*pptr, data, datalen);
486 	*pptr += datalen;
487 }
488 
489 
490 /**
491  * Function used in the new packets table to create packets which have only a header.
492  * @param MQTTVersion the version of MQTT
493  * @param aHeader the MQTT header byte
494  * @param data the rest of the packet
495  * @param datalen the length of the rest of the packet
496  * @return pointer to the packet structure
497  */
MQTTPacket_header_only(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)498 void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
499 {
500 	static unsigned char header = 0;
501 	header = aHeader;
502 	return &header;
503 }
504 
505 
506 /**
507  * Send an MQTT disconnect packet down a socket.
508  * @param socket the open socket to send the data to
509  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
510  */
MQTTPacket_send_disconnect(Clients * client,enum MQTTReasonCodes reason,MQTTProperties * props)511 int MQTTPacket_send_disconnect(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
512 {
513 	Header header;
514 	int rc = 0;
515 
516 	FUNC_ENTRY;
517 	header.byte = 0;
518 	header.bits.type = DISCONNECT;
519 
520 	if (client->MQTTVersion >= 5 && (props || reason != MQTTREASONCODE_SUCCESS))
521 	{
522 		size_t buflen = 1 + ((props == NULL) ? 0 : MQTTProperties_len(props));
523 		char *buf = NULL;
524 		char *ptr = NULL;
525 
526 		if ((buf = malloc(buflen)) == NULL)
527 		{
528 			rc = SOCKET_ERROR;
529 			goto exit;
530 		}
531 		ptr = buf;
532 		writeChar(&ptr, reason);
533 		if (props)
534 			MQTTProperties_write(&ptr, props);
535 		if ((rc = MQTTPacket_send(&client->net, header, buf, buflen, 1,
536 				                   client->MQTTVersion)) != TCPSOCKET_INTERRUPTED)
537 			free(buf);
538 	}
539 	else
540 		rc = MQTTPacket_send(&client->net, header, NULL, 0, 0, client->MQTTVersion);
541 exit:
542 	Log(LOG_PROTOCOL, 28, NULL, client->net.socket, client->clientID, rc);
543 	FUNC_EXIT_RC(rc);
544 	return rc;
545 }
546 
547 
548 /**
549  * Function used in the new packets table to create publish packets.
550  * @param MQTTVersion
551  * @param aHeader the MQTT header byte
552  * @param data the rest of the packet
553  * @param datalen the length of the rest of the packet
554  * @return pointer to the packet structure
555  */
MQTTPacket_publish(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)556 void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
557 {
558 	Publish* pack = NULL;
559 	char* curdata = data;
560 	char* enddata = &data[datalen];
561 
562 	FUNC_ENTRY;
563 	if ((pack = malloc(sizeof(Publish))) == NULL)
564 		goto exit;
565 	memset(pack, '\0', sizeof(Publish));
566 	pack->MQTTVersion = MQTTVersion;
567 	pack->header.byte = aHeader;
568 	if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
569 	{
570 		free(pack);
571 		pack = NULL;
572 		goto exit;
573 	}
574 	if (pack->header.bits.qos > 0)  /* Msgid only exists for QoS 1 or 2 */
575 	{
576 		if (enddata - curdata < 2)  /* Is there enough data for the msgid? */
577 		{
578 			free(pack);
579 			pack = NULL;
580 			goto exit;
581 		}
582 		pack->msgId = readInt(&curdata);
583 	}
584 	else
585 		pack->msgId = 0;
586 	if (MQTTVersion >= MQTTVERSION_5)
587 	{
588 		MQTTProperties props = MQTTProperties_initializer;
589 		pack->properties = props;
590 		if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
591 		{
592 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
593 			if (pack->topic != NULL)
594 				free(pack->topic);
595 #endif
596 			if (pack->properties.array)
597 				free(pack->properties.array);
598 			if (pack)
599 				free(pack);
600 			pack = NULL; /* signal protocol error */
601 			goto exit;
602 		}
603 	}
604 	pack->payload = curdata;
605 	pack->payloadlen = (int)(datalen-(curdata-data));
606 exit:
607 	FUNC_EXIT;
608 	return pack;
609 }
610 
611 
612 /**
613  * Free allocated storage for a publish packet.
614  * @param pack pointer to the publish packet structure
615  */
MQTTPacket_freePublish(Publish * pack)616 void MQTTPacket_freePublish(Publish* pack)
617 {
618 	FUNC_ENTRY;
619 	if (pack->topic != NULL)
620 		free(pack->topic);
621 	if (pack->MQTTVersion >= MQTTVERSION_5)
622 		MQTTProperties_free(&pack->properties);
623 	free(pack);
624 	FUNC_EXIT;
625 }
626 
627 
628 /**
629  * Free allocated storage for an ack packet.
630  * @param pack pointer to the publish packet structure
631  */
MQTTPacket_freeAck(Ack * pack)632 void MQTTPacket_freeAck(Ack* pack)
633 {
634 	FUNC_ENTRY;
635 	if (pack->MQTTVersion >= MQTTVERSION_5)
636 		MQTTProperties_free(&pack->properties);
637 	free(pack);
638 	FUNC_EXIT;
639 }
640 
641 
642 /**
643  * Send an MQTT acknowledgement packet down a socket.
644  * @param MQTTVersion the version of MQTT being used
645  * @param type the MQTT packet type e.g. SUBACK
646  * @param msgid the MQTT message id to use
647  * @param dup boolean - whether to set the MQTT DUP flag
648  * @param net the network handle to send the data to
649  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
650  */
MQTTPacket_send_ack(int MQTTVersion,int type,int msgid,int dup,networkHandles * net)651 static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net)
652 {
653 	Header header;
654 	int rc = SOCKET_ERROR;
655 	char *buf = NULL;
656 	char *ptr = NULL;
657 
658 	FUNC_ENTRY;
659 	if ((ptr = buf = malloc(2)) == NULL)
660 		goto exit;
661 	header.byte = 0;
662 	header.bits.type = type;
663 	header.bits.dup = dup;
664 	if (type == PUBREL)
665 	    header.bits.qos = 1;
666 	writeInt(&ptr, msgid);
667 	if ((rc = MQTTPacket_send(net, header, buf, 2, 1, MQTTVersion)) != TCPSOCKET_INTERRUPTED)
668 		free(buf);
669 exit:
670 	FUNC_EXIT_RC(rc);
671 	return rc;
672 }
673 
674 
675 /**
676  * Send an MQTT PUBACK packet down a socket.
677  * @param MQTTVersion the version of MQTT being used
678  * @param msgid the MQTT message id to use
679  * @param socket the open socket to send the data to
680  * @param clientID the string client identifier, only used for tracing
681  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
682  */
MQTTPacket_send_puback(int MQTTVersion,int msgid,networkHandles * net,const char * clientID)683 int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
684 {
685 	int rc = 0;
686 
687 	FUNC_ENTRY;
688 	rc =  MQTTPacket_send_ack(MQTTVersion, PUBACK, msgid, 0, net);
689 	Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
690 	FUNC_EXIT_RC(rc);
691 	return rc;
692 }
693 
694 
695 /**
696  * Free allocated storage for a suback packet.
697  * @param pack pointer to the suback packet structure
698  */
MQTTPacket_freeSuback(Suback * pack)699 void MQTTPacket_freeSuback(Suback* pack)
700 {
701 	FUNC_ENTRY;
702 	if (pack->MQTTVersion >= MQTTVERSION_5)
703 		MQTTProperties_free(&pack->properties);
704 	if (pack->qoss != NULL)
705 		ListFree(pack->qoss);
706 	free(pack);
707 	FUNC_EXIT;
708 }
709 
710 
711 /**
712  * Free allocated storage for a suback packet.
713  * @param pack pointer to the suback packet structure
714  */
MQTTPacket_freeUnsuback(Unsuback * pack)715 void MQTTPacket_freeUnsuback(Unsuback* pack)
716 {
717 	FUNC_ENTRY;
718 	if (pack->MQTTVersion >= MQTTVERSION_5)
719 	{
720 		MQTTProperties_free(&pack->properties);
721 		if (pack->reasonCodes != NULL)
722 			ListFree(pack->reasonCodes);
723 	}
724 	free(pack);
725 	FUNC_EXIT;
726 }
727 
728 
729 /**
730  * Send an MQTT PUBREC packet down a socket.
731  * @param MQTTVersion the version of MQTT being used
732  * @param msgid the MQTT message id to use
733  * @param socket the open socket to send the data to
734  * @param clientID the string client identifier, only used for tracing
735  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
736  */
MQTTPacket_send_pubrec(int MQTTVersion,int msgid,networkHandles * net,const char * clientID)737 int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
738 {
739 	int rc = 0;
740 
741 	FUNC_ENTRY;
742 	rc =  MQTTPacket_send_ack(MQTTVersion, PUBREC, msgid, 0, net);
743 	Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
744 	FUNC_EXIT_RC(rc);
745 	return rc;
746 }
747 
748 
749 /**
750  * Send an MQTT PUBREL packet down a socket.
751  * @param MQTTVersion the version of MQTT being used
752  * @param msgid the MQTT message id to use
753  * @param dup boolean - whether to set the MQTT DUP flag
754  * @param socket the open socket to send the data to
755  * @param clientID the string client identifier, only used for tracing
756  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
757  */
MQTTPacket_send_pubrel(int MQTTVersion,int msgid,int dup,networkHandles * net,const char * clientID)758 int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles* net, const char* clientID)
759 {
760 	int rc = 0;
761 
762 	FUNC_ENTRY;
763 	rc = MQTTPacket_send_ack(MQTTVersion, PUBREL, msgid, dup, net);
764 	Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
765 	FUNC_EXIT_RC(rc);
766 	return rc;
767 }
768 
769 
770 /**
771  * Send an MQTT PUBCOMP packet down a socket.
772  * @param MQTTVersion the version of MQTT being used
773  * @param msgid the MQTT message id to use
774  * @param socket the open socket to send the data to
775  * @param clientID the string client identifier, only used for tracing
776  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
777  */
MQTTPacket_send_pubcomp(int MQTTVersion,int msgid,networkHandles * net,const char * clientID)778 int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
779 {
780 	int rc = 0;
781 
782 	FUNC_ENTRY;
783 	rc = MQTTPacket_send_ack(MQTTVersion, PUBCOMP, msgid, 0, net);
784 	Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
785 	FUNC_EXIT_RC(rc);
786 	return rc;
787 }
788 
789 
790 /**
791  * Function used in the new packets table to create acknowledgement packets.
792  * @param MQTTVersion the version of MQTT being used
793  * @param aHeader the MQTT header byte
794  * @param data the rest of the packet
795  * @param datalen the length of the rest of the packet
796  * @return pointer to the packet structure
797  */
MQTTPacket_ack(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)798 void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
799 {
800 	Ack* pack = NULL;
801 	char* curdata = data;
802 	char* enddata = &data[datalen];
803 
804 	FUNC_ENTRY;
805 	if ((pack = malloc(sizeof(Ack))) == NULL)
806 		goto exit;
807 	pack->MQTTVersion = MQTTVersion;
808 	pack->header.byte = aHeader;
809 	if (pack->header.bits.type != DISCONNECT)
810 	{
811 		if (enddata - curdata < 2)  /* Is there enough data for the msgid? */
812 		{
813 			free(pack);
814 			pack = NULL;
815 			goto exit;
816 		}
817 		pack->msgId = readInt(&curdata);
818 	}
819 	if (MQTTVersion >= MQTTVERSION_5)
820 	{
821 		MQTTProperties props = MQTTProperties_initializer;
822 
823 		pack->rc = MQTTREASONCODE_SUCCESS;
824 		pack->properties = props;
825 
826 		/* disconnect has no msgid */
827 		if (datalen > 2 || (pack->header.bits.type == DISCONNECT && datalen > 0))
828 			pack->rc = readChar(&curdata); /* reason code */
829 
830 		if (datalen > 3 || (pack->header.bits.type == DISCONNECT && datalen > 1))
831 		{
832 			if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
833 			{
834 				if (pack->properties.array)
835 					free(pack->properties.array);
836 				if (pack)
837 					free(pack);
838 				pack = NULL; /* signal protocol error */
839 				goto exit;
840 			}
841 		}
842 	}
843 exit:
844 	FUNC_EXIT;
845 	return pack;
846 }
847 
848 
849 /**
850  * Send an MQTT PUBLISH packet down a socket.
851  * @param pack a structure from which to get some values to use, e.g topic, payload
852  * @param dup boolean - whether to set the MQTT DUP flag
853  * @param qos the value to use for the MQTT QoS setting
854  * @param retained boolean - whether to set the MQTT retained flag
855  * @param socket the open socket to send the data to
856  * @param clientID the string client identifier, only used for tracing
857  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
858  */
MQTTPacket_send_publish(Publish * pack,int dup,int qos,int retained,networkHandles * net,const char * clientID)859 int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
860 {
861 	Header header;
862 	char *topiclen;
863 	int rc = SOCKET_ERROR;
864 
865 	FUNC_ENTRY;
866 	topiclen = malloc(2);
867 	if (topiclen == NULL)
868 		goto exit;
869 
870 	header.bits.type = PUBLISH;
871 	header.bits.dup = dup;
872 	header.bits.qos = qos;
873 	header.bits.retain = retained;
874 	if (qos > 0 || pack->MQTTVersion >= 5)
875 	{
876 		int buflen = ((qos > 0) ? 2 : 0) + ((pack->MQTTVersion >= 5) ? MQTTProperties_len(&pack->properties) : 0);
877 		char *ptr = NULL;
878 		char* bufs[4] = {topiclen, pack->topic, NULL, pack->payload};
879 		size_t lens[4] = {2, strlen(pack->topic), buflen, pack->payloadlen};
880 		int frees[4] = {1, 0, 1, 0};
881 		PacketBuffers packetbufs = {4, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
882 
883 		bufs[2] = ptr = malloc(buflen);
884 		if (ptr == NULL)
885 			goto exit_free;
886 		if (qos > 0)
887 			writeInt(&ptr, pack->msgId);
888 		if (pack->MQTTVersion >= 5)
889 			MQTTProperties_write(&ptr, &pack->properties);
890 
891 		ptr = topiclen;
892 		writeInt(&ptr, (int)lens[1]);
893 		rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
894 		if (rc != TCPSOCKET_INTERRUPTED)
895 			free(bufs[2]);
896 		memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
897 	}
898 	else
899 	{
900 		char* ptr = topiclen;
901 		char* bufs[3] = {topiclen, pack->topic, pack->payload};
902 		size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
903 		int frees[3] = {1, 0, 0};
904 		PacketBuffers packetbufs = {3, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
905 
906 		writeInt(&ptr, (int)lens[1]);
907 		rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
908 		memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
909 	}
910 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
911 	if (qos == 0)
912 		Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, rc);
913 	else
914 		Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, rc);
915 #else
916 	if (qos == 0)
917 		Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc, pack->payloadlen,
918 				min(20, pack->payloadlen), pack->payload);
919 	else
920 		Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc, pack->payloadlen,
921 				min(20, pack->payloadlen), pack->payload);
922 #endif
923 exit_free:
924 	if (rc != TCPSOCKET_INTERRUPTED)
925 		free(topiclen);
926 exit:
927 	FUNC_EXIT_RC(rc);
928 	return rc;
929 }
930 
931 
932 /**
933  * Free allocated storage for a various packet tyoes
934  * @param pack pointer to the suback packet structure
935  */
MQTTPacket_free_packet(MQTTPacket * pack)936 void MQTTPacket_free_packet(MQTTPacket* pack)
937 {
938 	FUNC_ENTRY;
939 	if (pack->header.bits.type == PUBLISH)
940 		MQTTPacket_freePublish((Publish*)pack);
941 	/*else if (pack->header.type == SUBSCRIBE)
942 		MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
943 	else if (pack->header.type == UNSUBSCRIBE)
944 		MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
945 	else
946 		free(pack);
947 	FUNC_EXIT;
948 }
949 
950 
951 /**
952  * Writes an integer as 4 bytes to an output buffer.
953  * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
954  * @param anInt the integer to write
955  */
writeInt4(char ** pptr,int anInt)956 void writeInt4(char** pptr, int anInt)
957 {
958   **pptr = (char)(anInt / 16777216);
959   (*pptr)++;
960   anInt %= 16777216;
961   **pptr = (char)(anInt / 65536);
962   (*pptr)++;
963   anInt %= 65536;
964 	**pptr = (char)(anInt / 256);
965 	(*pptr)++;
966 	**pptr = (char)(anInt % 256);
967 	(*pptr)++;
968 }
969 
970 
971 /**
972  * Calculates an integer from two bytes read from the input buffer
973  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
974  * @return the integer value calculated
975  */
readInt4(char ** pptr)976 int readInt4(char** pptr)
977 {
978 	unsigned char* ptr = (unsigned char*)*pptr;
979 	int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
980 	*pptr += 4;
981 	return value;
982 }
983 
984 
writeMQTTLenString(char ** pptr,MQTTLenString lenstring)985 void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
986 {
987   writeInt(pptr, lenstring.len);
988   memcpy(*pptr, lenstring.data, lenstring.len);
989   *pptr += lenstring.len;
990 }
991 
992 
MQTTLenStringRead(MQTTLenString * lenstring,char ** pptr,char * enddata)993 int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
994 {
995 	int len = -1;
996 
997 	/* the first two bytes are the length of the string */
998 	if (enddata - (*pptr) > 1) /* enough length to read the integer? */
999 	{
1000 		lenstring->len = readInt(pptr); /* increments pptr to point past length */
1001 		if (&(*pptr)[lenstring->len] <= enddata)
1002 		{
1003 			lenstring->data = (char*)*pptr;
1004 			*pptr += lenstring->len;
1005 			len = 2 + lenstring->len;
1006 		}
1007 	}
1008 	return len;
1009 }
1010 
1011 /*
1012 if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
1013   len = 1;
1014 else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
1015   len = 2;
1016 else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
1017   len = 3;
1018 else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
1019   len = 4;
1020 */
MQTTPacket_VBIlen(int rem_len)1021 int MQTTPacket_VBIlen(int rem_len)
1022 {
1023 	int rc = 0;
1024 
1025 	if (rem_len < 128)
1026 		rc = 1;
1027 	else if (rem_len < 16384)
1028 		rc = 2;
1029 	else if (rem_len < 2097152)
1030 		rc = 3;
1031 	else
1032 		rc = 4;
1033   return rc;
1034 }
1035 
1036 
1037 /**
1038  * Decodes the message length according to the MQTT algorithm
1039  * @param getcharfn pointer to function to read the next character from the data source
1040  * @param value the decoded length returned
1041  * @return the number of bytes read from the socket
1042  */
MQTTPacket_VBIdecode(int (* getcharfn)(char *,int),unsigned int * value)1043 int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), unsigned int* value)
1044 {
1045 	char c;
1046 	int multiplier = 1;
1047 	int len = 0;
1048 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
1049 
1050 	*value = 0;
1051 	do
1052 	{
1053 		int rc = MQTTPACKET_READ_ERROR;
1054 
1055 		if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
1056 		{
1057 			rc = MQTTPACKET_READ_ERROR;	/* bad data */
1058 			goto exit;
1059 		}
1060 		rc = (*getcharfn)(&c, 1);
1061 		if (rc != 1)
1062 			goto exit;
1063 		*value += (c & 127) * multiplier;
1064 		multiplier *= 128;
1065 	} while ((c & 128) != 0);
1066 exit:
1067 	return len;
1068 }
1069 
1070 
1071 static char* bufptr;
1072 
bufchar(char * c,int count)1073 int bufchar(char* c, int count)
1074 {
1075 	int i;
1076 
1077 	for (i = 0; i < count; ++i)
1078 		*c = *bufptr++;
1079 	return count;
1080 }
1081 
1082 
MQTTPacket_decodeBuf(char * buf,unsigned int * value)1083 int MQTTPacket_decodeBuf(char* buf, unsigned int* value)
1084 {
1085 	bufptr = buf;
1086 	return MQTTPacket_VBIdecode(bufchar, value);
1087 }
1088 
1089