• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2021 IBM Corp. and Ian Craggs
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs, Allan Stockdill-Mander - SSL updates
16  *    Ian Craggs - MQTT 3.1.1 support
17  *    Rong Xiang, Ian Craggs - C++ compatibility
18  *    Ian Craggs - binary password and will payload
19  *    Ian Craggs - MQTT 5.0 support
20  *******************************************************************************/
21 
22 /**
23  * @file
24  * \brief functions to deal with reading and writing of MQTT packets from and to sockets
25  *
26  * Some other related functions are in the MQTTPacket module
27  */
28 
29 
30 #include "MQTTPacketOut.h"
31 #include "Log.h"
32 #include "StackTrace.h"
33 
34 #include <string.h>
35 #include <stdlib.h>
36 
37 #include "Heap.h"
38 
39 
40 /**
41  * Send an MQTT CONNECT packet down a socket for V5 or later
42  * @param client a structure from which to get all the required values
43  * @param MQTTVersion the MQTT version to connect with
44  * @param connectProperties MQTT V5 properties for the connect packet
45  * @param willProperties MQTT V5 properties for the will message, if any
46  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
47  */
MQTTPacket_send_connect(Clients * client,int MQTTVersion,MQTTProperties * connectProperties,MQTTProperties * willProperties)48 int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
49 	               MQTTProperties* connectProperties, MQTTProperties* willProperties)
50 {
51 	char *buf, *ptr;
52 	Connect packet;
53 	int rc = SOCKET_ERROR, len;
54 
55 	FUNC_ENTRY;
56 	packet.header.byte = 0;
57 	packet.header.bits.type = CONNECT;
58 
59 	len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
60 	if (client->will)
61 		len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
62 	if (client->username)
63 		len += (int)strlen(client->username)+2;
64 	if (client->password)
65 		len += client->passwordlen+2;
66 	if (MQTTVersion >= MQTTVERSION_5)
67 	{
68 		len += MQTTProperties_len(connectProperties);
69 		if (client->will)
70 			len += MQTTProperties_len(willProperties);
71 	}
72 
73 	ptr = buf = malloc(len);
74 	if (ptr == NULL)
75 		goto exit_nofree;
76 	if (MQTTVersion == MQTTVERSION_3_1)
77 	{
78 		writeUTF(&ptr, "MQIsdp");
79 		writeChar(&ptr, (char)MQTTVERSION_3_1);
80 	}
81 	else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
82 	{
83 		writeUTF(&ptr, "MQTT");
84 		writeChar(&ptr, (char)MQTTVersion);
85 	}
86 	else
87 		goto exit;
88 
89 	packet.flags.all = 0;
90 	if (MQTTVersion >= MQTTVERSION_5)
91 		packet.flags.bits.cleanstart = client->cleanstart;
92 	else
93 		packet.flags.bits.cleanstart = client->cleansession;
94 	packet.flags.bits.will = (client->will) ? 1 : 0;
95 	if (packet.flags.bits.will)
96 	{
97 		packet.flags.bits.willQoS = client->will->qos;
98 		packet.flags.bits.willRetain = client->will->retained;
99 	}
100 	if (client->username)
101 		packet.flags.bits.username = 1;
102 	if (client->password)
103 		packet.flags.bits.password = 1;
104 
105 	writeChar(&ptr, packet.flags.all);
106 	writeInt(&ptr, client->keepAliveInterval);
107 	if (MQTTVersion >= MQTTVERSION_5)
108 		MQTTProperties_write(&ptr, connectProperties);
109 	writeUTF(&ptr, client->clientID);
110 	if (client->will)
111 	{
112 		if (MQTTVersion >= MQTTVERSION_5)
113 			MQTTProperties_write(&ptr, willProperties);
114 		writeUTF(&ptr, client->will->topic);
115 		writeData(&ptr, client->will->payload, client->will->payloadlen);
116 	}
117 	if (client->username)
118 		writeUTF(&ptr, client->username);
119 	if (client->password)
120 		writeData(&ptr, client->password, client->passwordlen);
121 
122 	rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1, MQTTVersion);
123 	Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID,
124 			MQTTVersion, client->cleansession, rc);
125 exit:
126 	if (rc != TCPSOCKET_INTERRUPTED)
127 		free(buf);
128 exit_nofree:
129 	FUNC_EXIT_RC(rc);
130 	return rc;
131 }
132 
133 
134 /**
135  * Function used in the new packets table to create connack packets.
136  * @param MQTTVersion MQTT 5 or less?
137  * @param aHeader the MQTT header byte
138  * @param data the rest of the packet
139  * @param datalen the length of the rest of the packet
140  * @return pointer to the packet structure
141  */
MQTTPacket_connack(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)142 void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
143 {
144 	Connack* pack = NULL;
145 	char* curdata = data;
146 	char* enddata = &data[datalen];
147 
148 	FUNC_ENTRY;
149 	if ((pack = malloc(sizeof(Connack))) == NULL)
150 		goto exit;
151 	pack->MQTTVersion = MQTTVersion;
152 	pack->header.byte = aHeader;
153 	if (datalen < 2) /* enough data for connect flags and reason code? */
154 	{
155 		free(pack);
156 		pack = NULL;
157 		goto exit;
158 	}
159 	pack->flags.all = readChar(&curdata); /* connect flags */
160 	pack->rc = readChar(&curdata); /* reason code */
161 	if (MQTTVersion >= MQTTVERSION_5 && datalen > 2)
162 	{
163 		MQTTProperties props = MQTTProperties_initializer;
164 		pack->properties = props;
165 		if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
166 		{
167 			if (pack->properties.array)
168 				free(pack->properties.array);
169 			if (pack)
170 				free(pack);
171 			pack = NULL; /* signal protocol error */
172 			goto exit;
173 		}
174 	}
175 exit:
176 	FUNC_EXIT;
177 	return pack;
178 }
179 
180 
181 /**
182  * Free allocated storage for a connack packet.
183  * @param pack pointer to the connack packet structure
184  */
MQTTPacket_freeConnack(Connack * pack)185 void MQTTPacket_freeConnack(Connack* pack)
186 {
187 	FUNC_ENTRY;
188 	if (pack->MQTTVersion >= MQTTVERSION_5)
189 		MQTTProperties_free(&pack->properties);
190 	free(pack);
191 	FUNC_EXIT;
192 }
193 
194 
195 /**
196  * Send an MQTT PINGREQ packet down a socket.
197  * @param socket the open socket to send the data to
198  * @param clientID the string client identifier, only used for tracing
199  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
200  */
MQTTPacket_send_pingreq(networkHandles * net,const char * clientID)201 int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
202 {
203 	Header header;
204 	int rc = 0;
205 
206 	FUNC_ENTRY;
207 	header.byte = 0;
208 	header.bits.type = PINGREQ;
209 	rc = MQTTPacket_send(net, header, NULL, 0, 0, MQTTVERSION_3_1_1);
210 	Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
211 	FUNC_EXIT_RC(rc);
212 	return rc;
213 }
214 
215 
216 /**
217  * Send an MQTT subscribe packet down a socket.
218  * @param topics list of topics
219  * @param qoss list of corresponding QoSs
220  * @param msgid the MQTT message id to use
221  * @param dup boolean - whether to set the MQTT DUP flag
222  * @param socket the open socket to send the data to
223  * @param clientID the string client identifier, only used for tracing
224  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
225  */
MQTTPacket_send_subscribe(List * topics,List * qoss,MQTTSubscribe_options * opts,MQTTProperties * props,int msgid,int dup,Clients * client)226 int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
227 		int msgid, int dup, Clients* client)
228 {
229 	Header header;
230 	char *data, *ptr;
231 	int rc = -1;
232 	ListElement *elem = NULL, *qosElem = NULL;
233 	int datalen, i = 0;
234 
235 	FUNC_ENTRY;
236 	header.bits.type = SUBSCRIBE;
237 	header.bits.dup = dup;
238 	header.bits.qos = 1;
239 	header.bits.retain = 0;
240 
241 	datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
242 	while (ListNextElement(topics, &elem))
243 		datalen += (int)strlen((char*)(elem->content));
244 	if (client->MQTTVersion >= MQTTVERSION_5)
245 		datalen += MQTTProperties_len(props);
246 
247 	ptr = data = malloc(datalen);
248 	if (ptr == NULL)
249 		goto exit;
250 	writeInt(&ptr, msgid);
251 
252 	if (client->MQTTVersion >= MQTTVERSION_5)
253 		MQTTProperties_write(&ptr, props);
254 
255 	elem = NULL;
256 	while (ListNextElement(topics, &elem))
257 	{
258 		char subopts = 0;
259 
260 		ListNextElement(qoss, &qosElem);
261 		writeUTF(&ptr, (char*)(elem->content));
262 		subopts = *(int*)(qosElem->content);
263 		if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
264 		{
265 			subopts |= (opts[i].noLocal << 2); /* 1 bit */
266 			subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
267 			subopts |= (opts[i].retainHandling << 4); /* 2 bits */
268 		}
269 		writeChar(&ptr, subopts);
270 		++i;
271 	}
272 	rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
273 	Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
274 	if (rc != TCPSOCKET_INTERRUPTED)
275 		free(data);
276 exit:
277 	FUNC_EXIT_RC(rc);
278 	return rc;
279 }
280 
281 
282 /**
283  * Function used in the new packets table to create suback packets.
284  * @param MQTTVersion the version of MQTT
285  * @param aHeader the MQTT header byte
286  * @param data the rest of the packet
287  * @param datalen the length of the rest of the packet
288  * @return pointer to the packet structure
289  */
MQTTPacket_suback(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)290 void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
291 {
292 	Suback* pack = NULL;
293 	char* curdata = data;
294 	char* enddata = &data[datalen];
295 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
296 	int ret = -1;
297 #endif
298 
299 	FUNC_ENTRY;
300 	if ((pack = malloc(sizeof(Suback))) == NULL)
301 		goto exit;
302 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
303 	ret = memset_s(pack, sizeof(Suback), 0, sizeof(Suback));
304 	if (ret != 0)
305 	{
306 		if (pack != NULL)
307 			free(pack);
308 		pack = NULL;
309 		goto exit;
310 	}
311 #endif
312 	pack->MQTTVersion = MQTTVersion;
313 	pack->header.byte = aHeader;
314 	if (enddata - curdata < 2)  /* Is there enough data to read the msgid? */
315 	{
316 		free(pack);
317 		pack = NULL;
318 		goto exit;
319 	}
320 
321 	pack->msgId = readInt(&curdata);
322 	if (MQTTVersion >= MQTTVERSION_5)
323 	{
324 		MQTTProperties props = MQTTProperties_initializer;
325 		pack->properties = props;
326 		if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
327 		{
328 			if (pack->properties.array)
329 				free(pack->properties.array);
330 			if (pack)
331 				free(pack);
332 			pack = NULL; /* signal protocol error */
333 			goto exit;
334 		}
335 	}
336 	pack->qoss = ListInitialize();
337 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
338 	if (pack->qoss == NULL)
339 	{
340 		if (pack->properties.array != NULL)
341 			free(pack->properties.array);
342 		if (pack != NULL)
343 			free(pack);
344 		pack = NULL; /* signal protocol error */
345 		goto exit;
346 
347 	}
348 #endif
349 	while ((size_t)(curdata - data) < datalen)
350 	{
351 		unsigned int* newint;
352 		newint = malloc(sizeof(unsigned int));
353 		if (newint == NULL)
354 		{
355 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
356 			if (pack->qoss != NULL)
357 				ListFree(pack->qoss);
358 #endif
359 			if (pack->properties.array)
360 				free(pack->properties.array);
361 			if (pack)
362 				free(pack);
363 			pack = NULL; /* signal protocol error */
364 			goto exit;
365 		}
366 		*newint = (unsigned int)readChar(&curdata);
367 		ListAppend(pack->qoss, newint, sizeof(unsigned int));
368 	}
369 	if (pack->qoss->count == 0)
370 	{
371 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
372 		ListFree(pack->qoss);
373 		if (pack->properties.array)
374 			free(pack->properties.array);
375 		if (pack)
376 			free(pack);
377 #else
378 		if (pack->properties.array)
379 			free(pack->properties.array);
380 		ListFree(pack->qoss);
381 		free(pack);
382 #endif
383 		pack = NULL;
384 	}
385 exit:
386 	FUNC_EXIT;
387 	return pack;
388 }
389 
390 
391 /**
392  * Send an MQTT unsubscribe packet down a socket.
393  * @param topics list of topics
394  * @param msgid the MQTT message id to use
395  * @param dup boolean - whether to set the MQTT DUP flag
396  * @param socket the open socket to send the data to
397  * @param clientID the string client identifier, only used for tracing
398  * @return the completion code (e.g. TCPSOCKET_COMPLETE)
399  */
MQTTPacket_send_unsubscribe(List * topics,MQTTProperties * props,int msgid,int dup,Clients * client)400 int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client)
401 {
402 	Header header;
403 	char *data, *ptr;
404 	int rc = SOCKET_ERROR;
405 	ListElement *elem = NULL;
406 	int datalen;
407 
408 	FUNC_ENTRY;
409 	header.bits.type = UNSUBSCRIBE;
410 	header.bits.dup = dup;
411 	header.bits.qos = 1;
412 	header.bits.retain = 0;
413 
414 	datalen = 2 + topics->count * 2; /* utf length == 2 */
415 	while (ListNextElement(topics, &elem))
416 		datalen += (int)strlen((char*)(elem->content));
417 	if (client->MQTTVersion >= MQTTVERSION_5)
418 		datalen += MQTTProperties_len(props);
419 	ptr = data = malloc(datalen);
420 	if (ptr == NULL)
421 		goto exit;
422 
423 	writeInt(&ptr, msgid);
424 
425 	if (client->MQTTVersion >= MQTTVERSION_5)
426 		MQTTProperties_write(&ptr, props);
427 
428 	elem = NULL;
429 	while (ListNextElement(topics, &elem))
430 		writeUTF(&ptr, (char*)(elem->content));
431 	rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
432 	Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
433 	if (rc != TCPSOCKET_INTERRUPTED)
434 		free(data);
435 exit:
436 	FUNC_EXIT_RC(rc);
437 	return rc;
438 }
439 
440 
441 /**
442  * Function used in the new packets table to create unsuback packets.
443  * @param MQTTVersion the version of MQTT
444  * @param aHeader the MQTT header byte
445  * @param data the rest of the packet
446  * @param datalen the length of the rest of the packet
447  * @return pointer to the packet structure
448  */
MQTTPacket_unsuback(int MQTTVersion,unsigned char aHeader,char * data,size_t datalen)449 void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
450 {
451 	Unsuback* pack = NULL;
452 	char* curdata = data;
453 	char* enddata = &data[datalen];
454 
455 	FUNC_ENTRY;
456 	if ((pack = malloc(sizeof(Unsuback))) == NULL)
457 		goto exit;
458 	pack->MQTTVersion = MQTTVersion;
459 	pack->header.byte = aHeader;
460 	if (enddata - curdata < 2)  /* Is there enough data? */
461 	{
462 		free(pack);
463 		pack = NULL;
464 		goto exit;
465 	}
466 	pack->msgId = readInt(&curdata);
467 	pack->reasonCodes = NULL;
468 	if (MQTTVersion >= MQTTVERSION_5)
469 	{
470 		MQTTProperties props = MQTTProperties_initializer;
471 		pack->properties = props;
472 		if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
473 		{
474 			if (pack->properties.array)
475 				free(pack->properties.array);
476 			if (pack)
477 				free(pack);
478 			pack = NULL; /* signal protocol error */
479 			goto exit;
480 		}
481 		pack->reasonCodes = ListInitialize();
482 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
483 		if (pack->reasonCodes == NULL)
484 		{
485 			if (pack->properties.array != NULL)
486 				free(pack->properties.array);
487 			if (pack != NULL)
488 				free(pack);
489 			pack = NULL; /* signal protocol error */
490 			goto exit;
491 		}
492 #endif
493 		while ((size_t)(curdata - data) < datalen)
494 		{
495 			enum MQTTReasonCodes* newrc;
496 			newrc = malloc(sizeof(enum MQTTReasonCodes));
497 			if (newrc == NULL)
498 			{
499 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
500 				if (pack->reasonCodes != NULL)
501 					ListFree(pack->reasonCodes);
502 #endif
503 				if (pack->properties.array)
504 					free(pack->properties.array);
505 				if (pack)
506 					free(pack);
507 				pack = NULL; /* signal protocol error */
508 				goto exit;
509 			}
510 			*newrc = (enum MQTTReasonCodes)readChar(&curdata);
511 			ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
512 		}
513 		if (pack->reasonCodes->count == 0)
514 		{
515 			ListFree(pack->reasonCodes);
516 			if (pack->properties.array)
517 				free(pack->properties.array);
518 			if (pack)
519 				free(pack);
520 			pack = NULL;
521 		}
522 	}
523 exit:
524 	FUNC_EXIT;
525 	return pack;
526 }
527