• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs - async client updates
16  *    Ian Craggs - fix for bug 432903 - queue persistence
17  *    Ian Craggs - MQTT V5 updates
18  *******************************************************************************/
19 
20 /**
21  * @file
22  * \brief Functions that apply to persistence operations.
23  *
24  */
25 
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include "MQTTPersistence.h"
30 #include "MQTTPersistenceDefault.h"
31 #include "MQTTProtocolClient.h"
32 #include "Heap.h"
33 
34 #if defined(_WIN32) || defined(_WIN64)
35 	#define snprintf _snprintf
36 #endif
37 
38 
39 static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
40 static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
41 
42 /**
43  * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
44  * @param persistence the ::MQTTClient_persistence structure.
45  * @param type the type of the persistence implementation. See ::MQTTClient_create.
46  * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
47  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
48  */
49 #include "StackTrace.h"
50 
MQTTPersistence_create(MQTTClient_persistence ** persistence,int type,void * pcontext)51 int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
52 {
53 	int rc = 0;
54 	MQTTClient_persistence* per = NULL;
55 
56 	FUNC_ENTRY;
57 #if !defined(NO_PERSISTENCE)
58 	switch (type)
59 	{
60 		case MQTTCLIENT_PERSISTENCE_NONE :
61 			per = NULL;
62 			break;
63 		case MQTTCLIENT_PERSISTENCE_DEFAULT :
64 			per = malloc(sizeof(MQTTClient_persistence));
65 			if ( per != NULL )
66 			{
67 				if ( pcontext == NULL )
68 					pcontext = "."; /* working directory */
69 				if ((per->context = malloc(strlen(pcontext) + 1)) == NULL)
70 				{
71 					free(per);
72 					rc = PAHO_MEMORY_ERROR;
73 					goto exit;
74 				}
75 				strcpy(per->context, pcontext);
76 				/* file system functions */
77 				per->popen        = pstopen;
78 				per->pclose       = pstclose;
79 				per->pput         = pstput;
80 				per->pget         = pstget;
81 				per->premove      = pstremove;
82 				per->pkeys        = pstkeys;
83 				per->pclear       = pstclear;
84 				per->pcontainskey = pstcontainskey;
85 			}
86 			else
87 				rc = PAHO_MEMORY_ERROR;
88 			break;
89 		case MQTTCLIENT_PERSISTENCE_USER :
90 			per = (MQTTClient_persistence *)pcontext;
91 			if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
92 				per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
93 				per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
94 				rc = MQTTCLIENT_PERSISTENCE_ERROR;
95 			break;
96 		default:
97 			rc = MQTTCLIENT_PERSISTENCE_ERROR;
98 			break;
99 	}
100 #endif
101 
102 	*persistence = per;
103 exit:
104 	FUNC_EXIT_RC(rc);
105 	return rc;
106 }
107 
108 
109 /**
110  * Open persistent store and restore any persisted messages.
111  * @param client the client as ::Clients.
112  * @param serverURI the URI of the remote end.
113  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
114  */
MQTTPersistence_initialize(Clients * c,const char * serverURI)115 int MQTTPersistence_initialize(Clients *c, const char *serverURI)
116 {
117 	int rc = 0;
118 
119 	FUNC_ENTRY;
120 	if ( c->persistence != NULL )
121 	{
122 		rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
123 		if ( rc == 0 )
124 			rc = MQTTPersistence_restorePackets(c);
125 	}
126 
127 	FUNC_EXIT_RC(rc);
128 	return rc;
129 }
130 
131 
132 /**
133  * Close persistent store.
134  * @param client the client as ::Clients.
135  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
136  */
MQTTPersistence_close(Clients * c)137 int MQTTPersistence_close(Clients *c)
138 {
139 	int rc = 0;
140 
141 	FUNC_ENTRY;
142 #if !defined(NO_PERSISTENCE)
143 	if (c->persistence != NULL)
144 	{
145 		rc = c->persistence->pclose(c->phandle);
146 
147 		if (c->persistence->popen == pstopen) {
148 			if (c->persistence->context)
149 				free(c->persistence->context);
150 			free(c->persistence);
151 		}
152 
153 		c->phandle = NULL;
154 		c->persistence = NULL;
155 	}
156 #endif
157 	FUNC_EXIT_RC(rc);
158 	return rc;
159 }
160 
161 /**
162  * Clears the persistent store.
163  * @param client the client as ::Clients.
164  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
165  */
MQTTPersistence_clear(Clients * c)166 int MQTTPersistence_clear(Clients *c)
167 {
168 	int rc = 0;
169 
170 	FUNC_ENTRY;
171 	if (c->persistence != NULL)
172 		rc = c->persistence->pclear(c->phandle);
173 
174 	FUNC_EXIT_RC(rc);
175 	return rc;
176 }
177 
178 
179 /**
180  * Restores the persisted records to the outbound and inbound message queues of the
181  * client.
182  * @param client the client as ::Clients.
183  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
184  */
MQTTPersistence_restorePackets(Clients * c)185 int MQTTPersistence_restorePackets(Clients *c)
186 {
187 	int rc = 0;
188 	char **msgkeys = NULL,
189 		 *buffer = NULL;
190 	int nkeys, buflen;
191 	int i = 0;
192 	int msgs_sent = 0;
193 	int msgs_rcvd = 0;
194 
195 	FUNC_ENTRY;
196 	if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
197 	{
198 		while (rc == 0 && i < nkeys)
199 		{
200 			if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
201 				strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
202 			{
203 				;
204 			}
205 			else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
206 					 strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
207 			{
208 				;
209 			}
210 			else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
211 					(c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
212 			{
213 				int data_MQTTVersion = MQTTVERSION_3_1_1;
214 				char* cur_key = msgkeys[i];
215 				MQTTPacket* pack = NULL;
216 
217 				if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
218 							strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
219 				{
220 					data_MQTTVersion = MQTTVERSION_5;
221 					cur_key = PERSISTENCE_PUBLISH_RECEIVED;
222 				}
223 				else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
224 							strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
225 				{
226 					data_MQTTVersion = MQTTVERSION_5;
227 					cur_key = PERSISTENCE_PUBLISH_SENT;
228 				}
229 				else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
230 							strlen(PERSISTENCE_V5_PUBREL)) == 0)
231 				{
232 					data_MQTTVersion = MQTTVERSION_5;
233 					cur_key = PERSISTENCE_PUBREL;
234 				}
235 
236 				if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
237 				{
238 					rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
239 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
240 					if (buffer != NULL)
241 					{
242 						free(buffer);
243 						buffer = NULL;
244 					}
245 					if (msgkeys[i] != NULL)
246 						free(msgkeys[i]);
247 #endif
248 					goto exit;
249 				}
250 
251 				pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
252 				if ( pack != NULL )
253 				{
254 					if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
255 							      strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
256 					{
257 						Publish* publish = (Publish*)pack;
258 						Messages* msg = NULL;
259 						publish->MQTTVersion = c->MQTTVersion;
260 						msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
261 						msg->nextMessageType = PUBREL;
262 						/* order does not matter for persisted received messages */
263 						ListAppend(c->inboundMsgs, msg, msg->len);
264 						if (c->MQTTVersion >= MQTTVERSION_5)
265 						{
266 							free(msg->publish->payload);
267 							free(msg->publish->topic);
268 							msg->publish->payload = msg->publish->topic = NULL;
269 						}
270 						publish->topic = NULL;
271 						MQTTPacket_freePublish(publish);
272 						msgs_rcvd++;
273 					}
274 					else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
275 						               strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
276 					{
277 						Publish* publish = (Publish*)pack;
278 						Messages* msg = NULL;
279 						const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
280 						char *key = malloc(keysize);
281 						int chars = 0;
282 
283 						if (!key)
284 						{
285 							rc = PAHO_MEMORY_ERROR;
286 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
287 							publish->topic = NULL;
288 							MQTTPacket_freePublish(publish);
289 							if (buffer != NULL)
290 							{
291 								free(buffer);
292 								buffer = NULL;
293 							}
294 							if (msgkeys[i] != NULL)
295 								free(msgkeys[i]);
296 #endif
297 							goto exit;
298 						}
299 						publish->MQTTVersion = c->MQTTVersion;
300 						if (publish->MQTTVersion >= MQTTVERSION_5)
301 							chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
302 						else
303 							chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
304 						if (chars >= keysize)
305 						{
306 						    rc = MQTTCLIENT_PERSISTENCE_ERROR;
307 						    Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
308 						}
309 						else
310 						{
311 							msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
312 							if (c->persistence->pcontainskey(c->phandle, key) == 0)
313 								/* PUBLISH Qo2 and PUBREL sent */
314 								msg->nextMessageType = PUBCOMP;
315 							/* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
316 							/* retry at the first opportunity */
317 							memset(&msg->lastTouch, '\0', sizeof(msg->lastTouch));
318 							MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
319 							publish->topic = NULL;
320 							MQTTPacket_freePublish(publish);
321 							msgs_sent++;
322 						}
323 						free(key);
324 					}
325 					else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
326 					{
327 						/* orphaned PUBRELs ? */
328 						Pubrel* pubrel = (Pubrel*)pack;
329 						const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
330 						char *key = malloc(keysize);
331 						int chars = 0;
332 
333 						if (!key)
334 						{
335 							rc = PAHO_MEMORY_ERROR;
336 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
337 							free(pubrel);
338 							if (buffer != NULL)
339 							{
340 								free(buffer);
341 								buffer = NULL;
342 							}
343 							if (msgkeys[i] != NULL)
344 								free(msgkeys[i]);
345 #endif
346 							goto exit;
347 						}
348 						pubrel->MQTTVersion = c->MQTTVersion;
349 						if (pubrel->MQTTVersion >= MQTTVERSION_5)
350 							chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
351 						else
352 							chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
353 						if (chars >= keysize)
354 						{
355 						    rc = MQTTCLIENT_PERSISTENCE_ERROR;
356 						    Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
357 						}
358 						else if (c->persistence->pcontainskey(c->phandle, key) != 0)
359 							rc = c->persistence->premove(c->phandle, msgkeys[i]);
360 						free(pubrel);
361 						free(key);
362 					}
363 				}
364 				else  /* pack == NULL -> bad persisted record */
365 					rc = c->persistence->premove(c->phandle, msgkeys[i]);
366 			}
367 			if (buffer)
368 			{
369 				free(buffer);
370 				buffer = NULL;
371 			}
372 			if (msgkeys[i])
373 				free(msgkeys[i]);
374 			i++;
375 		}
376 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
377 		if (msgkeys)
378 			free(msgkeys);
379 #endif
380 	}
381 	Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
382 		msgs_sent, msgs_rcvd, c->clientID);
383 	MQTTPersistence_wrapMsgID(c);
384 exit:
385 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
386 	if (msgkeys != NULL)
387 		free(msgkeys);
388 #endif
389 	FUNC_EXIT_RC(rc);
390 	return rc;
391 }
392 
393 
394 /**
395  * Returns a MQTT packet restored from persisted data.
396  * @param buffer the persisted data.
397  * @param buflen the number of bytes of the data buffer.
398  */
MQTTPersistence_restorePacket(int MQTTVersion,char * buffer,size_t buflen)399 void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
400 {
401 	void* pack = NULL;
402 	Header header;
403 	int fixed_header_length = 1, ptype, remaining_length = 0;
404 	char c;
405 	int multiplier = 1;
406 	extern pf new_packets[];
407 
408 	FUNC_ENTRY;
409 	header.byte = buffer[0];
410 	/* decode the message length according to the MQTT algorithm */
411 	do
412 	{
413 		c = *(++buffer);
414 		remaining_length += (c & 127) * multiplier;
415 		multiplier *= 128;
416 		fixed_header_length++;
417 	} while ((c & 128) != 0);
418 
419 	if ( (fixed_header_length + remaining_length) == buflen )
420 	{
421 		ptype = header.bits.type;
422 		if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
423 			pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
424 	}
425 
426 	FUNC_EXIT;
427 	return pack;
428 }
429 
430 
431 /**
432  * Inserts the specified message into the list, maintaining message ID order.
433  * @param list the list to insert the message into.
434  * @param content the message to add.
435  * @param size size of the message.
436  */
MQTTPersistence_insertInOrder(List * list,void * content,size_t size)437 void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
438 {
439 	ListElement* index = NULL;
440 	ListElement* current = NULL;
441 
442 	FUNC_ENTRY;
443 	while(ListNextElement(list, &current) != NULL && index == NULL)
444 	{
445 		if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
446 			index = current;
447 	}
448 
449 	ListInsert(list, content, size, index);
450 	FUNC_EXIT;
451 }
452 
453 
454 /**
455  * Adds a record to the persistent store. This function must not be called for QoS0
456  * messages.
457  * @param socket the socket of the client.
458  * @param buf0 fixed header.
459  * @param buf0len length of the fixed header.
460  * @param count number of buffers representing the variable header and/or the payload.
461  * @param buffers the buffers representing the variable header and/or the payload.
462  * @param buflens length of the buffers representing the variable header and/or the payload.
463  * @param htype MQTT packet type - PUBLISH or PUBREL
464  * @param msgId the message ID.
465  * @param scr 0 indicates message in the sending direction; 1 indicates message in the
466  * receiving direction.
467  * @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
468  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
469  */
MQTTPersistence_putPacket(SOCKET socket,char * buf0,size_t buf0len,int count,char ** buffers,size_t * buflens,int htype,int msgId,int scr,int MQTTVersion)470 int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
471 						char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
472 {
473 	int rc = 0;
474 	extern ClientStates* bstate;
475 	int nbufs, i;
476 	int* lens = NULL;
477 	char** bufs = NULL;
478 	char *key;
479 	Clients* client = NULL;
480 
481 	FUNC_ENTRY;
482 	client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
483 	if (client->persistence != NULL)
484 	{
485 		const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
486 		if ((key = malloc(keysize)) == NULL)
487 		{
488 			rc = PAHO_MEMORY_ERROR;
489 			goto exit;
490 		}
491 		nbufs = 1 + count;
492 		if ((lens = (int *)malloc(nbufs * sizeof(int))) == NULL)
493 		{
494 			free(key);
495 			rc = PAHO_MEMORY_ERROR;
496 			goto exit;
497 		}
498 		if ((bufs = (char **)malloc(nbufs * sizeof(char *))) == NULL)
499 		{
500 			free(key);
501 			free(lens);
502 			rc = PAHO_MEMORY_ERROR;
503 			goto exit;
504 		}
505 		lens[0] = (int)buf0len;
506 		bufs[0] = buf0;
507 		for (i = 0; i < count; i++)
508 		{
509 			lens[i+1] = (int)buflens[i];
510 			bufs[i+1] = buffers[i];
511 		}
512 
513 		/* key */
514 		if (scr == 0)
515 		{  /* sending */
516 			char* key_id = PERSISTENCE_PUBLISH_SENT;
517 
518 			if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
519 			{
520 				if (MQTTVersion >= MQTTVERSION_5)
521 					key_id = PERSISTENCE_V5_PUBLISH_SENT;
522 			}
523 			else if (htype == PUBREL) /* PUBREL */
524 			{
525 				if (MQTTVersion >= MQTTVERSION_5)
526 					key_id = PERSISTENCE_V5_PUBREL;
527 				else
528 					key_id = PERSISTENCE_PUBREL;
529 			}
530 			if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
531 				rc = MQTTCLIENT_PERSISTENCE_ERROR;
532 		}
533 		else if (scr == 1)  /* receiving PUBLISH QoS2 */
534 		{
535 			char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
536 
537 			if (MQTTVersion >= MQTTVERSION_5)
538 				key_id = PERSISTENCE_V5_PUBLISH_RECEIVED;
539 			if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
540 				rc = MQTTCLIENT_PERSISTENCE_ERROR;
541 		}
542 
543 		if (rc == 0 && client->beforeWrite)
544 			rc = client->beforeWrite(client->beforeWrite_context, nbufs, bufs, lens);
545 
546 		if (rc == 0)
547 			rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
548 
549 		free(key);
550 		free(lens);
551 		free(bufs);
552 	}
553 
554 exit:
555 	FUNC_EXIT_RC(rc);
556 	return rc;
557 }
558 
559 
560 /**
561  * Deletes a record from the persistent store.
562  * @param client the client as ::Clients.
563  * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
564  * or #PERSISTENCE_PUBLISH_RECEIVED.
565  * @param qos the qos field of the message.
566  * @param msgId the message ID.
567  * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
568  */
MQTTPersistence_remove(Clients * c,char * type,int qos,int msgId)569 int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
570 {
571 	int rc = 0;
572 
573 	FUNC_ENTRY;
574 	if (c->persistence != NULL)
575 	{
576 		const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
577 		char *key = malloc(keysize);
578 		int chars = 0;
579 
580 		if (!key)
581 		{
582 			rc = PAHO_MEMORY_ERROR;
583 			goto exit;
584 		}
585 		if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
586 				strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
587 		{
588 			if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId)) >= keysize)
589 				rc = MQTTCLIENT_PERSISTENCE_ERROR;
590 			else
591 			{
592 				rc = c->persistence->premove(c->phandle, key);
593 				if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, msgId)) >= keysize)
594 					rc = MQTTCLIENT_PERSISTENCE_ERROR;
595 				else
596 				{
597 					rc += c->persistence->premove(c->phandle, key);
598 					if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId)) >= keysize)
599 						rc = MQTTCLIENT_PERSISTENCE_ERROR;
600 					else
601 					{
602 						rc += c->persistence->premove(c->phandle, key);
603 						if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, msgId)) >= keysize)
604 							rc = MQTTCLIENT_PERSISTENCE_ERROR;
605 						else
606 							rc += c->persistence->premove(c->phandle, key);
607 					}
608 				}
609 			}
610 		}
611 		else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
612 		{    /* or PERSISTENCE_PUBLISH_RECEIVED */
613 
614 			if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId)) >= keysize)
615 				rc = MQTTCLIENT_PERSISTENCE_ERROR;
616 			else
617 			{
618 				rc = c->persistence->premove(c->phandle, key);
619 				if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId)) >= keysize)
620 					rc = MQTTCLIENT_PERSISTENCE_ERROR;
621 				else
622 					rc += c->persistence->premove(c->phandle, key);
623 			}
624 		}
625 		if (rc == MQTTCLIENT_PERSISTENCE_ERROR)
626 			Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
627 		free(key);
628 	}
629 
630 exit:
631 	FUNC_EXIT_RC(rc);
632 	return rc;
633 }
634 
635 
636 /**
637  * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
638  * message IDs in the outboundMsgs queue.
639  * @param client the client as ::Clients.
640  */
MQTTPersistence_wrapMsgID(Clients * client)641 void MQTTPersistence_wrapMsgID(Clients *client)
642 {
643 	ListElement* wrapel = NULL;
644 	ListElement* current = NULL;
645 
646 	FUNC_ENTRY;
647 	if ( client->outboundMsgs->count > 0 )
648 	{
649 		int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
650 		int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
651 		int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
652 		current = ListNextElement(client->outboundMsgs, &current);
653 
654 		while(ListNextElement(client->outboundMsgs, &current) != NULL)
655 		{
656 			int curMsgID = ((Messages*)current->content)->msgid;
657 			int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
658 			int curgap = curMsgID - curPrevMsgID;
659 			if ( curgap > gap )
660 			{
661 				gap = curgap;
662 				wrapel = current;
663 			}
664 		}
665 	}
666 
667 	if ( wrapel != NULL )
668 	{
669 		/* put wrapel at the beginning of the queue */
670 		client->outboundMsgs->first->prev = client->outboundMsgs->last;
671 		client->outboundMsgs->last->next = client->outboundMsgs->first;
672 		client->outboundMsgs->first = wrapel;
673 		client->outboundMsgs->last = wrapel->prev;
674 		client->outboundMsgs->first->prev = NULL;
675 		client->outboundMsgs->last->next = NULL;
676 	}
677 	FUNC_EXIT;
678 }
679 
680 
681 #if !defined(NO_PERSISTENCE)
MQTTPersistence_unpersistQueueEntry(Clients * client,MQTTPersistence_qEntry * qe)682 int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
683 {
684 	int rc = 0;
685 #if defined(_WIN32) || defined(_WIN64)
686 #define KEYSIZE PERSISTENCE_MAX_KEY_LENGTH + 1
687 #else
688 	const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
689 #endif
690 	char key[KEYSIZE];
691 	int chars = 0;
692 
693 	FUNC_ENTRY;
694 	if (client->MQTTVersion >= MQTTVERSION_5)
695 		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
696 	else
697 		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
698 	if (chars >= KEYSIZE)
699 	{
700 		Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
701 		rc = MQTTCLIENT_PERSISTENCE_ERROR;
702 	}
703 	else if ((rc = client->persistence->premove(client->phandle, key)) != 0)
704 		Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
705 	FUNC_EXIT_RC(rc);
706 	return rc;
707 }
708 
709 
710 #define MAX_NO_OF_BUFFERS 9
MQTTPersistence_persistQueueEntry(Clients * aclient,MQTTPersistence_qEntry * qe)711 int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
712 {
713 	int rc = 0;
714 	int bufindex = 0;
715 #if !defined(_WIN32) && !defined(_WIN64)
716 	const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
717 #endif
718 	char key[KEYSIZE];
719 	int chars = 0;
720 	int lens[MAX_NO_OF_BUFFERS];
721 	void* bufs[MAX_NO_OF_BUFFERS];
722 	int props_allocated = 0;
723 
724 	FUNC_ENTRY;
725 	bufs[bufindex] = &qe->msg->payloadlen;
726 	lens[bufindex++] = sizeof(qe->msg->payloadlen);
727 
728 	bufs[bufindex] = qe->msg->payload;
729 	lens[bufindex++] = qe->msg->payloadlen;
730 
731 	bufs[bufindex] = &qe->msg->qos;
732 	lens[bufindex++] = sizeof(qe->msg->qos);
733 
734 	bufs[bufindex] = &qe->msg->retained;
735 	lens[bufindex++] = sizeof(qe->msg->retained);
736 
737 	bufs[bufindex] = &qe->msg->dup;
738 	lens[bufindex++] = sizeof(qe->msg->dup);
739 
740 	bufs[bufindex] = &qe->msg->msgid;
741 	lens[bufindex++] = sizeof(qe->msg->msgid);
742 
743 	bufs[bufindex] = qe->topicName;
744 	lens[bufindex++] = (int)strlen(qe->topicName) + 1;
745 
746 	bufs[bufindex] = &qe->topicLen;
747 	lens[bufindex++] = sizeof(qe->topicLen);
748 
749 	if (++aclient->qentry_seqno == PERSISTENCE_SEQNO_LIMIT)
750 		aclient->qentry_seqno = 0;
751 
752 	if (aclient->MQTTVersion >= MQTTVERSION_5)  		/* persist properties */
753 	{
754 		MQTTProperties no_props = MQTTProperties_initializer;
755 		MQTTProperties* props = &no_props;
756 		int temp_len = 0;
757 		char* ptr = NULL;
758 
759 		if (qe->msg->struct_version >= 1)
760 			props = &qe->msg->properties;
761 
762 		temp_len = MQTTProperties_len(props);
763 		ptr = bufs[bufindex] = malloc(temp_len);
764 		if (!ptr)
765 		{
766 			rc = PAHO_MEMORY_ERROR;
767 			goto exit;
768 		}
769 		props_allocated = bufindex;
770 		rc = MQTTProperties_write(&ptr, props);
771 		lens[bufindex++] = temp_len;
772 
773 		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, aclient->qentry_seqno);
774 	}
775 	else
776 		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, aclient->qentry_seqno);
777 
778 	if (chars >= KEYSIZE)
779 		rc = MQTTCLIENT_PERSISTENCE_ERROR;
780 	else
781 	{
782 		qe->seqno = aclient->qentry_seqno;
783 
784 		if (aclient->beforeWrite)
785 			rc = aclient->beforeWrite(aclient->beforeWrite_context, bufindex, (char**)bufs, lens);
786 
787 		if (rc == 0 && (rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
788 			Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
789 	}
790 	if (props_allocated != 0)
791 		free(bufs[props_allocated]);
792 
793 exit:
794 	FUNC_EXIT_RC(rc);
795 	return rc;
796 }
797 
798 
MQTTPersistence_restoreQueueEntry(char * buffer,size_t buflen,int MQTTVersion)799 static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
800 {
801 	MQTTPersistence_qEntry* qe = NULL;
802 	char* ptr = buffer;
803 	int data_size;
804 
805 	FUNC_ENTRY;
806 	if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
807 		goto exit;
808 	memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
809 
810 	if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
811 	{
812 		free(qe);
813 		qe = NULL;
814 		goto exit;
815 	}
816 	memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
817 
818 	qe->msg->struct_version = 1;
819 
820 	qe->msg->payloadlen = *(int*)ptr;
821 	ptr += sizeof(int);
822 
823 	data_size = qe->msg->payloadlen;
824 	if ((qe->msg->payload = malloc(data_size)) == NULL)
825 	{
826 		free(qe->msg);
827 		free(qe);
828 		qe = NULL;
829 		goto exit;
830 	}
831 	memcpy(qe->msg->payload, ptr, data_size);
832 	ptr += data_size;
833 
834 	qe->msg->qos = *(int*)ptr;
835 	ptr += sizeof(int);
836 
837 	qe->msg->retained = *(int*)ptr;
838 	ptr += sizeof(int);
839 
840 	qe->msg->dup = *(int*)ptr;
841 	ptr += sizeof(int);
842 
843 	qe->msg->msgid = *(int*)ptr;
844 	ptr += sizeof(int);
845 
846 	data_size = (int)strlen(ptr) + 1;
847 	if ((qe->topicName = malloc(data_size)) == NULL)
848 	{
849 		free(qe->msg->payload);
850 		free(qe->msg);
851 		free(qe);
852 		qe = NULL;
853 		goto exit;
854 	}
855 	strcpy(qe->topicName, ptr);
856 	ptr += data_size;
857 
858 	qe->topicLen = *(int*)ptr;
859 	ptr += sizeof(int);
860 
861 	if (MQTTVersion >= MQTTVERSION_5 &&
862 		MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
863 			Log(LOG_ERROR, -1, "Error restoring properties from persistence");
864 
865 exit:
866 	FUNC_EXIT;
867 	return qe;
868 }
869 
870 
MQTTPersistence_insertInSeqOrder(List * list,MQTTPersistence_qEntry * qEntry,size_t size)871 static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
872 {
873 	ListElement* index = NULL;
874 	ListElement* current = NULL;
875 
876 	FUNC_ENTRY;
877 	while (ListNextElement(list, &current) != NULL && index == NULL)
878 	{
879 		if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
880 			index = current;
881 	}
882 	ListInsert(list, qEntry, size, index);
883 	FUNC_EXIT;
884 }
885 
886 
887 /**
888  * Restores a queue of messages from persistence to memory
889  * @param c the client as ::Clients - the client object to restore the messages to
890  * @return return code, 0 if successful
891  */
MQTTPersistence_restoreMessageQueue(Clients * c)892 int MQTTPersistence_restoreMessageQueue(Clients* c)
893 {
894 	int rc = 0;
895 	char **msgkeys;
896 	int nkeys;
897 	int i = 0;
898 	int entries_restored = 0;
899 
900 	FUNC_ENTRY;
901 	if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
902 	{
903 		while (rc == 0 && i < nkeys)
904 		{
905 			char *buffer = NULL;
906 			int buflen;
907 
908 			if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
909 				strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
910 			{
911 				; /* ignore if not a queue entry key */
912 			}
913 			else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
914 				(c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
915 			{
916 				int MQTTVersion =
917 					(strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
918 					? MQTTVERSION_5 : MQTTVERSION_3_1_1;
919 				MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
920 
921 				if (qe)
922 				{
923 					qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
924 					MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
925 					c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
926 					entries_restored++;
927 				}
928 				if (buffer)
929 					free(buffer);
930 			}
931 			if (msgkeys[i])
932 			{
933 				free(msgkeys[i]);
934 			}
935 			i++;
936 		}
937 		if (msgkeys != NULL)
938 			free(msgkeys);
939 	}
940 	Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
941 	FUNC_EXIT_RC(rc);
942 	return rc;
943 }
944 #endif
945