• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp. and Ian Craggs
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs - fix for bug 413429 - connectionLost not called
16  *    Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
17  *    Rong Xiang, Ian Craggs - C++ compatibility
18  *    Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
19  *    Ian Craggs - ensure that acks are not sent if write is outstanding on socket
20  *    Ian Craggs - MQTT 5.0 support
21  *******************************************************************************/
22 
23 /**
24  * @file
25  * \brief Functions dealing with the MQTT protocol exchanges
26  *
27  * Some other related functions are in the MQTTProtocolOut module
28  * */
29 
30 
31 #include <stdlib.h>
32 #include <string.h>
33 #include <stdint.h>
34 
35 #include "MQTTProtocolClient.h"
36 #if !defined(NO_PERSISTENCE)
37 #include "MQTTPersistence.h"
38 #endif
39 #include "Socket.h"
40 #include "SocketBuffer.h"
41 #include "StackTrace.h"
42 #include "Heap.h"
43 
44 #if !defined(min)
45 #define min(A,B) ( (A) < (B) ? (A):(B))
46 #endif
47 
48 extern MQTTProtocol state;
49 extern ClientStates* bstate;
50 
51 static void MQTTProtocol_storeQoS0(Clients* pubclient, Publish* publish);
52 static int MQTTProtocol_startPublishCommon(
53 		Clients* pubclient,
54 		Publish* publish,
55 		int qos,
56 		int retained);
57 static void MQTTProtocol_retries(START_TIME_TYPE now, Clients* client, int regardless);
58 
59 static int MQTTProtocol_queueAck(Clients* client, int ackType, int msgId);
60 
/*
 * Pairs a message id with an acknowledgement packet type.
 * NOTE(review): presumably the record stored by MQTTProtocol_queueAck, whose
 * definition is outside this part of the file - confirm.
 */
typedef struct {
	int messageId;	/* MQTT message id the acknowledgement refers to */
	int ackType;	/* acknowledgement packet type; presumably one of PUBACK, PUBREC, PUBREL, PUBCOMP */
} AckRequest;
65 
66 
67 /**
68  * List callback function for comparing Message structures by message id
69  * @param a first integer value
70  * @param b second integer value
71  * @return boolean indicating whether a and b are equal
72  */
messageIDCompare(void * a,void * b)73 int messageIDCompare(void* a, void* b)
74 {
75 	Messages* msg = (Messages*)a;
76 	return msg->msgid == *(int*)b;
77 }
78 
79 
80 /**
81  * Assign a new message id for a client.  Make sure it isn't already being used and does
82  * not exceed the maximum.
83  * @param client a client structure
84  * @return the next message id to use, or 0 if none available
85  */
MQTTProtocol_assignMsgId(Clients * client)86 int MQTTProtocol_assignMsgId(Clients* client)
87 {
88 	int start_msgid = client->msgID;
89 	int msgid = start_msgid;
90 
91 	FUNC_ENTRY;
92 	msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
93 	while (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
94 	{
95 		msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
96 		if (msgid == start_msgid)
97 		{ /* we've tried them all - none free */
98 			msgid = 0;
99 			break;
100 		}
101 	}
102 	if (msgid != 0)
103 		client->msgID = msgid;
104 	FUNC_EXIT_RC(msgid);
105 	return msgid;
106 }
107 
108 
/**
 * Keep hold of a QoS 0 publication until its socket write has finished.
 *
 * Stores the publication, records it in state.pending_writes against the client's
 * socket, and points the socket buffer at the stored topic and payload.
 * If any allocation fails the function returns without recording a pending write.
 * @param pubclient the client the publication is being sent to
 * @param publish the publication data; its topic and payload pointers are set to NULL
 * once the stored copy has taken them over
 */
static void MQTTProtocol_storeQoS0(Clients* pubclient, Publish* publish)
{
	int len;
	pending_write* pw = NULL;

	FUNC_ENTRY;
	/* store the publication until the write is finished */
	if ((pw = malloc(sizeof(pending_write))) == NULL)
		goto exit;
	Log(TRACE_MIN, 12, NULL);
	if ((pw->p = MQTTProtocol_storePublication(publish, &len)) == NULL)
	{
		free(pw);
		goto exit;
	}
	pw->socket = pubclient->net.socket;
	if (!ListAppend(&(state.pending_writes), pw, sizeof(pending_write)+len))
	{
		/* MQTTProtocol_storePublication linked pw->p into state.publications, so it must be
		 * taken off that list rather than freed directly, otherwise the list keeps a
		 * dangling entry.  ListRemove is what the rest of this file uses to discard a
		 * stored publication record. */
		ListRemove(&(state.publications), pw->p);
		free(pw);
		goto exit;
	}
	/* we don't copy QoS 0 messages unless we have to, so now we have to tell the socket buffer where
	the saved copy is */
	if (SocketBuffer_updateWrite(pw->socket, pw->p->topic, pw->p->payload) == NULL)
		Log(LOG_SEVERE, 0, "Error updating write");
	publish->payload = publish->topic = NULL;
exit:
	FUNC_EXIT;
}
139 
140 
141 /**
142  * Utility function to start a new publish exchange.
143  * @param pubclient the client to send the publication to
144  * @param publish the publication data
145  * @param qos the MQTT QoS to use
146  * @param retained boolean - whether to set the MQTT retained flag
147  * @return the completion code
148  */
MQTTProtocol_startPublishCommon(Clients * pubclient,Publish * publish,int qos,int retained)149 static int MQTTProtocol_startPublishCommon(Clients* pubclient, Publish* publish, int qos, int retained)
150 {
151 	int rc = TCPSOCKET_COMPLETE;
152 
153 	FUNC_ENTRY;
154 	rc = MQTTPacket_send_publish(publish, 0, qos, retained, &pubclient->net, pubclient->clientID);
155 	if (qos == 0 && rc == TCPSOCKET_INTERRUPTED)
156 		MQTTProtocol_storeQoS0(pubclient, publish);
157 	FUNC_EXIT_RC(rc);
158 	return rc;
159 }
160 
161 
162 /**
163  * Start a new publish exchange.  Store any state necessary and try to send the packet
164  * @param pubclient the client to send the publication to
165  * @param publish the publication data
166  * @param qos the MQTT QoS to use
167  * @param retained boolean - whether to set the MQTT retained flag
168  * @param mm - pointer to the message to send
169  * @return the completion code
170  */
MQTTProtocol_startPublish(Clients * pubclient,Publish * publish,int qos,int retained,Messages ** mm)171 int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int retained, Messages** mm)
172 {
173 	Publish qos12pub = *publish;
174 	int rc = 0;
175 
176 	FUNC_ENTRY;
177 	if (qos > 0)
178 	{
179 		*mm = MQTTProtocol_createMessage(publish, mm, qos, retained, 0);
180 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
181 		if (*mm != NULL)
182 		{
183 #endif
184 			ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
185 			/* we change these pointers to the saved message location just in case the packet could not be written
186 			entirely; the socket buffer will use these locations to finish writing the packet */
187 			qos12pub.payload = (*mm)->publish->payload;
188 			qos12pub.topic = (*mm)->publish->topic;
189 			qos12pub.properties = (*mm)->properties;
190 			qos12pub.MQTTVersion = (*mm)->MQTTVersion;
191 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
192 		}
193 		else
194 		{
195 			rc = PAHO_MEMORY_ERROR;
196 			goto exit;
197 		}
198 #endif
199 		publish = &qos12pub;
200 	}
201 	rc = MQTTProtocol_startPublishCommon(pubclient, publish, qos, retained);
202 	if (qos > 0)
203 		memcpy((*mm)->publish->mask, publish->mask, sizeof((*mm)->publish->mask));
204 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
205 exit:
206 #endif
207 	FUNC_EXIT_RC(rc);
208 	return rc;
209 }
210 
211 
212 /**
213  * Copy and store message data for retries
214  * @param publish the publication data
215  * @param mm - pointer to the message data to store
216  * @param qos the MQTT QoS to use
217  * @param retained boolean - whether to set the MQTT retained flag
218  * @param allocatePayload boolean - whether or not to malloc payload
219  * @return pointer to the message data stored
220  */
MQTTProtocol_createMessage(Publish * publish,Messages ** mm,int qos,int retained,int allocatePayload)221 Messages* MQTTProtocol_createMessage(Publish* publish, Messages **mm, int qos, int retained, int allocatePayload)
222 {
223 	Messages* m = malloc(sizeof(Messages));
224 
225 	FUNC_ENTRY;
226 	if (!m)
227 		goto exit;
228 	m->len = sizeof(Messages);
229 	if (*mm == NULL || (*mm)->publish == NULL)
230 	{
231 		int len1;
232 		*mm = m;
233 		if ((m->publish = MQTTProtocol_storePublication(publish, &len1)) == NULL)
234 		{
235 			free(m);
236 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
237 			m = NULL;
238 #endif
239 			goto exit;
240 		}
241 		m->len += len1;
242 		if (allocatePayload)
243 		{
244 			char *temp = m->publish->payload;
245 
246 			if ((m->publish->payload = malloc(m->publish->payloadlen)) == NULL)
247 			{
248 				free(m);
249 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
250 				m = NULL;
251 #endif
252 				goto exit;
253 			}
254 			memcpy(m->publish->payload, temp, m->publish->payloadlen);
255 		}
256 	}
257 	else /* this is now never used, I think */
258 	{
259 		++(((*mm)->publish)->refcount);
260 		m->publish = (*mm)->publish;
261 	}
262 	m->msgid = publish->msgId;
263 	m->qos = qos;
264 	m->retain = retained;
265 	m->MQTTVersion = publish->MQTTVersion;
266 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
267 	m->retryTime = 0;
268 #endif
269 	if (m->MQTTVersion >= 5)
270 		m->properties = MQTTProperties_copy(&publish->properties);
271 	m->lastTouch = MQTTTime_now();
272 	if (qos == 2)
273 		m->nextMessageType = PUBREC;
274 exit:
275 	FUNC_EXIT;
276 	return m;
277 }
278 
279 
280 /**
281  * Store message data for possible retry
282  * @param publish the publication data
283  * @param len returned length of the data stored
284  * @return the publication stored
285  */
MQTTProtocol_storePublication(Publish * publish,int * len)286 Publications* MQTTProtocol_storePublication(Publish* publish, int* len)
287 {
288 	Publications* p = malloc(sizeof(Publications));
289 
290 	FUNC_ENTRY;
291 	if (!p)
292 		goto exit;
293 	p->refcount = 1;
294 	*len = (int)strlen(publish->topic)+1;
295 	p->topic = publish->topic;
296 	publish->topic = NULL;
297 	*len += sizeof(Publications);
298 	p->topiclen = publish->topiclen;
299 	p->payloadlen = publish->payloadlen;
300 	p->payload = publish->payload;
301 	publish->payload = NULL;
302 	*len += publish->payloadlen;
303 	memcpy(p->mask, publish->mask, sizeof(p->mask));
304 
305 	if ((ListAppend(&(state.publications), p, *len)) == NULL)
306 	{
307 		free(p);
308 		p = NULL;
309 	}
310 exit:
311 	FUNC_EXIT;
312 	return p;
313 }
314 
315 /**
316  * Remove stored message data.  Opposite of storePublication
317  * @param p stored publication to remove
318  */
MQTTProtocol_removePublication(Publications * p)319 void MQTTProtocol_removePublication(Publications* p)
320 {
321 	FUNC_ENTRY;
322 	if (p && --(p->refcount) == 0)
323 	{
324 		if (p->payload)
325 		{
326 			free(p->payload);
327 			p->payload = NULL;
328 		}
329 		if (p->topic)
330 		{
331 			free(p->topic);
332 			p->topic = NULL;
333 		}
334 		ListRemove(&(state.publications), p);
335 	}
336 	FUNC_EXIT;
337 }
338 
339 /**
340  * Process an incoming publish packet for a socket
341  * The payload field of the packet has not been transferred to another buffer at this point.
342  * If it's needed beyond the scope of this function, it has to be copied.
343  * @param pack pointer to the publish packet
344  * @param sock the socket on which the packet was received
345  * @return completion code
346  */
MQTTProtocol_handlePublishes(void * pack,SOCKET sock)347 int MQTTProtocol_handlePublishes(void* pack, SOCKET sock)
348 {
349 	Publish* publish = (Publish*)pack;
350 	Clients* client = NULL;
351 	char* clientid = NULL;
352 	int rc = TCPSOCKET_COMPLETE;
353 	int socketHasPendingWrites = 0;
354 
355 	FUNC_ENTRY;
356 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
357 	clientid = client->clientID;
358 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
359 	Log(LOG_PROTOCOL, 11, NULL, sock, clientid, publish->msgId);
360 #else
361 	Log(LOG_PROTOCOL, 11, NULL, sock, clientid, publish->msgId, publish->header.bits.qos,
362 					publish->header.bits.retain, publish->payloadlen, min(20, publish->payloadlen), publish->payload);
363 #endif
364 
365 	if (publish->header.bits.qos == 0)
366 	{
367 		Protocol_processPublication(publish, client, 1);
368 		goto exit;
369 	}
370 
371 	socketHasPendingWrites = !Socket_noPendingWrites(sock);
372 
373 	if (publish->header.bits.qos == 1)
374 	{
375 		Protocol_processPublication(publish, client, 1);
376 
377 		if (socketHasPendingWrites)
378 			rc = MQTTProtocol_queueAck(client, PUBACK, publish->msgId);
379 		else
380 			rc = MQTTPacket_send_puback(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
381 	}
382 	else if (publish->header.bits.qos == 2)
383 	{
384 		/* store publication in inbound list */
385 		int len;
386 		int already_received = 0;
387 		ListElement* listElem = NULL;
388 		Messages* m = malloc(sizeof(Messages));
389 		Publications* p = NULL;
390 		if (!m)
391 		{
392 			rc = PAHO_MEMORY_ERROR;
393 			goto exit;
394 		}
395 		p = MQTTProtocol_storePublication(publish, &len);
396 
397 		m->publish = p;
398 		m->msgid = publish->msgId;
399 		m->qos = publish->header.bits.qos;
400 		m->retain = publish->header.bits.retain;
401 		m->MQTTVersion = publish->MQTTVersion;
402 		if (m->MQTTVersion >= MQTTVERSION_5)
403 			m->properties = MQTTProperties_copy(&publish->properties);
404 		m->nextMessageType = PUBREL;
405 		if ((listElem = ListFindItem(client->inboundMsgs, &(m->msgid), messageIDCompare)) != NULL)
406 		{   /* discard queued publication with same msgID that the current incoming message */
407 			Messages* msg = (Messages*)(listElem->content);
408 			MQTTProtocol_removePublication(msg->publish);
409 			if (msg->MQTTVersion >= MQTTVERSION_5)
410 				MQTTProperties_free(&msg->properties);
411 			ListInsert(client->inboundMsgs, m, sizeof(Messages) + len, listElem);
412 			ListRemove(client->inboundMsgs, msg);
413 			already_received = 1;
414 		} else
415 			ListAppend(client->inboundMsgs, m, sizeof(Messages) + len);
416 
417 		if (m->MQTTVersion >= MQTTVERSION_5 && already_received == 0)
418 		{
419 			Publish publish1;
420 
421 			publish1.header.bits.qos = m->qos;
422 			publish1.header.bits.retain = m->retain;
423 			publish1.msgId = m->msgid;
424 			publish1.topic = m->publish->topic;
425 			publish1.topiclen = m->publish->topiclen;
426 			publish1.payload = m->publish->payload;
427 			publish1.payloadlen = m->publish->payloadlen;
428 			publish1.MQTTVersion = m->MQTTVersion;
429 			publish1.properties = m->properties;
430 
431 			Protocol_processPublication(&publish1, client, 1);
432 			ListRemove(&(state.publications), m->publish);
433 			m->publish = NULL;
434 		} else
435 		{	/* allocate and copy payload data as it's needed for pubrel.
436 		       For other cases, it's done in Protocol_processPublication */
437 			char *temp = m->publish->payload;
438 
439 			if ((m->publish->payload = malloc(m->publish->payloadlen)) == NULL)
440 			{
441 				rc = PAHO_MEMORY_ERROR;
442 				goto exit;
443 			}
444 			memcpy(m->publish->payload, temp, m->publish->payloadlen);
445 		}
446 		if (socketHasPendingWrites)
447 			rc = MQTTProtocol_queueAck(client, PUBREC, publish->msgId);
448 		else
449 			rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
450 		publish->topic = NULL;
451 	}
452 exit:
453 	MQTTPacket_freePublish(publish);
454 	FUNC_EXIT_RC(rc);
455 	return rc;
456 }
457 
458 /**
459  * Process an incoming puback packet for a socket
460  * @param pack pointer to the publish packet
461  * @param sock the socket on which the packet was received
462  * @return completion code
463  */
MQTTProtocol_handlePubacks(void * pack,SOCKET sock,Publications ** pubToRemove)464 int MQTTProtocol_handlePubacks(void* pack, SOCKET sock, Publications** pubToRemove)
465 {
466 	Puback* puback = (Puback*)pack;
467 	Clients* client = NULL;
468 	int rc = TCPSOCKET_COMPLETE;
469 
470 	FUNC_ENTRY;
471 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
472 	Log(LOG_PROTOCOL, 14, NULL, sock, client->clientID, puback->msgId);
473 
474 	/* look for the message by message id in the records of outbound messages for this client */
475 	if (ListFindItem(client->outboundMsgs, &(puback->msgId), messageIDCompare) == NULL)
476 		Log(TRACE_MIN, 3, NULL, "PUBACK", client->clientID, puback->msgId);
477 	else
478 	{
479 		Messages* m = (Messages*)(client->outboundMsgs->current->content);
480 		if (m->qos != 1)
481 			Log(TRACE_MIN, 4, NULL, "PUBACK", client->clientID, puback->msgId, m->qos);
482 		else
483 		{
484 			Log(TRACE_MIN, 6, NULL, "PUBACK", client->clientID, puback->msgId);
485 			#if !defined(NO_PERSISTENCE)
486 				rc = MQTTPersistence_remove(client,
487 						(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
488 								m->qos, puback->msgId);
489 			#endif
490 			if (pubToRemove != NULL)
491 				*pubToRemove = m->publish;
492 			else
493 				MQTTProtocol_removePublication(m->publish);
494 			if (m->MQTTVersion >= MQTTVERSION_5)
495 				MQTTProperties_free(&m->properties);
496 			ListRemove(client->outboundMsgs, m);
497 		}
498 	}
499 	if (puback->MQTTVersion >= MQTTVERSION_5)
500 		MQTTProperties_free(&puback->properties);
501 	free(pack);
502 	FUNC_EXIT_RC(rc);
503 	return rc;
504 }
505 
506 
507 /**
508  * Process an incoming pubrec packet for a socket
509  * @param pack pointer to the publish packet
510  * @param sock the socket on which the packet was received
511  * @return completion code
512  */
MQTTProtocol_handlePubrecs(void * pack,SOCKET sock,Publications ** pubToRemove)513 int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock, Publications** pubToRemove)
514 {
515 	Pubrec* pubrec = (Pubrec*)pack;
516 	Clients* client = NULL;
517 	int rc = TCPSOCKET_COMPLETE;
518 	int send_pubrel = 1; /* boolean to send PUBREL or not */
519 
520 	FUNC_ENTRY;
521 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
522 	Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId);
523 
524 	/* look for the message by message id in the records of outbound messages for this client */
525 	client->outboundMsgs->current = NULL;
526 	if (ListFindItem(client->outboundMsgs, &(pubrec->msgId), messageIDCompare) == NULL)
527 	{
528 		if (pubrec->header.bits.dup == 0)
529 			Log(TRACE_MIN, 3, NULL, "PUBREC", client->clientID, pubrec->msgId);
530 	}
531 	else
532 	{
533 		Messages* m = (Messages*)(client->outboundMsgs->current->content);
534 		if (m->qos != 2)
535 		{
536 			if (pubrec->header.bits.dup == 0)
537 				Log(TRACE_MIN, 4, NULL, "PUBREC", client->clientID, pubrec->msgId, m->qos);
538 		}
539 		else if (m->nextMessageType != PUBREC)
540 		{
541 			if (pubrec->header.bits.dup == 0)
542 				Log(TRACE_MIN, 5, NULL, "PUBREC", client->clientID, pubrec->msgId);
543 		}
544 		else
545 		{
546 			if (pubrec->MQTTVersion >= MQTTVERSION_5 && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
547 			{
548 				Log(TRACE_MIN, -1, "Pubrec error %d received for client %s msgid %d, not sending PUBREL",
549 						pubrec->rc, client->clientID, pubrec->msgId);
550 				#if !defined(NO_PERSISTENCE)
551 					rc = MQTTPersistence_remove(client,
552 							(pubrec->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
553 							m->qos, pubrec->msgId);
554 				#endif
555 				if (pubToRemove != NULL)
556 					*pubToRemove = m->publish;
557 				else
558 					MQTTProtocol_removePublication(m->publish);
559 				if (m->MQTTVersion >= MQTTVERSION_5)
560 					MQTTProperties_free(&m->properties);
561 				ListRemove(client->outboundMsgs, m);
562 				(++state.msgs_sent);
563 				send_pubrel = 0; /* in MQTT v5, stop the exchange if there is an error reported */
564 			}
565 			else
566 			{
567 				m->nextMessageType = PUBCOMP;
568 				m->lastTouch = MQTTTime_now();
569 			}
570 		}
571 	}
572 	if (!send_pubrel)
573 		; /* only don't send ack on MQTT v5 PUBREC error, otherwise send ack under all circumstances because MQTT state can get out of step */
574 	else if (!Socket_noPendingWrites(sock))
575 		rc = MQTTProtocol_queueAck(client, PUBREL, pubrec->msgId);
576 	else
577 		rc = MQTTPacket_send_pubrel(pubrec->MQTTVersion, pubrec->msgId, 0, &client->net, client->clientID);
578 
579 	if (pubrec->MQTTVersion >= MQTTVERSION_5)
580 		MQTTProperties_free(&pubrec->properties);
581 	free(pack);
582 	FUNC_EXIT_RC(rc);
583 	return rc;
584 }
585 
586 
587 /**
588  * Process an incoming pubrel packet for a socket
589  * @param pack pointer to the publish packet
590  * @param sock the socket on which the packet was received
591  * @return completion code
592  */
MQTTProtocol_handlePubrels(void * pack,SOCKET sock)593 int MQTTProtocol_handlePubrels(void* pack, SOCKET sock)
594 {
595 	Pubrel* pubrel = (Pubrel*)pack;
596 	Clients* client = NULL;
597 	int rc = TCPSOCKET_COMPLETE;
598 
599 	FUNC_ENTRY;
600 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
601 	Log(LOG_PROTOCOL, 17, NULL, sock, client->clientID, pubrel->msgId);
602 
603 	/* look for the message by message id in the records of inbound messages for this client */
604 	if (ListFindItem(client->inboundMsgs, &(pubrel->msgId), messageIDCompare) == NULL)
605 	{
606 		if (pubrel->header.bits.dup == 0)
607 			Log(TRACE_MIN, 3, NULL, "PUBREL", client->clientID, pubrel->msgId);
608 	}
609 	else
610 	{
611 		Messages* m = (Messages*)(client->inboundMsgs->current->content);
612 		if (m->qos != 2)
613 			Log(TRACE_MIN, 4, NULL, "PUBREL", client->clientID, pubrel->msgId, m->qos);
614 		else if (m->nextMessageType != PUBREL)
615 			Log(TRACE_MIN, 5, NULL, "PUBREL", client->clientID, pubrel->msgId);
616 		else
617 		{
618 			Publish publish;
619 
620 			memset(&publish, '\0', sizeof(publish));
621 
622 			publish.header.bits.qos = m->qos;
623 			publish.header.bits.retain = m->retain;
624 			publish.msgId = m->msgid;
625 			if (m->publish)
626 			{
627 				publish.topic = m->publish->topic;
628 				publish.topiclen = m->publish->topiclen;
629 				publish.payload = m->publish->payload;
630 				publish.payloadlen = m->publish->payloadlen;
631 			}
632 			publish.MQTTVersion = m->MQTTVersion;
633 			if (publish.MQTTVersion >= MQTTVERSION_5)
634 				publish.properties = m->properties;
635 			else
636 				Protocol_processPublication(&publish, client, 0); /* only for 3.1.1 and lower */
637 			#if !defined(NO_PERSISTENCE)
638 			rc += MQTTPersistence_remove(client,
639 					(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_RECEIVED : PERSISTENCE_PUBLISH_RECEIVED,
640 					m->qos, pubrel->msgId);
641 			#endif
642 			if (m->MQTTVersion >= MQTTVERSION_5)
643 				MQTTProperties_free(&m->properties);
644 			if (m->publish)
645 				ListRemove(&(state.publications), m->publish);
646 			ListRemove(client->inboundMsgs, m);
647 			++(state.msgs_received);
648 		}
649 	}
650 	/* Send ack under all circumstances because MQTT state can get out of step - this standard also says to do this */
651 	if (!Socket_noPendingWrites(sock))
652 		rc = MQTTProtocol_queueAck(client, PUBCOMP, pubrel->msgId);
653 	else
654 		rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID);
655 
656 	if (pubrel->MQTTVersion >= MQTTVERSION_5)
657 		MQTTProperties_free(&pubrel->properties);
658 	free(pack);
659 	FUNC_EXIT_RC(rc);
660 	return rc;
661 }
662 
663 
664 /**
665  * Process an incoming pubcomp packet for a socket
666  * @param pack pointer to the publish packet
667  * @param sock the socket on which the packet was received
668  * @return completion code
669  */
MQTTProtocol_handlePubcomps(void * pack,SOCKET sock,Publications ** pubToRemove)670 int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock, Publications** pubToRemove)
671 {
672 	Pubcomp* pubcomp = (Pubcomp*)pack;
673 	Clients* client = NULL;
674 	int rc = TCPSOCKET_COMPLETE;
675 
676 	FUNC_ENTRY;
677 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
678 	Log(LOG_PROTOCOL, 19, NULL, sock, client->clientID, pubcomp->msgId);
679 
680 	/* look for the message by message id in the records of outbound messages for this client */
681 	if (ListFindItem(client->outboundMsgs, &(pubcomp->msgId), messageIDCompare) == NULL)
682 	{
683 		if (pubcomp->header.bits.dup == 0)
684 			Log(TRACE_MIN, 3, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
685 	}
686 	else
687 	{
688 		Messages* m = (Messages*)(client->outboundMsgs->current->content);
689 		if (m->qos != 2)
690 			Log(TRACE_MIN, 4, NULL, "PUBCOMP", client->clientID, pubcomp->msgId, m->qos);
691 		else
692 		{
693 			if (m->nextMessageType != PUBCOMP)
694 				Log(TRACE_MIN, 5, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
695 			else
696 			{
697 				Log(TRACE_MIN, 6, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
698 				#if !defined(NO_PERSISTENCE)
699 					rc = MQTTPersistence_remove(client,
700 							(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
701 							m->qos, pubcomp->msgId);
702 					if (rc != 0)
703 						Log(LOG_ERROR, -1, "Error removing PUBCOMP for client id %s msgid %d from persistence", client->clientID, pubcomp->msgId);
704 				#endif
705 				if (pubToRemove != NULL)
706 					*pubToRemove = m->publish;
707 				else
708 					MQTTProtocol_removePublication(m->publish);
709 				if (m->MQTTVersion >= MQTTVERSION_5)
710 					MQTTProperties_free(&m->properties);
711 				ListRemove(client->outboundMsgs, m);
712 				(++state.msgs_sent);
713 			}
714 		}
715 	}
716 	if (pubcomp->MQTTVersion >= MQTTVERSION_5)
717 		MQTTProperties_free(&pubcomp->properties);
718 	free(pack);
719 	FUNC_EXIT_RC(rc);
720 	return rc;
721 }
722 
723 
724 /**
725  * MQTT protocol keepAlive processing.  Sends PINGREQ packets as required.
726  * @param now current time
727  */
MQTTProtocol_keepalive(START_TIME_TYPE now)728 void MQTTProtocol_keepalive(START_TIME_TYPE now)
729 {
730 	ListElement* current = NULL;
731 
732 	FUNC_ENTRY;
733 	ListNextElement(bstate->clients, &current);
734 	while (current)
735 	{
736 		Clients* client =	(Clients*)(current->content);
737 		ListNextElement(bstate->clients, &current);
738 
739 		if (client->connected == 0 || client->keepAliveInterval == 0)
740 			continue;
741 
742 		if (client->ping_outstanding == 1)
743 		{
744 			if (MQTTTime_difftime(now, client->net.lastPing) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1500) &&
745 				/* if last received is more recent, we could be receiving a large packet */
746 				MQTTTime_difftime(now, client->net.lastReceived) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1500))
747 			{
748 				Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
749 				MQTTProtocol_closeSession(client, 1);
750 			}
751 		}
752 		else if (client->ping_due == 1 &&
753 			(MQTTTime_difftime(now, client->ping_due_time) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1500)))
754 		{
755 			/* if the last received time is more recent than the ping due time, we could be receiving a large packet,
756 			 * preventing the PINGRESP being received */
757 			if (MQTTTime_difftime(now, client->ping_due_time) <= MQTTTime_difftime(now, client->net.lastReceived))
758 			{
759 				/* ping still outstanding after keep alive interval, so close session */
760 				Log(TRACE_PROTOCOL, -1, "PINGREQ still outstanding for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
761 				MQTTProtocol_closeSession(client, 1);
762 			}
763 		}
764 		else if (MQTTTime_difftime(now, client->net.lastSent) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
765 		/* the time since we last sent a packet, or part of a packet has exceeded the keep alive, so we need to send a ping */
766 		{
767 			if (Socket_noPendingWrites(client->net.socket))
768 			{
769 				if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE)
770 				{
771 					Log(TRACE_PROTOCOL, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
772 					MQTTProtocol_closeSession(client, 1);
773 				}
774 				else
775 				{
776 					client->ping_due = 0;
777 					client->net.lastPing = now;
778 					client->ping_outstanding = 1;
779 				}
780 			}
781 			else if (client->ping_due == 0)
782 			{
783 				Log(TRACE_PROTOCOL, -1, "Couldn't send PINGREQ for client %s on socket %d, noting",
784 						client->clientID, client->net.socket);
785 				client->ping_due = 1;
786 				client->ping_due_time = now;
787 			}
788 		}
789 		else if (MQTTTime_difftime(now, client->net.lastReceived) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
790 		/* the time since we last received any data has exceeded the keep alive, so we can send a ping to see if the server is alive */
791 		{
792 			/* Check that no writes are pending for the socket. If there are, forget about it, as this PING use is optional */
793 			if (Socket_noPendingWrites(client->net.socket))
794 			{
795 				if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE)
796 				{
797 					Log(TRACE_PROTOCOL, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
798 					MQTTProtocol_closeSession(client, 1);
799 				}
800 				else
801 				{
802 					client->ping_due = 0;
803 					client->net.lastPing = now;
804 					client->ping_outstanding = 1;
805 				}
806 			}
807 		}
808 	}
809 	FUNC_EXIT;
810 }
811 
812 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
813 #define RETRYTIME 5
/**
 * Count a retry for an outbound message and give the message up once it has
 * already been retried RETRYTIME times.
 *
 * When the limit is reached the message is removed from persistence (if enabled)
 * and from the client's outbound list, and its id is recorded in client->retryMsgs.
 * Otherwise the message's retry counter is incremented.
 * @param client the client owning the message
 * @param m the outbound message being retried; it must not be used by the caller
 * after a 0 return
 * @return 0 if the retry limit was reached, -1 if the message should be retried
 */
static int MQTTProtocol_release(Clients* client, Messages* m)
{
	int ret = -1;

	if (m->retryTime >= RETRYTIME)
	{
		/* take a copy: m is handed to ListRemove below and must not be read afterwards */
		int msgid = m->msgid;

		if (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
		{
#if !defined(NO_PERSISTENCE)
			int rc = MQTTPersistence_remove(client,
					(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
					m->qos, msgid);

			if (rc != 0)
				Log(LOG_ERROR, -1, "Error removing publish for client id %s msgid %d from persistence", client->clientID, msgid);
#endif
			MQTTProtocol_removePublication(m->publish);
			if (m->MQTTVersion >= MQTTVERSION_5)
				MQTTProperties_free(&m->properties);
			ListRemove(client->outboundMsgs, m);
		}
		client->retryMsgs = msgid;
		ret = 0;
	}
	else
	{
		m->retryTime++;
	}
	return ret;
}
841 #endif
842 
843 /**
844  * MQTT retry processing per client
845  * @param now current time
846  * @param client - the client to which to apply the retry processing
847  * @param regardless boolean - retry packets regardless of retry interval (used on reconnect)
848  */
MQTTProtocol_retries(START_TIME_TYPE now,Clients * client,int regardless)849 static void MQTTProtocol_retries(START_TIME_TYPE now, Clients* client, int regardless)
850 {
851 	ListElement* outcurrent = NULL;
852 
853 	FUNC_ENTRY;
854 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
855 	if (!regardless && client->retryInterval <= 0 && /* 0 or -ive retryInterval turns off retry except on reconnect */
856 			client->connect_sent == client->connect_count)
857 		goto exit;
858 #endif
859 	if (regardless)
860 		client->connect_count = client->outboundMsgs->count; /* remember the number of messages to retry on connect */
861 	else if (client->connect_sent < client->connect_count) /* continue a connect retry which didn't complete first time around */
862 		regardless = 1;
863 
864 	while (client && ListNextElement(client->outboundMsgs, &outcurrent) &&
865 		   client->connected && client->good &&        /* client is connected and has no errors */
866 		   Socket_noPendingWrites(client->net.socket)) /* there aren't any previous packets still stacked up on the socket */
867 	{
868 		Messages* m = (Messages*)(outcurrent->content);
869 		if (regardless || MQTTTime_difftime(now, m->lastTouch) > (DIFF_TIME_TYPE)(max(client->retryInterval, 10) * 1000))
870 		{
871 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
872 			if (MQTTProtocol_release(client, m) == 0)
873 				goto exit;
874 			Log(TRACE_MIN, -1, "%s, %d, msgid[%d]: retryTime = %d", __func__, __LINE__, m->msgid, m->retryTime);
875 #endif
876 			if (regardless)
877 				++client->connect_sent;
878 			if (m->qos == 1 || (m->qos == 2 && m->nextMessageType == PUBREC))
879 			{
880 				Publish publish;
881 				int rc;
882 
883 				Log(TRACE_MIN, 7, NULL, "PUBLISH", client->clientID, client->net.socket, m->msgid);
884 				publish.msgId = m->msgid;
885 				publish.topic = m->publish->topic;
886 				publish.payload = m->publish->payload;
887 				publish.payloadlen = m->publish->payloadlen;
888 				publish.properties = m->properties;
889 				publish.MQTTVersion = m->MQTTVersion;
890 				memcpy(publish.mask, m->publish->mask, sizeof(publish.mask));
891 				rc = MQTTPacket_send_publish(&publish, 1, m->qos, m->retain, &client->net, client->clientID);
892 				memcpy(m->publish->mask, publish.mask, sizeof(m->publish->mask)); /* store websocket mask used in send */
893 				if (rc == SOCKET_ERROR)
894 				{
895 					client->good = 0;
896 					Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
897 												Socket_getpeer(client->net.socket));
898 					MQTTProtocol_closeSession(client, 1);
899 					client = NULL;
900 				}
901 				else
902 				{
903 					if (m->qos == 0 && rc == TCPSOCKET_INTERRUPTED)
904 						MQTTProtocol_storeQoS0(client, &publish);
905 					m->lastTouch = MQTTTime_now();
906 				}
907 			}
908 			else if (m->qos && m->nextMessageType == PUBCOMP)
909 			{
910 				Log(TRACE_MIN, 7, NULL, "PUBREL", client->clientID, client->net.socket, m->msgid);
911 				if (MQTTPacket_send_pubrel(m->MQTTVersion, m->msgid, 0, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
912 				{
913 					client->good = 0;
914 					Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
915 							Socket_getpeer(client->net.socket));
916 					MQTTProtocol_closeSession(client, 1);
917 					client = NULL;
918 				}
919 				else
920 					m->lastTouch = MQTTTime_now();
921 			}
922 		}
923 	}
924 exit:
925 	FUNC_EXIT;
926 }
927 
928 
929 /**
930  * Queue an ack message. This is used when the socket is full (e.g. SSL_ERROR_WANT_WRITE).
931  * To be completed/cleared when the socket is no longer full
932  * @param client the client that received the published message
933  * @param ackType the type of ack to send
934  * @param msgId the msg id of the message we are acknowledging
935  * @return the completion code
936  */
MQTTProtocol_queueAck(Clients * client,int ackType,int msgId)937 int MQTTProtocol_queueAck(Clients* client, int ackType, int msgId)
938 {
939 	int rc = 0;
940 	AckRequest* ackReq = NULL;
941 
942 	FUNC_ENTRY;
943 	ackReq = malloc(sizeof(AckRequest));
944 	if (!ackReq)
945 		rc = PAHO_MEMORY_ERROR;
946 	else
947 	{
948 		ackReq->messageId = msgId;
949 		ackReq->ackType = ackType;
950 		ListAppend(client->outboundQueue, ackReq, sizeof(AckRequest));
951 	}
952 
953 	FUNC_EXIT_RC(rc);
954 	return rc;
955 }
956 
957 
958 /**
959  * MQTT retry protocol and socket pending writes processing.
960  * @param now current time
961  * @param doRetry boolean - retries as well as pending writes?
962  * @param regardless boolean - retry packets regardless of retry interval (used on reconnect)
963  */
MQTTProtocol_retry(START_TIME_TYPE now,int doRetry,int regardless)964 void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
965 {
966 	ListElement* current = NULL;
967 
968 	FUNC_ENTRY;
969 	ListNextElement(bstate->clients, &current);
970 	/* look through the outbound message list of each client, checking to see if a retry is necessary */
971 	while (current)
972 	{
973 		Clients* client = (Clients*)(current->content);
974 		ListNextElement(bstate->clients, &current);
975 		if (client->connected == 0)
976 			continue;
977 		if (client->good == 0)
978 		{
979 			MQTTProtocol_closeSession(client, 1);
980 			continue;
981 		}
982 		if (Socket_noPendingWrites(client->net.socket) == 0)
983 			continue;
984 		if (doRetry)
985 			MQTTProtocol_retries(now, client, regardless);
986 	}
987 	FUNC_EXIT;
988 }
989 
990 
991 /**
992  * Free a client structure
993  * @param client the client data to free
994  */
MQTTProtocol_freeClient(Clients * client)995 void MQTTProtocol_freeClient(Clients* client)
996 {
997 	FUNC_ENTRY;
998 	/* free up pending message lists here, and any other allocated data */
999 	MQTTProtocol_freeMessageList(client->outboundMsgs);
1000 	MQTTProtocol_freeMessageList(client->inboundMsgs);
1001 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1002 	if (client->messageQueue != NULL)
1003 		ListFree(client->messageQueue);
1004 	if (client->outboundQueue != NULL)
1005 		ListFree(client->outboundQueue);
1006 #else
1007 	ListFree(client->messageQueue);
1008 	ListFree(client->outboundQueue);
1009 #endif
1010 
1011 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1012 	if (client->clientID != NULL)
1013 	{
1014 #endif
1015 		free(client->clientID);
1016 		client->clientID = NULL;
1017 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1018 	}
1019 #endif
1020 	if (client->will)
1021 	{
1022 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1023 		if (client->will->payload != NULL)
1024 			free(client->will->payload);
1025 		if (client->will->topic != NULL)
1026 			free(client->will->topic);
1027 #else
1028 		free(client->will->payload);
1029 		free(client->will->topic);
1030 #endif
1031 		free(client->will);
1032     	client->will = NULL;
1033 	}
1034 	if (client->username)
1035 		free((void*)client->username);
1036 	if (client->password)
1037 		free((void*)client->password);
1038 	if (client->httpProxy)
1039 		free(client->httpProxy);
1040 	if (client->httpsProxy)
1041 		free(client->httpsProxy);
1042 	if (client->net.http_proxy_auth)
1043 		free(client->net.http_proxy_auth);
1044 #if defined(OPENSSL) || defined(MBEDTLS)
1045 	if (client->net.https_proxy_auth)
1046 		free(client->net.https_proxy_auth);
1047 	if (client->sslopts)
1048 	{
1049 		if (client->sslopts->trustStore)
1050 			free((void*)client->sslopts->trustStore);
1051 		if (client->sslopts->keyStore)
1052 			free((void*)client->sslopts->keyStore);
1053 		if (client->sslopts->privateKey)
1054 			free((void*)client->sslopts->privateKey);
1055 		if (client->sslopts->privateKeyPassword)
1056 			free((void*)client->sslopts->privateKeyPassword);
1057 		if (client->sslopts->enabledCipherSuites)
1058 			free((void*)client->sslopts->enabledCipherSuites);
1059 		if (client->sslopts->struct_version >= 2)
1060 		{
1061 			if (client->sslopts->CApath)
1062 				free((void*)client->sslopts->CApath);
1063 		}
1064 		if (client->sslopts->struct_version >= 5)
1065 		{
1066 			if (client->sslopts->protos)
1067 				free((void*)client->sslopts->protos);
1068 		}
1069 		free(client->sslopts);
1070 		client->sslopts = NULL;
1071 	}
1072 #endif
1073 	/* don't free the client structure itself... this is done elsewhere */
1074 	FUNC_EXIT;
1075 }
1076 
1077 
1078 /**
1079  * Empty a message list, leaving it able to accept new messages
1080  * @param msgList the message list to empty
1081  */
MQTTProtocol_emptyMessageList(List * msgList)1082 void MQTTProtocol_emptyMessageList(List* msgList)
1083 {
1084 	ListElement* current = NULL;
1085 
1086 	FUNC_ENTRY;
1087 	while (ListNextElement(msgList, &current))
1088 	{
1089 		Messages* m = (Messages*)(current->content);
1090 		MQTTProtocol_removePublication(m->publish);
1091 		if (m->MQTTVersion >= MQTTVERSION_5)
1092 			MQTTProperties_free(&m->properties);
1093 	}
1094 	ListEmpty(msgList);
1095 	FUNC_EXIT;
1096 }
1097 
1098 
1099 /**
1100  * Empty and free up all storage used by a message list
1101  * @param msgList the message list to empty and free
1102  */
MQTTProtocol_freeMessageList(List * msgList)1103 void MQTTProtocol_freeMessageList(List* msgList)
1104 {
1105 	FUNC_ENTRY;
1106 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1107 	if (msgList != NULL)
1108 	{
1109 		MQTTProtocol_emptyMessageList(msgList);
1110 		ListFree(msgList);
1111 	}
1112 #endif
1113 	FUNC_EXIT;
1114 }
1115 
1116 
1117 /**
1118  * Callback that is invoked when the socket is available for writing.
1119  * This is the last attempt made to acknowledge a message. Failures that
1120  * occur here are ignored.
1121  * @param socket the socket that is available for writing
1122  */
MQTTProtocol_writeAvailable(SOCKET socket)1123 void MQTTProtocol_writeAvailable(SOCKET socket)
1124 {
1125 	Clients* client = NULL;
1126 	ListElement* current = NULL;
1127 	int rc = 0;
1128 
1129 	FUNC_ENTRY;
1130 
1131 	client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
1132 
1133 	current = NULL;
1134 	while (ListNextElement(client->outboundQueue, &current) && rc == 0)
1135 	{
1136 		AckRequest* ackReq = (AckRequest*)(current->content);
1137 
1138 		switch (ackReq->ackType)
1139 		{
1140 			case PUBACK:
1141 				rc = MQTTPacket_send_puback(client->MQTTVersion, ackReq->messageId, &client->net, client->clientID);
1142 				break;
1143 			case PUBREC:
1144 				rc = MQTTPacket_send_pubrec(client->MQTTVersion, ackReq->messageId, &client->net, client->clientID);
1145 				break;
1146 			case PUBREL:
1147 				rc = MQTTPacket_send_pubrel(client->MQTTVersion, ackReq->messageId, 0, &client->net, client->clientID);
1148 				break;
1149 			case PUBCOMP:
1150 				rc = MQTTPacket_send_pubcomp(client->MQTTVersion, ackReq->messageId, &client->net, client->clientID);
1151 				break;
1152 			default:
1153 				Log(LOG_ERROR, -1, "unknown ACK type %d, dropping msg", ackReq->ackType);
1154 		break;
1155 		}
1156 	}
1157 
1158 	ListEmpty(client->outboundQueue);
1159 	FUNC_EXIT_RC(rc);
1160 }
1161 
1162 /**
1163 * Copy no more than dest_size -1 characters from the string pointed to by src to the array pointed to by dest.
1164 * The destination string will always be null-terminated.
1165 * @param dest the array which characters copy to
1166 * @param src the source string which characters copy from
1167 * @param dest_size the size of the memory pointed to by dest: copy no more than this -1 (allow for null).  Must be >= 1
1168 * @return the destination string pointer
1169 */
MQTTStrncpy(char * dest,const char * src,size_t dest_size)1170 char* MQTTStrncpy(char *dest, const char *src, size_t dest_size)
1171 {
1172   size_t count = dest_size;
1173   char *temp = dest;
1174 
1175   FUNC_ENTRY;
1176   if (dest_size < strlen(src))
1177     Log(TRACE_MIN, -1, "the src string is truncated");
1178 
1179   /* We must copy only the first (dest_size - 1) bytes */
1180   while (count > 1 && (*temp++ = *src++))
1181     count--;
1182 
1183   *temp = '\0';
1184 
1185   FUNC_EXIT;
1186   return dest;
1187 }
1188 
1189 
1190 /**
1191 * Duplicate a string, safely, allocating space on the heap
1192 * @param src the source string which characters copy from
1193 * @return the duplicated, allocated string
1194 */
MQTTStrdup(const char * src)1195 char* MQTTStrdup(const char* src)
1196 {
1197 	size_t mlen = strlen(src) + 1;
1198 	char* temp = malloc(mlen);
1199 	if (temp)
1200 		MQTTStrncpy(temp, src, mlen);
1201 	else
1202 		Log(LOG_ERROR, -1, "memory allocation error in MQTTStrdup");
1203 	return temp;
1204 }
1205