1 /*******************************************************************************
2 * Copyright (c) 2009, 2022 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs - async client updates
16 * Ian Craggs - fix for bug 432903 - queue persistence
17 * Ian Craggs - MQTT V5 updates
18 *******************************************************************************/
19
20 /**
21 * @file
22 * \brief Functions that apply to persistence operations.
23 *
24 */
25
26 #include <stdio.h>
27 #include <string.h>
28
29 #include "MQTTPersistence.h"
30 #include "MQTTPersistenceDefault.h"
31 #include "MQTTProtocolClient.h"
32 #include "Heap.h"
33
34 #if defined(_WIN32) || defined(_WIN64)
35 #define snprintf _snprintf
36 #endif
37
38
39 static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
40 static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
41
42 /**
43 * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
44 * @param persistence the ::MQTTClient_persistence structure.
45 * @param type the type of the persistence implementation. See ::MQTTClient_create.
46 * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
47 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
48 */
49 #include "StackTrace.h"
50
MQTTPersistence_create(MQTTClient_persistence ** persistence,int type,void * pcontext)51 int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
52 {
53 int rc = 0;
54 MQTTClient_persistence* per = NULL;
55
56 FUNC_ENTRY;
57 #if !defined(NO_PERSISTENCE)
58 switch (type)
59 {
60 case MQTTCLIENT_PERSISTENCE_NONE :
61 per = NULL;
62 break;
63 case MQTTCLIENT_PERSISTENCE_DEFAULT :
64 per = malloc(sizeof(MQTTClient_persistence));
65 if ( per != NULL )
66 {
67 if ( pcontext == NULL )
68 pcontext = "."; /* working directory */
69 if ((per->context = malloc(strlen(pcontext) + 1)) == NULL)
70 {
71 free(per);
72 rc = PAHO_MEMORY_ERROR;
73 goto exit;
74 }
75 strcpy(per->context, pcontext);
76 /* file system functions */
77 per->popen = pstopen;
78 per->pclose = pstclose;
79 per->pput = pstput;
80 per->pget = pstget;
81 per->premove = pstremove;
82 per->pkeys = pstkeys;
83 per->pclear = pstclear;
84 per->pcontainskey = pstcontainskey;
85 }
86 else
87 rc = PAHO_MEMORY_ERROR;
88 break;
89 case MQTTCLIENT_PERSISTENCE_USER :
90 per = (MQTTClient_persistence *)pcontext;
91 if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
92 per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
93 per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
94 rc = MQTTCLIENT_PERSISTENCE_ERROR;
95 break;
96 default:
97 rc = MQTTCLIENT_PERSISTENCE_ERROR;
98 break;
99 }
100 #endif
101
102 *persistence = per;
103 exit:
104 FUNC_EXIT_RC(rc);
105 return rc;
106 }
107
108
109 /**
110 * Open persistent store and restore any persisted messages.
111 * @param client the client as ::Clients.
112 * @param serverURI the URI of the remote end.
113 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
114 */
MQTTPersistence_initialize(Clients * c,const char * serverURI)115 int MQTTPersistence_initialize(Clients *c, const char *serverURI)
116 {
117 int rc = 0;
118
119 FUNC_ENTRY;
120 if ( c->persistence != NULL )
121 {
122 rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
123 if ( rc == 0 )
124 rc = MQTTPersistence_restorePackets(c);
125 }
126
127 FUNC_EXIT_RC(rc);
128 return rc;
129 }
130
131
132 /**
133 * Close persistent store.
134 * @param client the client as ::Clients.
135 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
136 */
MQTTPersistence_close(Clients * c)137 int MQTTPersistence_close(Clients *c)
138 {
139 int rc = 0;
140
141 FUNC_ENTRY;
142 #if !defined(NO_PERSISTENCE)
143 if (c->persistence != NULL)
144 {
145 rc = c->persistence->pclose(c->phandle);
146
147 if (c->persistence->popen == pstopen) {
148 if (c->persistence->context)
149 free(c->persistence->context);
150 free(c->persistence);
151 }
152
153 c->phandle = NULL;
154 c->persistence = NULL;
155 }
156 #endif
157 FUNC_EXIT_RC(rc);
158 return rc;
159 }
160
161 /**
162 * Clears the persistent store.
163 * @param client the client as ::Clients.
164 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
165 */
MQTTPersistence_clear(Clients * c)166 int MQTTPersistence_clear(Clients *c)
167 {
168 int rc = 0;
169
170 FUNC_ENTRY;
171 if (c->persistence != NULL)
172 rc = c->persistence->pclear(c->phandle);
173
174 FUNC_EXIT_RC(rc);
175 return rc;
176 }
177
178
179 /**
180 * Restores the persisted records to the outbound and inbound message queues of the
181 * client.
182 * @param client the client as ::Clients.
183 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
184 */
MQTTPersistence_restorePackets(Clients * c)185 int MQTTPersistence_restorePackets(Clients *c)
186 {
187 int rc = 0;
188 char **msgkeys = NULL,
189 *buffer = NULL;
190 int nkeys, buflen;
191 int i = 0;
192 int msgs_sent = 0;
193 int msgs_rcvd = 0;
194
195 FUNC_ENTRY;
196 if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
197 {
198 while (rc == 0 && i < nkeys)
199 {
200 if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
201 strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
202 {
203 ;
204 }
205 else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
206 strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
207 {
208 ;
209 }
210 else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
211 (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
212 {
213 int data_MQTTVersion = MQTTVERSION_3_1_1;
214 char* cur_key = msgkeys[i];
215 MQTTPacket* pack = NULL;
216
217 if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
218 strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
219 {
220 data_MQTTVersion = MQTTVERSION_5;
221 cur_key = PERSISTENCE_PUBLISH_RECEIVED;
222 }
223 else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
224 strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
225 {
226 data_MQTTVersion = MQTTVERSION_5;
227 cur_key = PERSISTENCE_PUBLISH_SENT;
228 }
229 else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
230 strlen(PERSISTENCE_V5_PUBREL)) == 0)
231 {
232 data_MQTTVersion = MQTTVERSION_5;
233 cur_key = PERSISTENCE_PUBREL;
234 }
235
236 if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
237 {
238 rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
239 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
240 if (buffer != NULL)
241 {
242 free(buffer);
243 buffer = NULL;
244 }
245 if (msgkeys[i] != NULL)
246 free(msgkeys[i]);
247 #endif
248 goto exit;
249 }
250
251 pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
252 if ( pack != NULL )
253 {
254 if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
255 strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
256 {
257 Publish* publish = (Publish*)pack;
258 Messages* msg = NULL;
259 publish->MQTTVersion = c->MQTTVersion;
260 msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
261 msg->nextMessageType = PUBREL;
262 /* order does not matter for persisted received messages */
263 ListAppend(c->inboundMsgs, msg, msg->len);
264 if (c->MQTTVersion >= MQTTVERSION_5)
265 {
266 free(msg->publish->payload);
267 free(msg->publish->topic);
268 msg->publish->payload = msg->publish->topic = NULL;
269 }
270 publish->topic = NULL;
271 MQTTPacket_freePublish(publish);
272 msgs_rcvd++;
273 }
274 else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
275 strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
276 {
277 Publish* publish = (Publish*)pack;
278 Messages* msg = NULL;
279 const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
280 char *key = malloc(keysize);
281 int chars = 0;
282
283 if (!key)
284 {
285 rc = PAHO_MEMORY_ERROR;
286 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
287 publish->topic = NULL;
288 MQTTPacket_freePublish(publish);
289 if (buffer != NULL)
290 {
291 free(buffer);
292 buffer = NULL;
293 }
294 if (msgkeys[i] != NULL)
295 free(msgkeys[i]);
296 #endif
297 goto exit;
298 }
299 publish->MQTTVersion = c->MQTTVersion;
300 if (publish->MQTTVersion >= MQTTVERSION_5)
301 chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
302 else
303 chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
304 if (chars >= keysize)
305 {
306 rc = MQTTCLIENT_PERSISTENCE_ERROR;
307 Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
308 }
309 else
310 {
311 msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
312 if (c->persistence->pcontainskey(c->phandle, key) == 0)
313 /* PUBLISH Qo2 and PUBREL sent */
314 msg->nextMessageType = PUBCOMP;
315 /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
316 /* retry at the first opportunity */
317 memset(&msg->lastTouch, '\0', sizeof(msg->lastTouch));
318 MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
319 publish->topic = NULL;
320 MQTTPacket_freePublish(publish);
321 msgs_sent++;
322 }
323 free(key);
324 }
325 else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
326 {
327 /* orphaned PUBRELs ? */
328 Pubrel* pubrel = (Pubrel*)pack;
329 const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
330 char *key = malloc(keysize);
331 int chars = 0;
332
333 if (!key)
334 {
335 rc = PAHO_MEMORY_ERROR;
336 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
337 free(pubrel);
338 if (buffer != NULL)
339 {
340 free(buffer);
341 buffer = NULL;
342 }
343 if (msgkeys[i] != NULL)
344 free(msgkeys[i]);
345 #endif
346 goto exit;
347 }
348 pubrel->MQTTVersion = c->MQTTVersion;
349 if (pubrel->MQTTVersion >= MQTTVERSION_5)
350 chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
351 else
352 chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
353 if (chars >= keysize)
354 {
355 rc = MQTTCLIENT_PERSISTENCE_ERROR;
356 Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
357 }
358 else if (c->persistence->pcontainskey(c->phandle, key) != 0)
359 rc = c->persistence->premove(c->phandle, msgkeys[i]);
360 free(pubrel);
361 free(key);
362 }
363 }
364 else /* pack == NULL -> bad persisted record */
365 rc = c->persistence->premove(c->phandle, msgkeys[i]);
366 }
367 if (buffer)
368 {
369 free(buffer);
370 buffer = NULL;
371 }
372 if (msgkeys[i])
373 free(msgkeys[i]);
374 i++;
375 }
376 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
377 if (msgkeys)
378 free(msgkeys);
379 #endif
380 }
381 Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
382 msgs_sent, msgs_rcvd, c->clientID);
383 MQTTPersistence_wrapMsgID(c);
384 exit:
385 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
386 if (msgkeys != NULL)
387 free(msgkeys);
388 #endif
389 FUNC_EXIT_RC(rc);
390 return rc;
391 }
392
393
394 /**
395 * Returns a MQTT packet restored from persisted data.
396 * @param buffer the persisted data.
397 * @param buflen the number of bytes of the data buffer.
398 */
MQTTPersistence_restorePacket(int MQTTVersion,char * buffer,size_t buflen)399 void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
400 {
401 void* pack = NULL;
402 Header header;
403 int fixed_header_length = 1, ptype, remaining_length = 0;
404 char c;
405 int multiplier = 1;
406 extern pf new_packets[];
407
408 FUNC_ENTRY;
409 header.byte = buffer[0];
410 /* decode the message length according to the MQTT algorithm */
411 do
412 {
413 c = *(++buffer);
414 remaining_length += (c & 127) * multiplier;
415 multiplier *= 128;
416 fixed_header_length++;
417 } while ((c & 128) != 0);
418
419 if ( (fixed_header_length + remaining_length) == buflen )
420 {
421 ptype = header.bits.type;
422 if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
423 pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
424 }
425
426 FUNC_EXIT;
427 return pack;
428 }
429
430
431 /**
432 * Inserts the specified message into the list, maintaining message ID order.
433 * @param list the list to insert the message into.
434 * @param content the message to add.
435 * @param size size of the message.
436 */
MQTTPersistence_insertInOrder(List * list,void * content,size_t size)437 void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
438 {
439 ListElement* index = NULL;
440 ListElement* current = NULL;
441
442 FUNC_ENTRY;
443 while(ListNextElement(list, ¤t) != NULL && index == NULL)
444 {
445 if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
446 index = current;
447 }
448
449 ListInsert(list, content, size, index);
450 FUNC_EXIT;
451 }
452
453
454 /**
455 * Adds a record to the persistent store. This function must not be called for QoS0
456 * messages.
457 * @param socket the socket of the client.
458 * @param buf0 fixed header.
459 * @param buf0len length of the fixed header.
460 * @param count number of buffers representing the variable header and/or the payload.
461 * @param buffers the buffers representing the variable header and/or the payload.
462 * @param buflens length of the buffers representing the variable header and/or the payload.
463 * @param htype MQTT packet type - PUBLISH or PUBREL
464 * @param msgId the message ID.
465 * @param scr 0 indicates message in the sending direction; 1 indicates message in the
466 * receiving direction.
467 * @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
468 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
469 */
MQTTPersistence_putPacket(SOCKET socket,char * buf0,size_t buf0len,int count,char ** buffers,size_t * buflens,int htype,int msgId,int scr,int MQTTVersion)470 int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
471 char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
472 {
473 int rc = 0;
474 extern ClientStates* bstate;
475 int nbufs, i;
476 int* lens = NULL;
477 char** bufs = NULL;
478 char *key;
479 Clients* client = NULL;
480
481 FUNC_ENTRY;
482 client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
483 if (client->persistence != NULL)
484 {
485 const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
486 if ((key = malloc(keysize)) == NULL)
487 {
488 rc = PAHO_MEMORY_ERROR;
489 goto exit;
490 }
491 nbufs = 1 + count;
492 if ((lens = (int *)malloc(nbufs * sizeof(int))) == NULL)
493 {
494 free(key);
495 rc = PAHO_MEMORY_ERROR;
496 goto exit;
497 }
498 if ((bufs = (char **)malloc(nbufs * sizeof(char *))) == NULL)
499 {
500 free(key);
501 free(lens);
502 rc = PAHO_MEMORY_ERROR;
503 goto exit;
504 }
505 lens[0] = (int)buf0len;
506 bufs[0] = buf0;
507 for (i = 0; i < count; i++)
508 {
509 lens[i+1] = (int)buflens[i];
510 bufs[i+1] = buffers[i];
511 }
512
513 /* key */
514 if (scr == 0)
515 { /* sending */
516 char* key_id = PERSISTENCE_PUBLISH_SENT;
517
518 if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
519 {
520 if (MQTTVersion >= MQTTVERSION_5)
521 key_id = PERSISTENCE_V5_PUBLISH_SENT;
522 }
523 else if (htype == PUBREL) /* PUBREL */
524 {
525 if (MQTTVersion >= MQTTVERSION_5)
526 key_id = PERSISTENCE_V5_PUBREL;
527 else
528 key_id = PERSISTENCE_PUBREL;
529 }
530 if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
531 rc = MQTTCLIENT_PERSISTENCE_ERROR;
532 }
533 else if (scr == 1) /* receiving PUBLISH QoS2 */
534 {
535 char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
536
537 if (MQTTVersion >= MQTTVERSION_5)
538 key_id = PERSISTENCE_V5_PUBLISH_RECEIVED;
539 if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
540 rc = MQTTCLIENT_PERSISTENCE_ERROR;
541 }
542
543 if (rc == 0 && client->beforeWrite)
544 rc = client->beforeWrite(client->beforeWrite_context, nbufs, bufs, lens);
545
546 if (rc == 0)
547 rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
548
549 free(key);
550 free(lens);
551 free(bufs);
552 }
553
554 exit:
555 FUNC_EXIT_RC(rc);
556 return rc;
557 }
558
559
560 /**
561 * Deletes a record from the persistent store.
562 * @param client the client as ::Clients.
563 * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
564 * or #PERSISTENCE_PUBLISH_RECEIVED.
565 * @param qos the qos field of the message.
566 * @param msgId the message ID.
567 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
568 */
MQTTPersistence_remove(Clients * c,char * type,int qos,int msgId)569 int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
570 {
571 int rc = 0;
572
573 FUNC_ENTRY;
574 if (c->persistence != NULL)
575 {
576 const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
577 char *key = malloc(keysize);
578 int chars = 0;
579
580 if (!key)
581 {
582 rc = PAHO_MEMORY_ERROR;
583 goto exit;
584 }
585 if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
586 strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
587 {
588 if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId)) >= keysize)
589 rc = MQTTCLIENT_PERSISTENCE_ERROR;
590 else
591 {
592 rc = c->persistence->premove(c->phandle, key);
593 if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, msgId)) >= keysize)
594 rc = MQTTCLIENT_PERSISTENCE_ERROR;
595 else
596 {
597 rc += c->persistence->premove(c->phandle, key);
598 if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId)) >= keysize)
599 rc = MQTTCLIENT_PERSISTENCE_ERROR;
600 else
601 {
602 rc += c->persistence->premove(c->phandle, key);
603 if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, msgId)) >= keysize)
604 rc = MQTTCLIENT_PERSISTENCE_ERROR;
605 else
606 rc += c->persistence->premove(c->phandle, key);
607 }
608 }
609 }
610 }
611 else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
612 { /* or PERSISTENCE_PUBLISH_RECEIVED */
613
614 if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId)) >= keysize)
615 rc = MQTTCLIENT_PERSISTENCE_ERROR;
616 else
617 {
618 rc = c->persistence->premove(c->phandle, key);
619 if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId)) >= keysize)
620 rc = MQTTCLIENT_PERSISTENCE_ERROR;
621 else
622 rc += c->persistence->premove(c->phandle, key);
623 }
624 }
625 if (rc == MQTTCLIENT_PERSISTENCE_ERROR)
626 Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
627 free(key);
628 }
629
630 exit:
631 FUNC_EXIT_RC(rc);
632 return rc;
633 }
634
635
636 /**
637 * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
638 * message IDs in the outboundMsgs queue.
639 * @param client the client as ::Clients.
640 */
MQTTPersistence_wrapMsgID(Clients * client)641 void MQTTPersistence_wrapMsgID(Clients *client)
642 {
643 ListElement* wrapel = NULL;
644 ListElement* current = NULL;
645
646 FUNC_ENTRY;
647 if ( client->outboundMsgs->count > 0 )
648 {
649 int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
650 int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
651 int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
652 current = ListNextElement(client->outboundMsgs, ¤t);
653
654 while(ListNextElement(client->outboundMsgs, ¤t) != NULL)
655 {
656 int curMsgID = ((Messages*)current->content)->msgid;
657 int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
658 int curgap = curMsgID - curPrevMsgID;
659 if ( curgap > gap )
660 {
661 gap = curgap;
662 wrapel = current;
663 }
664 }
665 }
666
667 if ( wrapel != NULL )
668 {
669 /* put wrapel at the beginning of the queue */
670 client->outboundMsgs->first->prev = client->outboundMsgs->last;
671 client->outboundMsgs->last->next = client->outboundMsgs->first;
672 client->outboundMsgs->first = wrapel;
673 client->outboundMsgs->last = wrapel->prev;
674 client->outboundMsgs->first->prev = NULL;
675 client->outboundMsgs->last->next = NULL;
676 }
677 FUNC_EXIT;
678 }
679
680
681 #if !defined(NO_PERSISTENCE)
/**
 * Removes a queue entry record from the persistent store.
 * The key is the queue key prefix for the client's MQTT version followed by the entry's
 * sequence number.
 * @param client the client as ::Clients; its persistence implementation is used directly.
 * @param qe the queue entry whose seqno identifies the record.
 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR if the key cannot be built,
 * otherwise the code returned by the store's remove callback.
 */
int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
{
	int rc = 0;
#if defined(_WIN32) || defined(_WIN64)
/* Not used in this function any more, but MQTTPersistence_persistQueueEntry further down
 * the file may still expect this macro on Windows builds. */
#define KEYSIZE (PERSISTENCE_MAX_KEY_LENGTH + 1)
#endif
	char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
	int chars = 0;

	FUNC_ENTRY;
	if (client->MQTTVersion >= MQTTVERSION_5)
		chars = snprintf(key, sizeof(key), "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
	else
		chars = snprintf(key, sizeof(key), "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
	/* snprintf is mapped to _snprintf on Windows, which reports truncation with a negative value */
	if (chars < 0 || (size_t)chars >= sizeof(key))
	{
		Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
		rc = MQTTCLIENT_PERSISTENCE_ERROR;
	}
	else if ((rc = client->persistence->premove(client->phandle, key)) != 0)
		Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
	FUNC_EXIT_RC(rc);
	return rc;
}
708
709
710 #define MAX_NO_OF_BUFFERS 9
MQTTPersistence_persistQueueEntry(Clients * aclient,MQTTPersistence_qEntry * qe)711 int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
712 {
713 int rc = 0;
714 int bufindex = 0;
715 #if !defined(_WIN32) && !defined(_WIN64)
716 const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
717 #endif
718 char key[KEYSIZE];
719 int chars = 0;
720 int lens[MAX_NO_OF_BUFFERS];
721 void* bufs[MAX_NO_OF_BUFFERS];
722 int props_allocated = 0;
723
724 FUNC_ENTRY;
725 bufs[bufindex] = &qe->msg->payloadlen;
726 lens[bufindex++] = sizeof(qe->msg->payloadlen);
727
728 bufs[bufindex] = qe->msg->payload;
729 lens[bufindex++] = qe->msg->payloadlen;
730
731 bufs[bufindex] = &qe->msg->qos;
732 lens[bufindex++] = sizeof(qe->msg->qos);
733
734 bufs[bufindex] = &qe->msg->retained;
735 lens[bufindex++] = sizeof(qe->msg->retained);
736
737 bufs[bufindex] = &qe->msg->dup;
738 lens[bufindex++] = sizeof(qe->msg->dup);
739
740 bufs[bufindex] = &qe->msg->msgid;
741 lens[bufindex++] = sizeof(qe->msg->msgid);
742
743 bufs[bufindex] = qe->topicName;
744 lens[bufindex++] = (int)strlen(qe->topicName) + 1;
745
746 bufs[bufindex] = &qe->topicLen;
747 lens[bufindex++] = sizeof(qe->topicLen);
748
749 if (++aclient->qentry_seqno == PERSISTENCE_SEQNO_LIMIT)
750 aclient->qentry_seqno = 0;
751
752 if (aclient->MQTTVersion >= MQTTVERSION_5) /* persist properties */
753 {
754 MQTTProperties no_props = MQTTProperties_initializer;
755 MQTTProperties* props = &no_props;
756 int temp_len = 0;
757 char* ptr = NULL;
758
759 if (qe->msg->struct_version >= 1)
760 props = &qe->msg->properties;
761
762 temp_len = MQTTProperties_len(props);
763 ptr = bufs[bufindex] = malloc(temp_len);
764 if (!ptr)
765 {
766 rc = PAHO_MEMORY_ERROR;
767 goto exit;
768 }
769 props_allocated = bufindex;
770 rc = MQTTProperties_write(&ptr, props);
771 lens[bufindex++] = temp_len;
772
773 chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, aclient->qentry_seqno);
774 }
775 else
776 chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, aclient->qentry_seqno);
777
778 if (chars >= KEYSIZE)
779 rc = MQTTCLIENT_PERSISTENCE_ERROR;
780 else
781 {
782 qe->seqno = aclient->qentry_seqno;
783
784 if (aclient->beforeWrite)
785 rc = aclient->beforeWrite(aclient->beforeWrite_context, bufindex, (char**)bufs, lens);
786
787 if (rc == 0 && (rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
788 Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
789 }
790 if (props_allocated != 0)
791 free(bufs[props_allocated]);
792
793 exit:
794 FUNC_EXIT_RC(rc);
795 return rc;
796 }
797
798
MQTTPersistence_restoreQueueEntry(char * buffer,size_t buflen,int MQTTVersion)799 static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
800 {
801 MQTTPersistence_qEntry* qe = NULL;
802 char* ptr = buffer;
803 int data_size;
804
805 FUNC_ENTRY;
806 if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
807 goto exit;
808 memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
809
810 if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
811 {
812 free(qe);
813 qe = NULL;
814 goto exit;
815 }
816 memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
817
818 qe->msg->struct_version = 1;
819
820 qe->msg->payloadlen = *(int*)ptr;
821 ptr += sizeof(int);
822
823 data_size = qe->msg->payloadlen;
824 if ((qe->msg->payload = malloc(data_size)) == NULL)
825 {
826 free(qe->msg);
827 free(qe);
828 qe = NULL;
829 goto exit;
830 }
831 memcpy(qe->msg->payload, ptr, data_size);
832 ptr += data_size;
833
834 qe->msg->qos = *(int*)ptr;
835 ptr += sizeof(int);
836
837 qe->msg->retained = *(int*)ptr;
838 ptr += sizeof(int);
839
840 qe->msg->dup = *(int*)ptr;
841 ptr += sizeof(int);
842
843 qe->msg->msgid = *(int*)ptr;
844 ptr += sizeof(int);
845
846 data_size = (int)strlen(ptr) + 1;
847 if ((qe->topicName = malloc(data_size)) == NULL)
848 {
849 free(qe->msg->payload);
850 free(qe->msg);
851 free(qe);
852 qe = NULL;
853 goto exit;
854 }
855 strcpy(qe->topicName, ptr);
856 ptr += data_size;
857
858 qe->topicLen = *(int*)ptr;
859 ptr += sizeof(int);
860
861 if (MQTTVersion >= MQTTVERSION_5 &&
862 MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
863 Log(LOG_ERROR, -1, "Error restoring properties from persistence");
864
865 exit:
866 FUNC_EXIT;
867 return qe;
868 }
869
870
/**
 * Inserts a queue entry into the list, maintaining sequence number order.
 * The first element whose sequence number is greater than that of the new entry is
 * handed to ListInsert() as the position; NULL is passed when there is none.
 * @param list the list to insert into.
 * @param qEntry the queue entry to add.
 * @param size size of the entry.
 */
static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
{
	ListElement* position = NULL;
	ListElement* current = NULL;

	FUNC_ENTRY;
	while (ListNextElement(list, &current) != NULL)
	{
		if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
		{
			position = current;
			break;
		}
	}
	ListInsert(list, qEntry, size, position);
	FUNC_EXIT;
}
885
886
887 /**
888 * Restores a queue of messages from persistence to memory
889 * @param c the client as ::Clients - the client object to restore the messages to
890 * @return return code, 0 if successful
891 */
MQTTPersistence_restoreMessageQueue(Clients * c)892 int MQTTPersistence_restoreMessageQueue(Clients* c)
893 {
894 int rc = 0;
895 char **msgkeys;
896 int nkeys;
897 int i = 0;
898 int entries_restored = 0;
899
900 FUNC_ENTRY;
901 if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
902 {
903 while (rc == 0 && i < nkeys)
904 {
905 char *buffer = NULL;
906 int buflen;
907
908 if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
909 strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
910 {
911 ; /* ignore if not a queue entry key */
912 }
913 else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
914 (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
915 {
916 int MQTTVersion =
917 (strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
918 ? MQTTVERSION_5 : MQTTVERSION_3_1_1;
919 MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
920
921 if (qe)
922 {
923 qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
924 MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
925 c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
926 entries_restored++;
927 }
928 if (buffer)
929 free(buffer);
930 }
931 if (msgkeys[i])
932 {
933 free(msgkeys[i]);
934 }
935 i++;
936 }
937 if (msgkeys != NULL)
938 free(msgkeys);
939 }
940 Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
941 FUNC_EXIT_RC(rc);
942 return rc;
943 }
944 #endif
945