1 /*******************************************************************************
2 * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs - bug 384016 - segv setting will message
16 * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
17 * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
18 * Ian Craggs - multiple server connection support
19 * Ian Craggs - fix for bug 413429 - connectionLost not called
20 * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
21 * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
22 * Ian Craggs - fix for bug 420851
23 * Ian Craggs - fix for bug 432903 - queue persistence
24 * Ian Craggs - MQTT 3.1.1 support
25 * Ian Craggs - fix for bug 438176 - MQTT version selection
26 * Rong Xiang, Ian Craggs - C++ compatibility
27 * Ian Craggs - fix for bug 443724 - stack corruption
28 * Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
29 * Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
30 * Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
31 * Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
32 * Ian Craggs - SNI support, message queue unpersist bug
33 * Ian Craggs - binary will message support
34 * Ian Craggs - waitforCompletion fix #240
35 * Ian Craggs - check for NULL SSL options #334
36 * Ian Craggs - allocate username/password buffers #431
37 * Ian Craggs - MQTT 5.0 support
38 * Sven Gambel - add generic proxy support
39 *******************************************************************************/
40
41 /**
42 * @file
43 * \brief Synchronous API implementation
44 *
45 */
46
47 #include <stdlib.h>
48 #include <string.h>
49 #if !defined(_WIN32) && !defined(_WIN64)
50 #include <sys/time.h>
51 #else
52 #if defined(_MSC_VER) && _MSC_VER < 1900
53 #define snprintf _snprintf
54 #endif
55 #endif
56
57 #include "MQTTClient.h"
58 #if !defined(NO_PERSISTENCE)
59 #include "MQTTPersistence.h"
60 #endif
61
62 #include "utf-8.h"
63 #include "MQTTProtocol.h"
64 #include "MQTTProtocolOut.h"
65 #include "Thread.h"
66 #include "SocketBuffer.h"
67 #include "StackTrace.h"
68 #include "Heap.h"
69
70 #if defined(IOT_CONNECT)
71 #include "securec.h"
72 #include "soc_socket_types.h"
73 #include "atiny_mqtt_commu.h"
74 #endif
75
76 #if defined(OPENSSL)
77 #include <openssl/ssl.h>
78 #elif defined(MBEDTLS)
79 #include <mbedtls/version.h>
80 #include "Clients.h"
81 #else
82 #define URI_SSL "ssl://"
83 #define URI_MQTTS "mqtts://"
84 #endif
85
86 #include "OsWrapper.h"
87
88 #define URI_TCP "tcp://"
89 #define URI_MQTT "mqtt://"
90 #define URI_WS "ws://"
91 #define URI_WSS "wss://"
92
93 #include "VersionInfo.h"
94 #include "WebSocket.h"
95 #include "Proxy.h"
96
97 const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
98 const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
99
100 struct conlost_sync_data {
101 sem_type sem;
102 void *m;
103 };
104
105 int MQTTClient_init(void);
106
/**
 * Global initialization: runs MQTTClient_init() and, in TLS-enabled builds,
 * passes the caller's do_openssl_init setting on to the SSL socket layer.
 *
 * @param inits the initialization options; may be NULL, in which case the
 * SSL socket layer setting is left untouched
 */
void MQTTClient_global_init(MQTTClient_init_options* inits)
{
	MQTTClient_init();
#if defined(OPENSSL) || defined(MBEDTLS)
	/* a NULL options pointer used to be dereferenced here */
	if (inits != NULL)
		SSLSocket_handleOpensslInit(inits->do_openssl_init);
#endif
}
114
115 static ClientStates ClientState =
116 {
117 CLIENT_VERSION, /* version */
118 NULL /* client list */
119 };
120
121 ClientStates* bstate = &ClientState;
122
123 MQTTProtocol state;
124
125 #if defined(_WIN32) || defined(_WIN64)
126 static mutex_type mqttclient_mutex = NULL;
127 mutex_type socket_mutex = NULL;
128 static mutex_type subscribe_mutex = NULL;
129 static mutex_type unsubscribe_mutex = NULL;
130 static mutex_type connect_mutex = NULL;
131 #if !defined(NO_HEAP_TRACKING)
132 extern mutex_type stack_mutex;
133 extern mutex_type heap_mutex;
134 #endif
135 extern mutex_type log_mutex;
136
MQTTClient_init(void)137 int MQTTClient_init(void)
138 {
139 DWORD rc = 0;
140
141 if (mqttclient_mutex == NULL)
142 {
143 if ((mqttclient_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
144 {
145 rc = GetLastError();
146 printf("mqttclient_mutex error %d\n", rc);
147 goto exit;
148 }
149 if ((subscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
150 {
151 rc = GetLastError();
152 printf("subscribe_mutex error %d\n", rc);
153 goto exit;
154 }
155 if ((unsubscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
156 {
157 rc = GetLastError();
158 printf("unsubscribe_mutex error %d\n", rc);
159 goto exit;
160 }
161 if ((connect_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
162 {
163 rc = GetLastError();
164 printf("connect_mutex error %d\n", rc);
165 goto exit;
166 }
167 #if !defined(NO_HEAP_TRACKING)
168 if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
169 {
170 rc = GetLastError();
171 printf("stack_mutex error %d\n", rc);
172 goto exit;
173 }
174 if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
175 {
176 rc = GetLastError();
177 printf("heap_mutex error %d\n", rc);
178 goto exit;
179 }
180 #endif
181 if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
182 {
183 rc = GetLastError();
184 printf("log_mutex error %d\n", rc);
185 goto exit;
186 }
187 if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
188 {
189 rc = GetLastError();
190 printf("socket_mutex error %d\n", rc);
191 goto exit;
192 }
193 }
194 exit:
195 return rc;
196 }
197
MQTTClient_cleanup(void)198 void MQTTClient_cleanup(void)
199 {
200 if (connect_mutex)
201 CloseHandle(connect_mutex);
202 if (subscribe_mutex)
203 CloseHandle(subscribe_mutex);
204 if (unsubscribe_mutex)
205 CloseHandle(unsubscribe_mutex);
206 #if !defined(NO_HEAP_TRACKING)
207 if (stack_mutex)
208 CloseHandle(stack_mutex);
209 if (heap_mutex)
210 CloseHandle(heap_mutex);
211 #endif
212 if (log_mutex)
213 CloseHandle(log_mutex);
214 if (socket_mutex)
215 CloseHandle(socket_mutex);
216 if (mqttclient_mutex)
217 CloseHandle(mqttclient_mutex);
218 }
219
220 #if defined(PAHO_MQTT_STATIC)
221 /* Global variable for one-time initialization structure */
222 static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Static initialization */
223
224 /* One time initialization function */
InitOnceFunction(PINIT_ONCE InitOnce,PVOID Parameter,PVOID * lpContext)225 BOOL CALLBACK InitOnceFunction (
226 PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
227 PVOID Parameter, /* Optional parameter passed by InitOnceExecuteOnce */
228 PVOID *lpContext) /* Receives pointer to event object */
229 {
230 int rc = MQTTClient_init();
231 return rc == 0;
232 }
233
234 #else
DllMain(HANDLE hModule,DWORD ul_reason_for_call,LPVOID lpReserved)235 BOOL APIENTRY DllMain(HANDLE hModule,
236 DWORD ul_reason_for_call,
237 LPVOID lpReserved)
238 {
239 switch (ul_reason_for_call)
240 {
241 case DLL_PROCESS_ATTACH:
242 MQTTClient_init();
243 break;
244 case DLL_THREAD_ATTACH:
245 break;
246 case DLL_THREAD_DETACH:
247 break;
248 case DLL_PROCESS_DETACH:
249 if (lpReserved)
250 MQTTClient_cleanup();
251 break;
252 }
253 return TRUE;
254 }
255 #endif
256 #elif defined (COMPAT_CMSIS)
257
258 static mutex_type mqttclient_mutex = NULL;
259 mutex_type socket_mutex = NULL;
260 static mutex_type subscribe_mutex = NULL;
261 static mutex_type unsubscribe_mutex = NULL;
262 static mutex_type connect_mutex = NULL;
263 #if !defined(NO_HEAP_TRACKING)
264 extern mutex_type stack_mutex;
265 extern mutex_type heap_mutex;
266 #endif
267 extern mutex_type log_mutex;
268
MQTTClient_init(void)269 int MQTTClient_init(void)
270 {
271 int rc = MQTTCLIENT_SUCCESS;
272 if ((mqttclient_mutex = Thread_create_mutex(&rc)) == NULL)
273 {
274 Log(TRACE_MIN, -1,"MQTTClient: error initializing client_mutex\n");
275 return rc;
276 }
277 if ((subscribe_mutex = Thread_create_mutex(&rc)) == NULL)
278 {
279 Log(TRACE_MIN, -1,"MQTTClient: error initializing subscribe_mutex\n");
280 return rc;
281 }
282 if ((unsubscribe_mutex = Thread_create_mutex(&rc)) == NULL)
283 {
284 Log(TRACE_MIN, -1,"MQTTClient: error initializing unsubscribe_mutex\n");
285 return rc;
286 }
287 if ((connect_mutex = Thread_create_mutex(&rc)) == NULL)
288 {
289 Log(TRACE_MIN, -1,"MQTTClient: error initializing connect_mutex\n");
290 return rc;
291 }
292 #if !defined(NO_HEAP_TRACKING)
293 if ((stack_mutex = Thread_create_mutex(&rc)) == NULL)
294 {
295 Log(TRACE_MIN, -1,"MQTTClient: error initializing stack_mutex\n");
296 return rc;
297 }
298 if ((heap_mutex = Thread_create_mutex(&rc)) == NULL)
299 {
300 Log(TRACE_MIN, -1,"MQTTClient: error initializing heap_mutex\n");
301 return rc;
302 }
303 #endif
304 if ((log_mutex = Thread_create_mutex(&rc)) == NULL)
305 {
306 Log(TRACE_MIN, -1,"MQTTClient: error initializing log_mutex\n");
307 return rc;
308 }
309 if ((socket_mutex = Thread_create_mutex(&rc)) == NULL)
310 {
311 Log(TRACE_MIN, -1,"MQTTClient: error initializing socket_mutex\n");
312 return rc;
313 }
314 return rc;
315 }
316
MQTTClient_cleanup(void)317 void MQTTClient_cleanup(void)
318 {
319 Thread_destroy_mutex(connect_mutex);
320 connect_mutex = NULL;
321 Thread_destroy_mutex(subscribe_mutex);
322 subscribe_mutex = NULL;
323 Thread_destroy_mutex(unsubscribe_mutex);
324 unsubscribe_mutex = NULL;
325 #if !defined(NO_HEAP_TRACKING)
326 Thread_destroy_mutex(stack_mutex);
327 stack_mutex = NULL;
328 Thread_destroy_mutex(heap_mutex);
329 heap_mutex = NULL;
330 #endif
331 Thread_destroy_mutex(log_mutex);
332 log_mutex = NULL;
333 Thread_destroy_mutex(socket_mutex);
334 socket_mutex = NULL;
335 Thread_destroy_mutex(mqttclient_mutex);
336 mqttclient_mutex = NULL;
337 }
338
339 #define WINAPI
340 #else
341 static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
342 static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
343
344 static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
345 mutex_type socket_mutex = &socket_mutex_store;
346
347 static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
348 static mutex_type subscribe_mutex = &subscribe_mutex_store;
349
350 static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
351 static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
352
353 static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
354 static mutex_type connect_mutex = &connect_mutex_store;
355
MQTTClient_init(void)356 int MQTTClient_init(void)
357 {
358 pthread_mutexattr_t attr;
359 int rc;
360
361 pthread_mutexattr_init(&attr);
362 #if !defined(_WRS_KERNEL)
363 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
364 #else
365 /* #warning "no pthread_mutexattr_settype" */
366 #endif /* !defined(_WRS_KERNEL) */
367 if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
368 printf("MQTTClient: error %d initializing client_mutex\n", rc);
369 else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
370 printf("MQTTClient: error %d initializing socket_mutex\n", rc);
371 else if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
372 printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
373 else if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
374 printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
375 else if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
376 printf("MQTTClient: error %d initializing connect_mutex\n", rc);
377
378 return rc;
379 }
380
381 #define WINAPI
382 #endif
383
384 static volatile int library_initialized = 0;
385 static List* handles = NULL;
386 static int running = 0;
387 static int tostop = 0;
388 static thread_id_type run_id = 0;
389
390 typedef struct
391 {
392 MQTTClient_message* msg;
393 char* topicName;
394 int topicLen;
395 unsigned int seqno; /* only used on restore */
396 } qEntry;
397
398
399 typedef struct
400 {
401 char* serverURI;
402 const char* currentServerURI; /* when using HA options, set the currently used serverURI */
403 #if defined(OPENSSL) || defined(MBEDTLS)
404 int ssl;
405 #endif
406 int websocket;
407 Clients* c;
408 MQTTClient_connectionLost* cl;
409 MQTTClient_messageArrived* ma;
410 MQTTClient_deliveryComplete* dc;
411 void* context;
412
413 MQTTClient_disconnected* disconnected;
414 void* disconnected_context; /* the context to be associated with the disconnected callback*/
415
416 MQTTClient_published* published;
417 void* published_context; /* the context to be associated with the disconnected callback*/
418
419 #if 0
420 MQTTClient_authHandle* auth_handle;
421 void* auth_handle_context; /* the context to be associated with the authHandle callback*/
422 #endif
423
424 sem_type connect_sem;
425 int rc; /* getsockopt return code in connect */
426 sem_type connack_sem;
427 sem_type suback_sem;
428 sem_type unsuback_sem;
429 MQTTPacket* pack;
430
431 unsigned long commandTimeout;
432 } MQTTClients;
433
434 struct props_rc_parms
435 {
436 MQTTClients* m;
437 MQTTProperties* properties;
438 enum MQTTReasonCodes reasonCode;
439 };
440
441 static void MQTTClient_terminate(void);
442 static void MQTTClient_emptyMessageQueue(Clients* client);
443 static int MQTTClient_deliverMessage(
444 int rc, MQTTClients* m,
445 char** topicName, int* topicLen,
446 MQTTClient_message** message);
447 static int clientSockCompare(void* a, void* b);
448 static thread_return_type WINAPI connectionLost_call(void* context);
449 static thread_return_type WINAPI MQTTClient_run(void* n);
450 static int MQTTClient_stop(void);
451 static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
452 static int MQTTClient_cleanSession(Clients* client);
453 static MQTTResponse MQTTClient_connectURIVersion(
454 MQTTClient handle, MQTTClient_connectOptions* options,
455 const char* serverURI, int MQTTVersion,
456 START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout,
457 MQTTProperties* connectProperties, MQTTProperties* willProperties);
458 static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
459 MQTTProperties* connectProperties, MQTTProperties* willProperties);
460 static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
461 static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
462 static void MQTTClient_retry(void);
463 static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc);
464 static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout);
465 /*static int pubCompare(void* a, void* b); */
466 static void MQTTProtocol_checkPendingWrites(void);
467 static void MQTTClient_writeComplete(SOCKET socket, int rc);
468 static void MQTTClient_writeContinue(SOCKET socket);
469
470
/**
 * IoT builds shut the library down again when a failed create leaves no
 * clients registered. In all other builds this does nothing, so the library
 * stays initialized there as it did before.
 */
static void MQTTClient_createFailed(void)
{
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (bstate->clients->count == 0)
		MQTTClient_terminate();
#endif
}

#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
/**
 * Releases a client whose construction failed after m->c was allocated and
 * m was added to the handles list, but before m->c was added to
 * bstate->clients. m is no longer valid when this function returns.
 *
 * @param m the partially constructed client
 */
static void MQTTClient_abandonClient(MQTTClients* m)
{
#if !defined(NO_PERSISTENCE)
	/* closed unconditionally, as MQTTClient_destroy() does; the old
	 * "rc != 0" guard left a successfully opened store open */
	MQTTPersistence_close(m->c);
#endif
	if (m->c->outboundMsgs != NULL)
		ListFree(m->c->outboundMsgs);
	if (m->c->inboundMsgs != NULL)
		ListFree(m->c->inboundMsgs);
	if (m->c->messageQueue != NULL)
		ListFree(m->c->messageQueue);
	/* outboundQueue used to be leaked on this path */
	if (m->c->outboundQueue != NULL)
		ListFree(m->c->outboundQueue);
	if (m->c->clientID != NULL)
		free(m->c->clientID);
	if (m->connect_sem != NULL)
		Thread_destroy_sem(m->connect_sem);
	if (m->connack_sem != NULL)
		Thread_destroy_sem(m->connack_sem);
	if (m->suback_sem != NULL)
		Thread_destroy_sem(m->suback_sem);
	if (m->unsuback_sem != NULL)
		Thread_destroy_sem(m->unsuback_sem);

	free(m->c);
	free(m->serverURI);
	ListRemove(handles, m);	/* ListRemove also frees m */
	MQTTClient_createFailed();
}
#endif

/**
 * Creates a client and registers it with the library, initializing the
 * library first if necessary.
 *
 * The URI scheme selects the transport: tcp:// and mqtt:// are plain,
 * ws:// enables websockets, and ssl://, mqtts:// and wss:// enable TLS (wss://
 * with websockets as well) when the library is built with OPENSSL or MBEDTLS.
 * A URI without "://" is stored unchanged.
 *
 * @param handle receives the new client on success. When no client is
 * produced it is set to NULL.
 * @param serverURI the server address, optionally with one of the schemes above
 * @param clientId the client identifier, which must be valid UTF-8 and must
 * not be empty when the default persistence is requested
 * @param persistence_type the type of persistence to use
 * @param persistence_context passed to MQTTPersistence_create()
 * @param options optional create options ("MQCO", version 0), may be NULL
 * @return 0 (MQTTCLIENT_SUCCESS) or an error code:
 * MQTTCLIENT_NULL_PARAMETER, MQTTCLIENT_BAD_UTF8_STRING,
 * MQTTCLIENT_PERSISTENCE_ERROR, MQTTCLIENT_BAD_PROTOCOL,
 * MQTTCLIENT_BAD_STRUCTURE, MQTTCLIENT_SSL_NOT_SUPPORTED, PAHO_MEMORY_ERROR,
 * or a mutex or persistence error. In non-IoT builds a persistence error is
 * returned with the client still created and stored in *handle.
 */
int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
{
	int rc = 0;
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	int mem_ret = -1;
#endif
	MQTTClients *m = NULL;	/* reset to NULL whenever the client is discarded */

#if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
	/* initializes mutexes once.  Must come before FUNC_ENTRY */
	BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitOnceFunction, NULL, NULL);
#endif
	FUNC_ENTRY;
	if ((rc = Thread_lock_mutex(mqttclient_mutex)) != 0)
		goto nounlock_exit;

	/* handle is written through below, so it is required in every build */
	if (handle == NULL || serverURI == NULL || clientId == NULL)
	{
		rc = MQTTCLIENT_NULL_PARAMETER;
		goto exit;
	}

	if (!UTF8_validateString(clientId))
	{
		rc = MQTTCLIENT_BAD_UTF8_STRING;
		goto exit;
	}

	if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
	{
		rc = MQTTCLIENT_PERSISTENCE_ERROR;
		goto exit;
	}

	if (strstr(serverURI, "://") != NULL)
	{
		if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
		 && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
		 && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
#if defined(OPENSSL) || defined(MBEDTLS)
		 && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
		 && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
		 && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
#endif
			)
		{
			rc = MQTTCLIENT_BAD_PROTOCOL;
			goto exit;
		}
	}

	if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
	{
		rc = MQTTCLIENT_BAD_STRUCTURE;
		goto exit;
	}

	if (!library_initialized)
	{
#if !defined(NO_HEAP_TRACKING)
		Heap_initialize();
#endif
#if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
		Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
#endif
		bstate->clients = ListInitialize();
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		rc = Socket_outInitialize();
#else
		Socket_outInitialize();
#endif
		Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
		Socket_setWriteContinueCallback(MQTTClient_writeContinue);
		Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
		handles = ListInitialize();
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		if (bstate->clients == NULL || rc != 0 || handles == NULL)
		{
			/* undo whatever part of the initialization succeeded */
			if (bstate->clients != NULL)
			{
				ListFree(bstate->clients);
				bstate->clients = NULL;
			}
			if (rc == 0)
				Socket_outTerminate();
			if (handles != NULL)
			{
				ListFree(handles);
				handles = NULL;
			}
#if !defined(NO_HEAP_TRACKING)
			Heap_terminate();
#endif

			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
#else
#if defined(OPENSSL)
		SSLSocket_initialize();
#endif
#endif
		library_initialized = 1;
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
#if defined(OPENSSL)
		if (SSLSocket_initialize() != 1)
#elif defined(MBEDTLS)
		if (SSLSocket_initialize() != 0)
#endif
#if defined(OPENSSL) || defined(MBEDTLS)
		{
			MQTTClient_terminate();
			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
#endif
#endif
	}

	if ((m = malloc(sizeof(MQTTClients))) == NULL)
	{
		MQTTClient_createFailed();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	mem_ret = memset_s(m, sizeof(MQTTClients), '\0', sizeof(MQTTClients));
	if (mem_ret != 0)
	{
		free(m);
		m = NULL;
		MQTTClient_createFailed();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	memset(m, '\0', sizeof(MQTTClients));
#endif
	m->commandTimeout = 10000L;

	/* strip the scheme and record the transport it selects */
	if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
		serverURI += strlen(URI_TCP);
	else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
		serverURI += strlen(URI_MQTT);
	else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
	{
		serverURI += strlen(URI_WS);
		m->websocket = 1;
	}
	else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0
		|| strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0
		|| strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
	{
#if defined(OPENSSL) || defined(MBEDTLS)
		/* mqtts:// used to be accepted for OPENSSL only, although the
		 * validation above lets it through for MBEDTLS as well */
		if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
		{
			serverURI += strlen(URI_WSS);
			m->websocket = 1;
		}
		else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
			serverURI += strlen(URI_MQTTS);
		else
			serverURI += strlen(URI_SSL);
		m->ssl = 1;
#else
		free(m);
		m = NULL;
		MQTTClient_createFailed();
		rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
		goto exit;
#endif
	}
	m->serverURI = MQTTStrdup(serverURI);
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (m->serverURI == NULL)
	{
		free(m);
		m = NULL;
		MQTTClient_createFailed();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
	if (ListAppend(handles, m, sizeof(MQTTClients)) == NULL)
	{
		free(m->serverURI);
		free(m);
		m = NULL;
		MQTTClient_createFailed();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	ListAppend(handles, m, sizeof(MQTTClients));
#endif

	if ((m->c = malloc(sizeof(Clients))) == NULL)
	{
		/* the URI copy was only released in IoT builds before */
		if (m->serverURI)
			free(m->serverURI);
		ListRemove(handles, m);	/* ListRemove also frees m */
		m = NULL;
		MQTTClient_createFailed();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	mem_ret = memset_s(m->c, sizeof(Clients), '\0', sizeof(Clients));
	if (mem_ret != 0)
	{
		free(m->c);
		free(m->serverURI);
		ListRemove(handles, m);	/* ListRemove also frees m */
		m = NULL;
		MQTTClient_createFailed();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	memset(m->c, '\0', sizeof(Clients));
#endif
	m->c->context = m;
	m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
	m->c->outboundMsgs = ListInitialize();
	m->c->inboundMsgs = ListInitialize();
	m->c->messageQueue = ListInitialize();
	m->c->outboundQueue = ListInitialize();
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	m->c->retryMsgs = -1;
#endif
	m->c->clientID = MQTTStrdup(clientId);
#if defined(ZERO_SOCK_FD_IS_INVALID)
	m->c->net.socket = 0;
#else
	m->c->net.socket = -1;
#endif
	m->connect_sem = Thread_create_sem(&rc);
	m->connack_sem = Thread_create_sem(&rc);
	m->suback_sem = Thread_create_sem(&rc);
	m->unsuback_sem = Thread_create_sem(&rc);

#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	/* verify the allocations before the persistence code is handed the client */
	if (m->c->outboundMsgs == NULL ||
		m->c->inboundMsgs == NULL ||
		m->c->messageQueue == NULL ||
		m->c->outboundQueue == NULL ||
		m->c->clientID == NULL ||
		m->connect_sem == NULL ||
		m->connack_sem == NULL ||
		m->suback_sem == NULL ||
		m->unsuback_sem == NULL)
	{
		MQTTClient_abandonClient(m);
		m = NULL;
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#endif

#if !defined(NO_PERSISTENCE)
	rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
	if (rc == 0)
	{
		rc = MQTTPersistence_initialize(m->c, m->serverURI);
		if (rc == 0)
			MQTTPersistence_restoreMessageQueue(m->c);
	}
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (rc != 0)
	{
		MQTTClient_abandonClient(m);
		m = NULL;
		/* IoT builds have always reported persistence failures with this code */
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#endif
#endif

#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (!ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List)))
	{
		MQTTClient_abandonClient(m);
		m = NULL;
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
#endif

	/* published only now, so that no failure path can leave the caller
	 * holding a pointer to a freed or half-built client */
	*handle = m;

exit:
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (handle != NULL && rc != 0)
		*handle = NULL;
#else
	if (handle != NULL && m == NULL)
		*handle = NULL;
#endif
	Thread_unlock_mutex(mqttclient_mutex);
nounlock_exit:
	FUNC_EXIT_RC(rc);
	return rc;
}
844
845
/**
 * Creates a client without create options; a thin wrapper around
 * MQTTClient_createWithOptions() with a NULL options pointer.
 *
 * @return whatever MQTTClient_createWithOptions() returns
 */
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context)
{
	MQTTClient_createOptions* no_options = NULL;

	return MQTTClient_createWithOptions(handle, serverURI, clientId,
			persistence_type, persistence_context, no_options);
}
852
853
MQTTClient_terminate(void)854 static void MQTTClient_terminate(void)
855 {
856 FUNC_ENTRY;
857 MQTTClient_stop();
858 if (library_initialized)
859 {
860 ListFree(bstate->clients);
861 ListFree(handles);
862 handles = NULL;
863 WebSocket_terminate();
864 #if !defined(NO_HEAP_TRACKING)
865 Heap_terminate();
866 #endif
867 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
868 Log_terminate();
869 #endif
870 library_initialized = 0;
871 }
872 FUNC_EXIT;
873 }
874
875
/**
 * Discards every message waiting in a client's message queue: for each entry
 * the topic name, the message properties, the payload and the message are
 * freed, then the list itself is emptied.
 *
 * @param client the client whose messageQueue is to be emptied
 */
static void MQTTClient_emptyMessageQueue(Clients* client)
{
	List* queue = NULL;

	FUNC_ENTRY;
	queue = client->messageQueue;
	if (queue->count > 0)
	{
		ListElement* elem = NULL;

		while (ListNextElement(queue, &elem) != NULL)
		{
			qEntry* entry = (qEntry*)(elem->content);

			free(entry->topicName);
			MQTTProperties_free(&entry->msg->properties);
			free(entry->msg->payload);
			free(entry->msg);
		}
		/* releases the list elements and the qEntry structures themselves */
		ListEmpty(queue);
	}
	FUNC_EXIT;
}
895
896
/**
 * Destroys a client: closes its persistence store, discards queued
 * messages, frees the protocol-level client, the server URI and the
 * semaphores, removes the client from the library lists and sets *handle to
 * NULL. When the last client is gone the library is terminated.
 *
 * @param handle pointer to the client handle. A NULL pointer or a NULL
 * handle is ignored.
 */
void MQTTClient_destroy(MQTTClient* handle)
{
	MQTTClients* m = NULL;

	FUNC_ENTRY;
	Thread_lock_mutex(connect_mutex);
	Thread_lock_mutex(mqttclient_mutex);

	/* *handle used to be read before any check, so a NULL handle pointer crashed */
	if (handle == NULL || (m = *handle) == NULL)
		goto exit;

	if (m->c)
	{
		/* kept for the trace entry, as the client is freed before it is written */
		SOCKET saved_socket = m->c->net.socket;
		char* saved_clientid = NULL;

		if (m->c->clientID != NULL)
			saved_clientid = MQTTStrdup(m->c->clientID);
#if !defined(NO_PERSISTENCE)
		MQTTPersistence_close(m->c);
#endif
		MQTTClient_emptyMessageQueue(m->c);
		MQTTProtocol_freeClient(m->c);
		if (bstate != NULL && bstate->clients != NULL)
		{
			if (!ListRemove(bstate->clients, m->c))
				Log(LOG_ERROR, 0, NULL);
			else
				Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
		}

		if (saved_clientid != NULL)
			free(saved_clientid);
	}
	if (m->serverURI)
		free(m->serverURI);
	Thread_destroy_sem(m->connect_sem);
	Thread_destroy_sem(m->connack_sem);
	Thread_destroy_sem(m->suback_sem);
	Thread_destroy_sem(m->unsuback_sem);
	/* ListRemove also frees m */
	if (handles != NULL && ListRemove(handles, m) == 0)
		Log(LOG_ERROR, -1, "free error");
	*handle = NULL;
	if (bstate != NULL && bstate->clients != NULL && bstate->clients->count == 0)
		MQTTClient_terminate();

exit:
	Thread_unlock_mutex(mqttclient_mutex);
	Thread_unlock_mutex(connect_mutex);
	FUNC_EXIT;
}
967
968
/**
 * Frees a message: its properties, its payload and the message structure
 * itself, then sets the caller's pointer to NULL.
 *
 * @param message address of the message pointer. A NULL address or a NULL
 * message is ignored.
 */
void MQTTClient_freeMessage(MQTTClient_message** message)
{
	FUNC_ENTRY;
	/* both levels used to be dereferenced unchecked */
	if (message != NULL && *message != NULL)
	{
		MQTTProperties_free(&(*message)->properties);
		free((*message)->payload);
		free(*message);
		*message = NULL;
	}
	FUNC_EXIT;
}
978
979
MQTTClient_free(void * memory)980 void MQTTClient_free(void* memory)
981 {
982 FUNC_ENTRY;
983 free(memory);
984 FUNC_EXIT;
985 }
986
987
/**
 * Releases the heap data referenced by a response: the reason code array
 * (only when the response reports at least one reason code) and the
 * properties together with the structure that holds them.
 *
 * @param response the response, passed by value, so the caller's copy still
 * holds the now stale pointers afterwards
 */
void MQTTResponse_free(MQTTResponse response)
{
	FUNC_ENTRY;
	if (response.reasonCodes != NULL && response.reasonCodeCount > 0)
		free(response.reasonCodes);

	if (response.properties != NULL)
	{
		MQTTProperties_free(response.properties);
		free(response.properties);
	}
	FUNC_EXIT;
}
1000
1001
/**
 * Hands the first entry of the client's message queue to the caller and
 * removes it from the queue (and from persistence, if the client has any).
 * The queue must not be empty.
 *
 * @param rc the return code to pass through
 * @param m the client whose queue is read
 * @param topicName receives the topic name of the message
 * @param topicLen receives the stored topic length
 * @param message receives the message
 * @return rc, or MQTTCLIENT_TOPICNAME_TRUNCATED when strlen() of the topic
 * name differs from the stored topic length
 */
static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
{
	qEntry* head = (qEntry*)(m->c->messageQueue->first->content);

	FUNC_ENTRY;
	*message = head->msg;
	*topicName = head->topicName;
	*topicLen = head->topicLen;
	if (strlen(*topicName) != *topicLen)
		rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
#if !defined(NO_PERSISTENCE)
	if (m->c->persistence)
		MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)head);
#endif
	/* drops the queue entry; the message and topic name now belong to the caller */
	ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
	FUNC_EXIT_RC(rc);
	return rc;
}
1020
1021
1022 /**
1023 * List callback function for comparing clients by socket
1024 * @param a first integer value
1025 * @param b second integer value
1026 * @return boolean indicating whether a and b are equal
1027 */
clientSockCompare(void * a,void * b)1028 static int clientSockCompare(void* a, void* b)
1029 {
1030 MQTTClients* m = (MQTTClients*)a;
1031 return m->c->net.socket == *(int*)b;
1032 }
1033
1034
1035 /**
1036 * Wrapper function to call connection lost on a separate thread. A separate thread is needed to allow the
1037 * connectionLost function to make API calls (e.g. connect)
1038 * @param context a pointer to the relevant client
1039 * @return thread_return_type standard thread return value - not used here
1040 */
connectionLost_call(void * context)1041 static thread_return_type WINAPI connectionLost_call(void* context)
1042 {
1043 struct conlost_sync_data *data = (struct conlost_sync_data *)context;
1044 MQTTClients* m = (MQTTClients *)data->m;
1045
1046 (*(m->cl))(m->context, NULL);
1047
1048 Thread_post_sem(data->sem);
1049 return 0;
1050 }
1051
1052
/**
 * Register the callback invoked when a DISCONNECT packet is received from the server.
 * @param handle the client handle
 * @param context value passed back as the first argument of the callback
 * @param disconnected the callback function; stored as given
 * @return MQTTCLIENT_SUCCESS if the callback was stored, MQTTCLIENT_FAILURE if the
 * handle is NULL or a connect is currently in progress
 */
int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* disconnected)
{
	MQTTClients* m = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	/* only accept the change while no connect attempt is under way */
	if (m != NULL && m->c->connect_state == NOT_IN_PROGRESS)
	{
		m->disconnected = disconnected;
		m->disconnected_context = context;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
1073
1074
1075
1076 /**
1077 * Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the
1078 * disconnected function to make API calls (e.g. connect)
1079 * @param context a pointer to the relevant client
1080 * @return thread_return_type standard thread return value - not used here
1081 */
call_disconnected(void * context)1082 static thread_return_type WINAPI call_disconnected(void* context)
1083 {
1084 struct props_rc_parms* pr = (struct props_rc_parms*)context;
1085
1086 (*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
1087 MQTTProperties_free(pr->properties);
1088 free(pr->properties);
1089 free(pr);
1090 return 0;
1091 }
1092
1093
/**
 * Register the "published" callback and its context on a client.
 * @param handle the client handle
 * @param context value stored as the callback's context
 * @param published the callback function; stored as given
 * @return MQTTCLIENT_SUCCESS if the callback was stored, MQTTCLIENT_FAILURE if the
 * handle is NULL or a connect is currently in progress
 */
int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
{
	MQTTClients* m = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	/* only accept the change while no connect attempt is under way */
	if (m != NULL && m->c->connect_state == NOT_IN_PROGRESS)
	{
		m->published = published;
		m->published_context = context;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
1114
1115
#if 0
/* Everything down to the matching #endif is compiled out. */

/**
 * Register the auth-handling callback and its context on a client.
 * @param handle the client handle
 * @param context value stored as the callback's context
 * @param auth_handle the callback function; stored as given
 * @return MQTTCLIENT_SUCCESS if the callback was stored, MQTTCLIENT_FAILURE if the
 * handle is NULL or a connect is currently in progress
 */
int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
{
	MQTTClients* m = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (m != NULL && m->c->connect_state == NOT_IN_PROGRESS)
	{
		m->auth_handle = auth_handle;
		m->auth_handle_context = context;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}


/**
 * Thread entry point that invokes the auth_handle callback. A separate thread is needed to
 * allow the callback to make API calls (e.g. MQTTClient_auth)
 * @param context a pointer to a struct props_rc_parms holding the client, the properties
 * and the reason code; the properties and the structure are freed after the callback returns
 * @return thread_return_type standard thread return value - not used here
 */
static thread_return_type WINAPI call_auth_handle(void* context)
{
	struct props_rc_parms* parms = (struct props_rc_parms*)context;
	MQTTClients* client = parms->m;

	client->auth_handle(client->auth_handle_context, parms->properties, parms->reasonCode);

	MQTTProperties_free(parms->properties);
	free(parms->properties);
	free(parms);
	return 0;
}
#endif
1156
1157
1158 /* This is the thread function that handles the calling of callback functions if set */
MQTTClient_run(void * n)1159 static thread_return_type WINAPI MQTTClient_run(void* n)
1160 {
1161 long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
1162
1163 FUNC_ENTRY;
1164 Thread_set_name("MQTTClient_run");
1165 Thread_lock_mutex(mqttclient_mutex);
1166
1167 run_id = Thread_getid();
1168 running = 1;
1169 while (!tostop)
1170 {
1171 int rc = SOCKET_ERROR;
1172 SOCKET sock = -1;
1173 MQTTClients* m = NULL;
1174 MQTTPacket* pack = NULL;
1175
1176 Thread_unlock_mutex(mqttclient_mutex);
1177 pack = MQTTClient_cycle(&sock, timeout, &rc);
1178 Thread_lock_mutex(mqttclient_mutex);
1179 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1180 if (tostop != 0 || handles == NULL)
1181 #else
1182 if (tostop)
1183 #endif
1184 break;
1185 timeout = 100L;
1186
1187 /* find client corresponding to socket */
1188 if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
1189 {
1190 /* assert: should not happen */
1191 continue;
1192 }
1193 m = (MQTTClient)(handles->current->content);
1194 if (m == NULL)
1195 {
1196 /* assert: should not happen */
1197 continue;
1198 }
1199 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1200 if (m->c != NULL && m->c->messageQueue != NULL && m->c->messageQueue->count > 0)
1201 timeout = 0L;
1202 #endif
1203 if (rc == SOCKET_ERROR)
1204 {
1205 if (m->c->connected)
1206 MQTTClient_disconnect_internal(m, 0);
1207 else
1208 {
1209 if (m->c->connect_state == SSL_IN_PROGRESS)
1210 {
1211 Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
1212 m->c->connect_state = NOT_IN_PROGRESS;
1213 Thread_post_sem(m->connect_sem);
1214 }
1215 if (m->c->connect_state == WAIT_FOR_CONNACK)
1216 {
1217 Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
1218 m->c->connect_state = NOT_IN_PROGRESS;
1219 Thread_post_sem(m->connack_sem);
1220 }
1221 }
1222 }
1223 else
1224 {
1225 if (m->c->messageQueue->count > 0 && m->ma)
1226 {
1227 qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
1228 int topicLen = qe->topicLen;
1229
1230 if (strlen(qe->topicName) == topicLen)
1231 topicLen = 0;
1232
1233 Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
1234 m->c->clientID, m->c->messageQueue->count);
1235 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1236 uapi_watchdog_kick();
1237 #endif
1238 Thread_unlock_mutex(mqttclient_mutex);
1239 rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
1240 Thread_lock_mutex(mqttclient_mutex);
1241 /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
1242 * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
1243 * so we must be careful how we use it.
1244 */
1245 if (rc)
1246 {
1247 #if !defined(NO_PERSISTENCE)
1248 if (m->c->persistence)
1249 MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
1250 #endif
1251 ListRemove(m->c->messageQueue, qe);
1252 }
1253 else
1254 Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
1255 m->c->clientID);
1256 }
1257 if (pack)
1258 {
1259 if (pack->header.bits.type == CONNACK)
1260 {
1261 Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
1262 m->pack = pack;
1263 Thread_post_sem(m->connack_sem);
1264 }
1265 else if (pack->header.bits.type == SUBACK)
1266 {
1267 Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
1268 m->pack = pack;
1269 Thread_post_sem(m->suback_sem);
1270 }
1271 else if (pack->header.bits.type == UNSUBACK)
1272 {
1273 Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
1274 m->pack = pack;
1275 Thread_post_sem(m->unsuback_sem);
1276 }
1277 else if (m->c->MQTTVersion >= MQTTVERSION_5)
1278 {
1279 if (pack->header.bits.type == DISCONNECT && m->disconnected)
1280 {
1281 struct props_rc_parms* dp;
1282 Ack* disc = (Ack*)pack;
1283
1284 dp = malloc(sizeof(struct props_rc_parms));
1285 if (dp)
1286 {
1287 dp->m = m;
1288 dp->reasonCode = disc->rc;
1289 dp->properties = malloc(sizeof(MQTTProperties));
1290 if (dp->properties)
1291 {
1292 *(dp->properties) = disc->properties;
1293 MQTTClient_disconnect1(m, 10, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
1294 Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
1295 Thread_start(call_disconnected, dp);
1296 }
1297 else
1298 free(dp);
1299 }
1300 free(disc);
1301 }
1302 #if 0
1303 if (pack->header.bits.type == AUTH && m->auth_handle)
1304 {
1305 struct props_rc_parms dp;
1306 Ack* disc = (Ack*)pack;
1307
1308 dp.m = m;
1309 dp.properties = &disc->properties;
1310 dp.reasonCode = disc->rc;
1311 free(pack);
1312 Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
1313 Thread_start(call_auth_handle, &dp);
1314 }
1315 #endif
1316 }
1317 }
1318 else if (m->c->connect_state == TCP_IN_PROGRESS)
1319 {
1320 int error = 0;
1321 socklen_t len = sizeof(error);
1322
1323 if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
1324 m->rc = error;
1325 Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
1326 m->c->connect_state = NOT_IN_PROGRESS;
1327 Thread_post_sem(m->connect_sem);
1328 }
1329 #if defined(OPENSSL) || defined(MBEDTLS)
1330 else if (m->c->connect_state == SSL_IN_PROGRESS)
1331 {
1332 rc = m->c->sslopts->struct_version >= 3 ?
1333 SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
1334 m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
1335 SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
1336 m->c->sslopts->verify, NULL, NULL);
1337 if (rc == 1 || rc == SSL_FATAL)
1338 {
1339 if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1340 #if defined(OPENSSL)
1341 m->c->session = SSL_get1_session(m->c->net.ssl);
1342 #elif defined(MBEDTLS)
1343 mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
1344 #endif
1345 m->rc = rc;
1346 Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
1347 m->c->connect_state = NOT_IN_PROGRESS;
1348 Thread_post_sem(m->connect_sem);
1349 }
1350 }
1351 #endif
1352 else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
1353 {
1354 if (rc != TCPSOCKET_INTERRUPTED)
1355 {
1356 Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
1357 m->c->connect_state = WAIT_FOR_CONNACK;
1358 Thread_post_sem(m->connect_sem);
1359 }
1360 }
1361 }
1362 }
1363 run_id = 0;
1364 running = tostop = 0;
1365 Thread_unlock_mutex(mqttclient_mutex);
1366 FUNC_EXIT;
1367 #if defined(_WIN32) || defined(_WIN64)
1368 ExitThread(0);
1369 #endif
1370 return 0;
1371 }
1372
1373
MQTTClient_stop(void)1374 static int MQTTClient_stop(void)
1375 {
1376 int rc = 0;
1377
1378 FUNC_ENTRY;
1379 if (running == 1 && tostop == 0)
1380 {
1381 int conn_count = 0;
1382 ListElement* current = NULL;
1383
1384 if (handles != NULL)
1385 {
1386 /* find out how many handles are still connected */
1387 while (ListNextElement(handles, ¤t))
1388 {
1389 if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
1390 ((MQTTClients*)(current->content))->c->connected)
1391 ++conn_count;
1392 }
1393 }
1394 Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
1395 /* stop the background thread, if we are the last one to be using it */
1396 if (conn_count == 0)
1397 {
1398 int count = 0;
1399 tostop = 1;
1400 if (Thread_getid() != run_id)
1401 {
1402 while (running && ++count < 100)
1403 {
1404 Thread_unlock_mutex(mqttclient_mutex);
1405 Log(TRACE_MIN, -1, "sleeping");
1406 MQTTTime_sleep(100L);
1407 Thread_lock_mutex(mqttclient_mutex);
1408 }
1409 }
1410 rc = 1;
1411 }
1412 }
1413 FUNC_EXIT_RC(rc);
1414 return rc;
1415 }
1416
1417
/**
 * Register the connectionLost, messageArrived and deliveryComplete callbacks on a client,
 * together with the context value stored alongside them.
 * @param handle the client handle
 * @param context value stored as the callbacks' context
 * @param cl the connectionLost callback; stored as given
 * @param ma the messageArrived callback; must not be NULL
 * @param dc the deliveryComplete callback; stored as given
 * @return MQTTCLIENT_SUCCESS if the callbacks were stored, MQTTCLIENT_FAILURE if the handle
 * or ma is NULL, or a connect is currently in progress
 */
int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connectionLost* cl,
		MQTTClient_messageArrived* ma, MQTTClient_deliveryComplete* dc)
{
	MQTTClients* m = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	/* only accept the change while no connect attempt is under way */
	if (m != NULL && ma != NULL && m->c->connect_state == NOT_IN_PROGRESS)
	{
		m->dc = dc;
		m->ma = ma;
		m->cl = cl;
		m->context = context;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
1441
1442
/**
 * Close a client's network session and reset its connection state.
 *
 * If the client has a usable socket: a DISCONNECT packet is sent when the client is marked
 * connected, then, under socket_mutex, the websocket, any TLS session and the socket are
 * closed, and the socket field is reset to its "no socket" value. The client is then marked
 * as not connected with no connect in progress. For MQTT versions below 5 with cleansession
 * set, the session state is cleaned as well.
 *
 * @param client the client whose session is closed
 * @param reason the reason code passed to MQTTPacket_send_disconnect
 * @param props the properties passed to MQTTPacket_send_disconnect
 */
static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
{
	int socket_open = 0;

	FUNC_ENTRY;
	client->ping_due = 0;
	client->ping_outstanding = 0;
	client->good = 0;

	/* which socket values count as "open" depends on the platform setting */
#if defined(ZERO_SOCK_FD_IS_INVALID)
	socket_open = (client->net.socket > 0);
#else
	socket_open = (client->net.socket >= 0);
#endif
	if (socket_open)
	{
		if (client->connected)
			MQTTPacket_send_disconnect(client, reason, props);

		Thread_lock_mutex(socket_mutex);
		WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);

#if defined(OPENSSL)
		SSL_SESSION_free(client->session); /* is a no-op if session is NULL */
#elif defined(MBEDTLS)
		mbedtls_ssl_session_free(client->session);
#endif
#if defined(OPENSSL) || defined(MBEDTLS)
		client->session = NULL; /* show the session has been freed */
		SSLSocket_close(&client->net);
#endif
		Socket_close(client->net.socket);
		Thread_unlock_mutex(socket_mutex);

		/* mark the socket (and TLS handle) as gone */
#if defined(ZERO_SOCK_FD_IS_INVALID)
		client->net.socket = 0;
#else
		client->net.socket = -1;
#endif
#if defined(OPENSSL) || defined(MBEDTLS)
		client->net.ssl = NULL;
#endif
	}
	client->connect_state = NOT_IN_PROGRESS;
	client->connected = 0;

	if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
		MQTTClient_cleanSession(client);
	FUNC_EXIT;
}
1487
1488
/**
 * Discard a client's session state: persisted data (when persistence is compiled in), the
 * inbound and outbound message lists and the queue of received messages. The message id
 * counter is reset to 0.
 * @param client the client whose session state is discarded
 * @return the result of MQTTPersistence_clear(), or 0 when built with NO_PERSISTENCE
 */
static int MQTTClient_cleanSession(Clients* client)
{
	int clear_rc = 0;

	FUNC_ENTRY;
#if !defined(NO_PERSISTENCE)
	clear_rc = MQTTPersistence_clear(client);
#endif
	/* in-memory state: in-flight messages first, then the queue of received messages */
	MQTTProtocol_emptyMessageList(client->inboundMsgs);
	MQTTProtocol_emptyMessageList(client->outboundMsgs);
	MQTTClient_emptyMessageQueue(client);

	client->msgID = 0;

	FUNC_EXIT_RC(clear_rc);
	return clear_rc;
}
1504
1505
/**
 * Turn a received PUBLISH packet into an application message and append it to the client's
 * message queue (persisting the queue entry when the client has persistence).
 *
 * On success the queue entry takes over publish->topic, and publish->topic is set to NULL.
 * If any allocation fails nothing is queued and publish is left untouched, so the caller
 * keeps ownership of the topic string.
 *
 * @param publish the received PUBLISH packet
 * @param client the client whose message queue receives the message
 * @param allocatePayload non-zero to copy the payload into newly allocated memory; zero to
 * store publish->payload in the message as is
 */
void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload)
{
	qEntry* qe = NULL;
	MQTTClient_message* mm = NULL;
	MQTTClient_message initialized = MQTTClient_message_initializer;

	FUNC_ENTRY;
	qe = malloc(sizeof(qEntry));
	if (!qe)
		goto exit;
	mm = malloc(sizeof(MQTTClient_message));
	if (!mm)
	{
		free(qe);
		goto exit;
	}
	memcpy(mm, &initialized, sizeof(MQTTClient_message));

	if (allocatePayload)
	{
		/* ask for at least one byte: malloc(0) may return NULL, which must not be
		 * mistaken for an allocation failure when the payload is empty */
		mm->payload = malloc(publish->payloadlen > 0 ? publish->payloadlen : 1);
		if (mm->payload == NULL)
		{
			/* the topic has not been taken over yet, so it stays with the caller */
			free(mm);
			free(qe);
			goto exit;
		}
		if (publish->payloadlen > 0)
			memcpy(mm->payload, publish->payload, publish->payloadlen);
	}
	else
		mm->payload = publish->payload;

	/* every allocation has succeeded: now move the topic string into the queue entry */
	qe->msg = mm;
	qe->topicName = publish->topic;
	qe->topicLen = publish->topiclen;
	publish->topic = NULL;

	mm->payloadlen = publish->payloadlen;
	mm->qos = publish->header.bits.qos;
	mm->retained = publish->header.bits.retain;
	if (publish->header.bits.qos == 2)
		mm->dup = 0;  /* ensure that a QoS2 message is not passed to the application with dup = 1 */
	else
		mm->dup = publish->header.bits.dup;
	mm->msgid = publish->msgId;

	if (publish->MQTTVersion >= 5)
		mm->properties = MQTTProperties_copy(&publish->properties);

	/* NOTE(review): the size argument adds the sizes of the two pointers, not of the structures - confirm intent */
	ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
#if !defined(NO_PERSISTENCE)
	if (client->persistence)
		MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
#endif
exit:
	FUNC_EXIT;
}
1561
1562
MQTTClient_connectURIVersion(MQTTClient handle,MQTTClient_connectOptions * options,const char * serverURI,int MQTTVersion,START_TIME_TYPE start,ELAPSED_TIME_TYPE millisecsTimeout,MQTTProperties * connectProperties,MQTTProperties * willProperties)1563 static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
1564 START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
1565 {
1566 MQTTClients* m = handle;
1567 int rc = SOCKET_ERROR;
1568 int sessionPresent = 0;
1569 MQTTResponse resp = MQTTResponse_initializer;
1570
1571 FUNC_ENTRY;
1572 resp.reasonCode = SOCKET_ERROR;
1573 if (m->ma && !running)
1574 {
1575 int count = 0;
1576
1577 Thread_start(MQTTClient_run, handle);
1578 #if defined(IOT_CONNECT)
1579 MQTTTime_sleep(100L);
1580 #endif
1581 if (MQTTTime_elapsed(start) >= millisecsTimeout)
1582 {
1583 rc = SOCKET_ERROR;
1584 goto exit;
1585 }
1586 while (!running && ++count < 5)
1587 {
1588 Thread_unlock_mutex(mqttclient_mutex);
1589 MQTTTime_sleep(100L);
1590 Thread_lock_mutex(mqttclient_mutex);
1591 }
1592 if (!running)
1593 {
1594 rc = SOCKET_ERROR;
1595 goto exit;
1596 }
1597 }
1598
1599 Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
1600 #if defined(OPENSSL) || defined(MBEDTLS)
1601 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
1602 rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties,
1603 millisecsTimeout - MQTTTime_elapsed(start));
1604 #else
1605 rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
1606 #endif
1607 #else
1608 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
1609 rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties,
1610 millisecsTimeout - MQTTTime_elapsed(start));
1611 #else
1612 rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
1613 #endif
1614 #endif
1615 if (rc == SOCKET_ERROR)
1616 goto exit;
1617
1618 if (m->c->connect_state == NOT_IN_PROGRESS)
1619 {
1620 rc = SOCKET_ERROR;
1621 goto exit;
1622 }
1623
1624 if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
1625 {
1626 Thread_unlock_mutex(mqttclient_mutex);
1627 MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1628 Thread_lock_mutex(mqttclient_mutex);
1629 if (rc != 0)
1630 {
1631 rc = SOCKET_ERROR;
1632 goto exit;
1633 }
1634 #if defined(OPENSSL) || defined(MBEDTLS)
1635 if (m->ssl)
1636 {
1637 int port1;
1638 size_t hostname_len;
1639 const char *topic;
1640 int setSocketForSSLrc = 0;
1641
1642 if (m->c->net.https_proxy) {
1643 m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
1644 if ((rc = Proxy_connect( &m->c->net, 1, serverURI)) == SOCKET_ERROR )
1645 goto exit;
1646 }
1647
1648 hostname_len = MQTTProtocol_addressPort(serverURI, &port1, &topic, MQTT_DEFAULT_PORT);
1649 setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
1650 serverURI, hostname_len);
1651
1652 if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
1653 {
1654 if (m->c->session != NULL)
1655 #if defined(OPENSSL)
1656 if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
1657 #elif defined(MBEDTLS)
1658 if ((rc = mbedtls_ssl_set_session(m->c->net.ssl, m->c->session)) != 0)
1659 #endif
1660 Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
1661 rc = m->c->sslopts->struct_version >= 3 ?
1662 SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
1663 m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
1664 SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
1665 m->c->sslopts->verify, NULL, NULL);
1666 if (rc == TCPSOCKET_INTERRUPTED)
1667 m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
1668 else if (rc == SSL_FATAL)
1669 {
1670 rc = SOCKET_ERROR;
1671 goto exit;
1672 }
1673 else if (rc == 1)
1674 {
1675 if (m->websocket)
1676 {
1677 m->c->connect_state = WEBSOCKET_IN_PROGRESS;
1678 rc = WebSocket_connect(&m->c->net, 1, serverURI);
1679 if ( rc == SOCKET_ERROR )
1680 goto exit;
1681 }
1682 else
1683 {
1684 rc = MQTTCLIENT_SUCCESS;
1685 m->c->connect_state = WAIT_FOR_CONNACK;
1686 if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1687 {
1688 rc = SOCKET_ERROR;
1689 goto exit;
1690 }
1691 if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1692 #if defined(OPENSSL)
1693 m->c->session = SSL_get1_session(m->c->net.ssl);
1694 #elif defined(MBEDTLS)
1695 mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
1696 #endif
1697 }
1698 }
1699 }
1700 else
1701 {
1702 rc = SOCKET_ERROR;
1703 goto exit;
1704 }
1705 }
1706 #endif
1707 else
1708 {
1709 if (m->c->net.http_proxy) {
1710 m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
1711 if ((rc = Proxy_connect( &m->c->net, 0, serverURI)) == SOCKET_ERROR )
1712 goto exit;
1713 }
1714
1715 if (m->websocket)
1716 {
1717 m->c->connect_state = WEBSOCKET_IN_PROGRESS;
1718 if ( WebSocket_connect(&m->c->net, 0, serverURI) == SOCKET_ERROR )
1719 {
1720 rc = SOCKET_ERROR;
1721 goto exit;
1722 }
1723 }
1724 else
1725 {
1726 m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
1727 if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1728 {
1729 rc = SOCKET_ERROR;
1730 goto exit;
1731 }
1732 }
1733 }
1734 }
1735
1736 #if defined(OPENSSL) || defined(MBEDTLS)
1737 if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
1738 {
1739 Thread_unlock_mutex(mqttclient_mutex);
1740 MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1741 Thread_lock_mutex(mqttclient_mutex);
1742 if (rc != 1)
1743 {
1744 rc = SOCKET_ERROR;
1745 goto exit;
1746 }
1747 if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1748 #if defined(OPENSSL)
1749 m->c->session = SSL_get1_session(m->c->net.ssl);
1750 #elif defined(MBEDTLS)
1751 mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
1752 #endif
1753
1754 if ( m->websocket )
1755 {
1756 /* wait for websocket connect */
1757 m->c->connect_state = WEBSOCKET_IN_PROGRESS;
1758 rc = WebSocket_connect( &m->c->net, 1, serverURI);
1759 if ( rc != 1 )
1760 {
1761 rc = SOCKET_ERROR;
1762 goto exit;
1763 }
1764 }
1765 else
1766 {
1767 m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
1768 if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1769 {
1770 rc = SOCKET_ERROR;
1771 goto exit;
1772 }
1773 }
1774 }
1775 #endif
1776
1777 if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
1778 {
1779 Thread_unlock_mutex(mqttclient_mutex);
1780 MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1781 Thread_lock_mutex(mqttclient_mutex);
1782 m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
1783 if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1784 {
1785 rc = SOCKET_ERROR;
1786 goto exit;
1787 }
1788 }
1789
1790 if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
1791 {
1792 MQTTPacket* pack = NULL;
1793 Thread_unlock_mutex(mqttclient_mutex);
1794 pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1795 Thread_lock_mutex(mqttclient_mutex);
1796 if (pack == NULL)
1797 rc = SOCKET_ERROR;
1798 else
1799 {
1800 Connack* connack = (Connack*)pack;
1801 Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
1802 if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
1803 {
1804 m->c->connected = 1;
1805 m->c->good = 1;
1806 m->c->connect_state = NOT_IN_PROGRESS;
1807 if (MQTTVersion == 4)
1808 sessionPresent = connack->flags.bits.sessionPresent;
1809 if (m->c->cleansession || m->c->cleanstart)
1810 rc = MQTTClient_cleanSession(m->c);
1811 if (m->c->outboundMsgs->count > 0)
1812 {
1813 ListElement* outcurrent = NULL;
1814 START_TIME_TYPE zero = START_TIME_ZERO;
1815
1816 while (ListNextElement(m->c->outboundMsgs, &outcurrent))
1817 {
1818 Messages* m = (Messages*)(outcurrent->content);
1819 memset(&m->lastTouch, '\0', sizeof(m->lastTouch));
1820 }
1821 MQTTProtocol_retry(zero, 1, 1);
1822 if (m->c->connected != 1)
1823 rc = MQTTCLIENT_DISCONNECTED;
1824 }
1825 if (m->c->MQTTVersion == MQTTVERSION_5)
1826 {
1827 if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
1828 {
1829 rc = PAHO_MEMORY_ERROR;
1830 goto exit;
1831 }
1832 *resp.properties = MQTTProperties_copy(&connack->properties);
1833 }
1834 }
1835 MQTTPacket_freeConnack(connack);
1836 m->pack = NULL;
1837 }
1838 }
1839 exit:
1840 if (rc == MQTTCLIENT_SUCCESS)
1841 {
1842 if (options->struct_version >= 4) /* means we have to fill out return values */
1843 {
1844 options->returned.serverURI = serverURI;
1845 options->returned.MQTTVersion = MQTTVersion;
1846 options->returned.sessionPresent = sessionPresent;
1847 }
1848 }
1849 else
1850 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1851 MQTTClient_disconnect1(handle, 0, 0, MQTTVersion == 3, MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
1852 #else
1853 MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
1854 #endif
1855 resp.reasonCode = rc;
1856 FUNC_EXIT_RC(resp.reasonCode);
1857 return resp;
1858 }
1859
1860
/* Retry loop interval in milliseconds; recomputed from the keep-alive by setRetryLoopInterval() */
static int retryLoopIntervalms = 5000;

/**
 * Derive the retry loop interval from a keep-alive interval: one tenth of the keep-alive,
 * expressed in milliseconds and clamped to the range 100 to 5000.
 * @param keepalive the keep-alive interval in seconds
 */
void setRetryLoopInterval(int keepalive)
{
	const int shortest_ms = 100;
	const int longest_ms = 5000;
	int interval_ms = (keepalive * 1000) / 10;

	if (interval_ms < shortest_ms)
		interval_ms = shortest_ms;
	else if (interval_ms > longest_ms)
		interval_ms = longest_ms;

	retryLoopIntervalms = interval_ms;
}
1872
1873
MQTTClient_connectURI(MQTTClient handle,MQTTClient_connectOptions * options,const char * serverURI,MQTTProperties * connectProperties,MQTTProperties * willProperties)1874 static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
1875 MQTTProperties* connectProperties, MQTTProperties* willProperties)
1876 {
1877 MQTTClients* m = handle;
1878 START_TIME_TYPE start;
1879 ELAPSED_TIME_TYPE millisecsTimeout = 30000L;
1880 MQTTResponse rc = MQTTResponse_initializer;
1881 int MQTTVersion = 0;
1882 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1883 int mem_ret = -1;
1884 #endif
1885
1886 FUNC_ENTRY;
1887 rc.reasonCode = SOCKET_ERROR;
1888 millisecsTimeout = options->connectTimeout * 1000;
1889 start = MQTTTime_start_clock();
1890
1891 m->currentServerURI = serverURI;
1892 m->c->keepAliveInterval = options->keepAliveInterval;
1893 m->c->retryInterval = options->retryInterval;
1894 setRetryLoopInterval(options->keepAliveInterval);
1895 m->c->MQTTVersion = options->MQTTVersion;
1896 m->c->cleanstart = m->c->cleansession = 0;
1897 if (m->c->MQTTVersion >= MQTTVERSION_5)
1898 m->c->cleanstart = options->cleanstart;
1899 else
1900 m->c->cleansession = options->cleansession;
1901 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1902 m->c->maxInflightMessages = (options->reliable) ? 1 : 2;
1903 #else
1904 m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
1905 #endif
1906 if (options->struct_version >= 6)
1907 {
1908 if (options->maxInflightMessages > 0)
1909 m->c->maxInflightMessages = options->maxInflightMessages;
1910 }
1911
1912 if (options->struct_version >= 7)
1913 {
1914 m->c->net.httpHeaders = options->httpHeaders;
1915 }
1916 if (options->struct_version >= 8)
1917 {
1918 if (options->httpProxy)
1919 m->c->httpProxy = MQTTStrdup(options->httpProxy);
1920 if (options->httpsProxy)
1921 m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
1922 }
1923
1924 if (m->c->will)
1925 {
1926 free(m->c->will->payload);
1927 free(m->c->will->topic);
1928 free(m->c->will);
1929 m->c->will = NULL;
1930 }
1931
1932 if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
1933 {
1934 const void* source = NULL;
1935
1936 if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
1937 {
1938 rc.reasonCode = PAHO_MEMORY_ERROR;
1939 goto exit;
1940 }
1941 if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
1942 {
1943 if (options->will->struct_version == 1 && options->will->payload.data)
1944 {
1945 m->c->will->payloadlen = options->will->payload.len;
1946 source = options->will->payload.data;
1947 }
1948 else
1949 {
1950 m->c->will->payloadlen = (int)strlen(options->will->message);
1951 source = (void*)options->will->message;
1952 }
1953 if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
1954 {
1955 free(m->c->will);
1956 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1957 m->c->will = NULL;
1958 #endif
1959 rc.reasonCode = PAHO_MEMORY_ERROR;
1960 goto exit;
1961 }
1962 memcpy(m->c->will->payload, source, m->c->will->payloadlen);
1963 }
1964 else
1965 {
1966 m->c->will->payload = NULL;
1967 m->c->will->payloadlen = 0;
1968 }
1969 m->c->will->qos = options->will->qos;
1970 m->c->will->retained = options->will->retained;
1971 m->c->will->topic = MQTTStrdup(options->will->topicName);
1972 }
1973
1974 #if defined(OPENSSL) || defined(MBEDTLS)
1975 if (m->c->sslopts)
1976 {
1977 if (m->c->sslopts->trustStore)
1978 free((void*)m->c->sslopts->trustStore);
1979 if (m->c->sslopts->keyStore)
1980 free((void*)m->c->sslopts->keyStore);
1981 if (m->c->sslopts->privateKey)
1982 free((void*)m->c->sslopts->privateKey);
1983 if (m->c->sslopts->privateKeyPassword)
1984 free((void*)m->c->sslopts->privateKeyPassword);
1985 if (m->c->sslopts->enabledCipherSuites)
1986 free((void*)m->c->sslopts->enabledCipherSuites);
1987 if (m->c->sslopts->struct_version >= 2)
1988 {
1989 if (m->c->sslopts->CApath)
1990 free((void*)m->c->sslopts->CApath);
1991 }
1992 free(m->c->sslopts);
1993 m->c->sslopts = NULL;
1994 }
1995
1996 if (options->struct_version != 0 && options->ssl)
1997 {
1998 if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
1999 {
2000 rc.reasonCode = PAHO_MEMORY_ERROR;
2001 goto exit;
2002 }
2003 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2004 mem_ret = memset_s(m->c->sslopts, sizeof(MQTTClient_SSLOptions), '\0', sizeof(MQTTClient_SSLOptions));
2005 if (mem_ret != 0)
2006 {
2007 free(m->c->sslopts);
2008 m->c->sslopts = NULL;
2009 rc.reasonCode = PAHO_MEMORY_ERROR;
2010 goto exit;
2011 }
2012 #else
2013 memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
2014 #endif
2015 m->c->sslopts->struct_version = options->ssl->struct_version;
2016 if (options->ssl->trustStore)
2017 m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
2018 if (options->ssl->keyStore)
2019 m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
2020 if (options->ssl->privateKey)
2021 m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
2022 if (options->ssl->privateKeyPassword)
2023 m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
2024 if (options->ssl->enabledCipherSuites)
2025 m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
2026 m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
2027 if (m->c->sslopts->struct_version >= 1)
2028 m->c->sslopts->sslVersion = options->ssl->sslVersion;
2029 if (m->c->sslopts->struct_version >= 2)
2030 {
2031 m->c->sslopts->verify = options->ssl->verify;
2032 if (options->ssl->CApath)
2033 m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
2034 }
2035 if (m->c->sslopts->struct_version >= 3)
2036 {
2037 m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
2038 m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
2039 }
2040 if (m->c->sslopts->struct_version >= 4)
2041 {
2042 m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
2043 m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
2044 m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
2045 }
2046 if (m->c->sslopts->struct_version >= 5)
2047 {
2048 m->c->sslopts->protos = options->ssl->protos;
2049 m->c->sslopts->protos_len = options->ssl->protos_len;
2050 }
2051 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2052 #if defined(MBEDTLS_USE_CRT)
2053 if (options->ssl->los_trustStore != NULL)
2054 m->c->sslopts->los_trustStore = options->ssl->los_trustStore;
2055 if (options->ssl->los_keyStore != NULL)
2056 m->c->sslopts->los_keyStore = options->ssl->los_keyStore;
2057 if (options->ssl->los_privateKey != NULL)
2058 m->c->sslopts->los_privateKey = options->ssl->los_privateKey;
2059 #endif /* MBEDTLS_USE_CRT */
2060 #if defined(MBEDTLS_USE_PSK)
2061 if (options->ssl->psk != NULL)
2062 {
2063 m->c->sslopts->psk = options->ssl->psk;
2064 m->c->sslopts->psk_len = options->ssl->psk_len;
2065 }
2066 if (options->ssl->psk_id != NULL)
2067 {
2068 m->c->sslopts->psk_id = options->ssl->psk_id;
2069 m->c->sslopts->psk_id_len = options->ssl->psk_id_len;
2070 }
2071 #endif /* MBEDTLS_USE_PSK */
2072 #endif
2073 }
2074 #endif
2075
2076 if (m->c->username)
2077 {
2078 free((void*)m->c->username);
2079 m->c->username = NULL;
2080 }
2081 if (options->username)
2082 m->c->username = MQTTStrdup(options->username);
2083 if (m->c->password)
2084 {
2085 free((void*)m->c->password);
2086 m->c->password = NULL;
2087 }
2088 if (options->password)
2089 {
2090 m->c->password = MQTTStrdup(options->password);
2091 m->c->passwordlen = (int)strlen(options->password);
2092 }
2093 else if (options->struct_version >= 5 && options->binarypwd.data)
2094 {
2095 m->c->passwordlen = options->binarypwd.len;
2096 if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
2097 {
2098 rc.reasonCode = PAHO_MEMORY_ERROR;
2099 goto exit;
2100 }
2101 memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
2102 }
2103
2104 if (options->struct_version >= 3)
2105 MQTTVersion = options->MQTTVersion;
2106 else
2107 MQTTVersion = MQTTVERSION_DEFAULT;
2108 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2109 if (MQTTVersion == MQTTVERSION_DEFAULT)
2110 MQTTVersion = MQTTVERSION_3_1_1;
2111
2112 if (MQTTVersion != MQTTVERSION_3_1)
2113 {
2114 rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
2115 connectProperties, willProperties);
2116 if (rc.reasonCode == MQTTCLIENT_SUCCESS)
2117 goto exit;
2118 }
2119 MQTTTime_sleep(50L); // wait for the disconnect to complete.
2120 start = MQTTTime_start_clock();
2121 rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVERSION_3_1, start, millisecsTimeout,
2122 connectProperties, willProperties);
2123 #else
2124 if (MQTTVersion == MQTTVERSION_DEFAULT)
2125 {
2126 rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
2127 connectProperties, willProperties);
2128 if (rc.reasonCode != MQTTCLIENT_SUCCESS)
2129 {
2130 rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
2131 connectProperties, willProperties);
2132 }
2133 }
2134 else
2135 rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
2136 connectProperties, willProperties);
2137 #endif
2138 exit:
2139 FUNC_EXIT_RC(rc.reasonCode);
2140 return rc;
2141 }
2142
/* Forward declaration: MQTTClient_connect() and MQTTClient_connect5() below call this function before it is defined. */
MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
	MQTTProperties* connectProperties, MQTTProperties* willProperties);
2145
/**
 * Connect a client that is not using MQTT 5.
 * A client whose MQTTVersion is MQTTVERSION_5 or higher is rejected with
 * MQTTCLIENT_WRONG_MQTT_VERSION; otherwise the call is forwarded to
 * MQTTClient_connectAll() with no connect or will properties.
 * @param handle the client handle
 * @param options the connect options
 * @return the reason code produced by MQTTClient_connectAll(), or MQTTCLIENT_WRONG_MQTT_VERSION
 */
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
	MQTTClients* client = handle;
	int is_v5_client = (client != NULL && client->c != NULL && client->c->MQTTVersion >= MQTTVERSION_5);

	if (is_v5_client)
		return MQTTCLIENT_WRONG_MQTT_VERSION;

	return MQTTClient_connectAll(handle, options, NULL, NULL).reasonCode;
}
2158
2159
MQTTClient_connect5(MQTTClient handle,MQTTClient_connectOptions * options,MQTTProperties * connectProperties,MQTTProperties * willProperties)2160 MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
2161 MQTTProperties* connectProperties, MQTTProperties* willProperties)
2162 {
2163 MQTTClients* m = handle;
2164 MQTTResponse response = MQTTResponse_initializer;
2165
2166 if (m != NULL && m->c != NULL && m->c->MQTTVersion < MQTTVERSION_5)
2167 {
2168 response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
2169 return response;
2170 }
2171
2172 return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
2173 }
2174
2175
MQTTClient_connectAll(MQTTClient handle,MQTTClient_connectOptions * options,MQTTProperties * connectProperties,MQTTProperties * willProperties)2176 MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
2177 MQTTProperties* connectProperties, MQTTProperties* willProperties)
2178 {
2179 MQTTClients* m = handle;
2180 MQTTResponse rc = MQTTResponse_initializer;
2181
2182 FUNC_ENTRY;
2183 Thread_lock_mutex(connect_mutex);
2184 Thread_lock_mutex(mqttclient_mutex);
2185
2186 rc.reasonCode = SOCKET_ERROR;
2187 if (!library_initialized)
2188 {
2189 rc.reasonCode = MQTTCLIENT_FAILURE;
2190 goto exit;
2191 }
2192
2193 if (options == NULL || m == NULL || m->c == NULL)
2194 {
2195 rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2196 goto exit;
2197 }
2198
2199 if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
2200 {
2201 rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
2202 goto exit;
2203 }
2204
2205 #if defined(OPENSSL) || defined(MBEDTLS)
2206 if (m->ssl && options->ssl == NULL)
2207 {
2208 rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2209 goto exit;
2210 }
2211 #endif
2212
2213 if (options->will) /* check validity of will options structure */
2214 {
2215 if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
2216 {
2217 rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
2218 goto exit;
2219 }
2220 if (options->will->qos < 0 || options->will->qos > 2)
2221 {
2222 rc.reasonCode = MQTTCLIENT_BAD_QOS;
2223 goto exit;
2224 }
2225 if (options->will->topicName == NULL)
2226 {
2227 rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2228 goto exit;
2229 } else if (strlen(options->will->topicName) == 0)
2230 {
2231 rc.reasonCode = MQTTCLIENT_0_LEN_WILL_TOPIC;
2232 goto exit;
2233 }
2234 }
2235
2236
2237 #if defined(OPENSSL) || defined(MBEDTLS)
2238 if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
2239 {
2240 if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
2241 {
2242 rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
2243 goto exit;
2244 }
2245 }
2246 #endif
2247
2248 if ((options->username && !UTF8_validateString(options->username)) ||
2249 (options->password && !UTF8_validateString(options->password)))
2250 {
2251 rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
2252 goto exit;
2253 }
2254
2255 if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
2256 (options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
2257 {
2258 rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
2259 goto exit;
2260 }
2261
2262 if (options->MQTTVersion >= MQTTVERSION_5)
2263 {
2264 if (options->cleansession != 0)
2265 {
2266 rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
2267 goto exit;
2268 }
2269 }
2270 else if (options->cleanstart != 0)
2271 {
2272 rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
2273 goto exit;
2274 }
2275
2276 if (options->struct_version < 2 || options->serverURIcount == 0)
2277 {
2278 if ( !m )
2279 {
2280 rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2281 goto exit;
2282 }
2283 rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
2284 }
2285 else
2286 {
2287 int i;
2288
2289 for (i = 0; i < options->serverURIcount; ++i)
2290 {
2291 char* serverURI = options->serverURIs[i];
2292
2293 if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
2294 serverURI += strlen(URI_TCP);
2295 else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
2296 serverURI += strlen(URI_TCP);
2297 else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
2298 {
2299 serverURI += strlen(URI_WS);
2300 m->websocket = 1;
2301 }
2302 #if defined(OPENSSL) || defined(MBEDTLS)
2303 else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
2304 {
2305 serverURI += strlen(URI_SSL);
2306 m->ssl = 1;
2307 }
2308 else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
2309 {
2310 serverURI += strlen(URI_MQTTS);
2311 m->ssl = 1;
2312 }
2313 else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
2314 {
2315 serverURI += strlen(URI_WSS);
2316 m->ssl = 1;
2317 m->websocket = 1;
2318 }
2319 #endif
2320 rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
2321 if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
2322 break;
2323 }
2324 }
2325 if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
2326 {
2327 if (rc.properties && MQTTProperties_hasProperty(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
2328 {
2329 int recv_max = MQTTProperties_getNumericValue(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
2330 if (m->c->maxInflightMessages > recv_max)
2331 m->c->maxInflightMessages = recv_max;
2332 }
2333 }
2334
2335 exit:
2336 if (m && m->c && m->c->will)
2337 {
2338 if (m->c->will->payload)
2339 free(m->c->will->payload);
2340 if (m->c->will->topic)
2341 free(m->c->will->topic);
2342 free(m->c->will);
2343 m->c->will = NULL;
2344 }
2345 Thread_unlock_mutex(mqttclient_mutex);
2346 Thread_unlock_mutex(connect_mutex);
2347 FUNC_EXIT_RC(rc.reasonCode);
2348 return rc;
2349 }
2350
2351
2352 /**
2353 * mqttclient_mutex must be locked when you call this function, if multi threaded
2354 */
MQTTClient_disconnect1(MQTTClient handle,int timeout,int call_connection_lost,int stop,enum MQTTReasonCodes reason,MQTTProperties * props)2355 static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
2356 enum MQTTReasonCodes reason, MQTTProperties* props)
2357 {
2358 MQTTClients* m = handle;
2359 START_TIME_TYPE start;
2360 int rc = MQTTCLIENT_SUCCESS;
2361 int was_connected = 0;
2362 struct conlost_sync_data sync = {
2363 NULL, m
2364 };
2365
2366 FUNC_ENTRY;
2367 if (m == NULL || m->c == NULL)
2368 {
2369 rc = MQTTCLIENT_FAILURE;
2370 goto exit;
2371 }
2372 was_connected = m->c->connected; /* should be 1 */
2373 if (m->c->connected != 0)
2374 {
2375 start = MQTTTime_start_clock();
2376 m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
2377 while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
2378 { /* wait for all inflight message flows to finish, up to timeout */
2379 if (MQTTTime_elapsed(start) >= (ELAPSED_TIME_TYPE)timeout)
2380 break;
2381 Thread_unlock_mutex(mqttclient_mutex);
2382 MQTTClient_yield();
2383 Thread_lock_mutex(mqttclient_mutex);
2384 }
2385 }
2386
2387 MQTTClient_closeSession(m->c, reason, props);
2388
2389 exit:
2390 if (stop)
2391 MQTTClient_stop();
2392 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2393 if (call_connection_lost && was_connected && m->cl != NULL)
2394 #else
2395 if (call_connection_lost && m->cl && was_connected)
2396 #endif
2397 {
2398 sync.sem = Thread_create_sem(&rc);
2399 Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
2400 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2401 connectionLost_call(&sync);
2402 #else
2403 Thread_start(connectionLost_call, &sync);
2404 #endif
2405 Thread_wait_sem(sync.sem, 5000);
2406 Thread_destroy_sem(sync.sem);
2407 }
2408 FUNC_EXIT_RC(rc);
2409 return rc;
2410 }
2411
2412
2413 /**
2414 * mqttclient_mutex must be locked when you call this function, if multi threaded
2415 */
MQTTClient_disconnect_internal(MQTTClient handle,int timeout)2416 static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
2417 {
2418 return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
2419 }
2420
2421
2422 /**
2423 * mqttclient_mutex must be locked when you call this function, if multi threaded
2424 */
MQTTProtocol_closeSession(Clients * c,int sendwill)2425 void MQTTProtocol_closeSession(Clients* c, int sendwill)
2426 {
2427 MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
2428 }
2429
2430
/**
 * Disconnect the client with a success reason code and no properties.
 * Takes mqttclient_mutex around MQTTClient_disconnect1(); the connectionLost
 * callback is not requested and the stop step is.
 * @param handle the client handle
 * @param timeout the timeout forwarded to MQTTClient_disconnect1()
 * @return the return code of MQTTClient_disconnect1()
 */
int MQTTClient_disconnect(MQTTClient handle, int timeout)
{
	int result;

	Thread_lock_mutex(mqttclient_mutex);
	result = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
	Thread_unlock_mutex(mqttclient_mutex);

	return result;
}
2440
2441
/**
 * Disconnect the client, supplying a reason code and properties.
 * Takes mqttclient_mutex around MQTTClient_disconnect1(); the connectionLost
 * callback is not requested and the stop step is.
 * @param handle the client handle
 * @param timeout the timeout forwarded to MQTTClient_disconnect1()
 * @param reason the reason code forwarded to MQTTClient_disconnect1()
 * @param props the properties forwarded to MQTTClient_disconnect1()
 * @return the return code of MQTTClient_disconnect1()
 */
int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
{
	int result;

	Thread_lock_mutex(mqttclient_mutex);
	result = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
	Thread_unlock_mutex(mqttclient_mutex);

	return result;
}
2451
2452
/**
 * Report the connected flag of a client, read under mqttclient_mutex.
 * @param handle the client handle
 * @return the client's connected value, or 0 if the handle or its client data is NULL
 */
int MQTTClient_isConnected(MQTTClient handle)
{
	MQTTClients* client = handle;
	int connected = 0;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);
	if (client != NULL && client->c != NULL)
	{
		connected = client->c->connected;
	}
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(connected);
	return connected;
}
2466
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
/**
 * Report the connect_state of a client, read under mqttclient_mutex.
 * Only built when IOT_CONNECT or IOT_LITEOS_ADAPT is defined.
 * @param handle the client handle
 * @return the client's connect_state value, or 0 if the handle or its client data is NULL
 */
int MQTTClient_getConnectState(MQTTClient handle)
{
	MQTTClients* client = handle;
	int state = 0;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);
	if (client && client->c)
	{
		state = client->c->connect_state;
	}
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(state);
	return state;
}
#endif
2482
MQTTClient_subscribeMany5(MQTTClient handle,int count,char * const * topic,int * qos,MQTTSubscribe_options * opts,MQTTProperties * props)2483 MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
2484 int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
2485 {
2486 MQTTClients* m = handle;
2487 List* topics = NULL;
2488 List* qoss = NULL;
2489 int i = 0;
2490 int rc = MQTTCLIENT_FAILURE;
2491 MQTTResponse resp = MQTTResponse_initializer;
2492 int msgid = 0;
2493
2494 FUNC_ENTRY;
2495 Thread_lock_mutex(subscribe_mutex);
2496 Thread_lock_mutex(mqttclient_mutex);
2497
2498 resp.reasonCode = MQTTCLIENT_FAILURE;
2499 if (m == NULL || m->c == NULL)
2500 {
2501 rc = MQTTCLIENT_FAILURE;
2502 goto exit;
2503 }
2504 if (m->c->connected == 0)
2505 {
2506 rc = MQTTCLIENT_DISCONNECTED;
2507 goto exit;
2508 }
2509 for (i = 0; i < count; i++)
2510 {
2511 if (!UTF8_validateString(topic[i]))
2512 {
2513 rc = MQTTCLIENT_BAD_UTF8_STRING;
2514 goto exit;
2515 }
2516
2517 if (qos[i] < 0 || qos[i] > 2)
2518 {
2519 rc = MQTTCLIENT_BAD_QOS;
2520 goto exit;
2521 }
2522 }
2523 if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2524 {
2525 rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
2526 goto exit;
2527 }
2528
2529 topics = ListInitialize();
2530 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2531 if (topics == NULL)
2532 {
2533 rc = PAHO_MEMORY_ERROR;
2534 goto exit;
2535 }
2536 #endif
2537 qoss = ListInitialize();
2538 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2539 if (qoss == NULL)
2540 {
2541 ListFreeNoContent(topics);
2542 rc = PAHO_MEMORY_ERROR;
2543 goto exit;
2544 }
2545 #endif
2546
2547 for (i = 0; i < count; i++)
2548 {
2549 ListAppend(topics, topic[i], strlen(topic[i]));
2550 ListAppend(qoss, &qos[i], sizeof(int));
2551 }
2552
2553 rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
2554 ListFreeNoContent(topics);
2555 ListFreeNoContent(qoss);
2556
2557 if (rc == TCPSOCKET_COMPLETE)
2558 {
2559 MQTTPacket* pack = NULL;
2560
2561 Thread_unlock_mutex(mqttclient_mutex);
2562 pack = MQTTClient_waitfor(handle, SUBACK, &rc, m->commandTimeout);
2563 Thread_lock_mutex(mqttclient_mutex);
2564 if (pack != NULL)
2565 {
2566 Suback* sub = (Suback*)pack;
2567
2568 if (m->c->MQTTVersion == MQTTVERSION_5)
2569 {
2570 if (sub->properties.count > 0)
2571 {
2572 if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
2573 {
2574 rc = PAHO_MEMORY_ERROR;
2575 goto exit;
2576 }
2577 *resp.properties = MQTTProperties_copy(&sub->properties);
2578 }
2579 resp.reasonCodeCount = sub->qoss->count;
2580 resp.reasonCode = *(int*)sub->qoss->first->content;
2581 if (sub->qoss->count > 1)
2582 {
2583 ListElement* current = NULL;
2584 int rc_count = 0;
2585
2586 if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count))) == NULL)
2587 {
2588 rc = PAHO_MEMORY_ERROR;
2589 goto exit;
2590 }
2591 while (ListNextElement(sub->qoss, ¤t))
2592 (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
2593 }
2594 }
2595 else
2596 {
2597 ListElement *current = NULL;
2598
2599 /* if the returned count is greater than requested, it's an error*/
2600 if (sub->qoss->count > count)
2601 rc = MQTTCLIENT_FAILURE;
2602 else
2603 {
2604 i = 0;
2605 while (ListNextElement(sub->qoss, ¤t))
2606 {
2607 int *reqqos = (int*) (current->content);
2608 qos[i++] = *reqqos;
2609 }
2610 }
2611 resp.reasonCode = rc;
2612 }
2613 rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
2614 m->pack = NULL;
2615 }
2616 else
2617 rc = SOCKET_ERROR;
2618 }
2619
2620 if (rc == SOCKET_ERROR)
2621 MQTTClient_disconnect_internal(handle, 0);
2622 else if (rc == TCPSOCKET_COMPLETE)
2623 rc = MQTTCLIENT_SUCCESS;
2624
2625 exit:
2626 if (rc < 0)
2627 resp.reasonCode = rc;
2628 Thread_unlock_mutex(mqttclient_mutex);
2629 Thread_unlock_mutex(subscribe_mutex);
2630 FUNC_EXIT_RC(resp.reasonCode);
2631 return resp;
2632 }
2633
2634
2635
/**
 * Subscribe to a list of topics on a client that is not using MQTT 5.
 * A client whose MQTTVersion is MQTTVERSION_5 or higher gets
 * MQTTCLIENT_WRONG_MQTT_VERSION; otherwise the call is forwarded to
 * MQTTClient_subscribeMany5() without options or properties.
 * In IOT_CONNECT / IOT_LITEOS_ADAPT builds, a first qos entry equal to
 * MQTT_BAD_SUBSCRIBE is reported as the return code.
 * @param handle the client handle
 * @param count number of entries in topic and qos
 * @param topic array of topic strings
 * @param qos array of QoS values, updated by MQTTClient_subscribeMany5()
 * @return the resulting reason code
 */
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
{
	MQTTClients* m = handle;
	MQTTResponse response = MQTTResponse_initializer;

	if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	/* addition for MQTT 3.1.1 - error code from subscribe.
	 * Only read qos[0] when the caller supplied at least one entry. */
	if (qos != NULL && count > 0 && *qos == MQTT_BAD_SUBSCRIBE)
		response.reasonCode = MQTT_BAD_SUBSCRIBE;
#endif
	return response.reasonCode;
}
2651
2652
2653
MQTTClient_subscribe5(MQTTClient handle,const char * topic,int qos,MQTTSubscribe_options * opts,MQTTProperties * props)2654 MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
2655 MQTTSubscribe_options* opts, MQTTProperties* props)
2656 {
2657 MQTTResponse rc;
2658
2659 FUNC_ENTRY;
2660 rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
2661 if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
2662 rc.reasonCode = MQTT_BAD_SUBSCRIBE;
2663 FUNC_EXIT_RC(rc.reasonCode);
2664 return rc;
2665 }
2666
2667
/**
 * Subscribe to a single topic on a client that is not using MQTT 5.
 * A client whose MQTTVersion is MQTTVERSION_5 or higher gets
 * MQTTCLIENT_WRONG_MQTT_VERSION; otherwise the call is forwarded to
 * MQTTClient_subscribe5() without options or properties. A NULL handle, or one
 * without client data, is forwarded as well so the callee reports the error
 * instead of this function dereferencing it.
 * @param handle the client handle
 * @param topic the topic string
 * @param qos the requested QoS
 * @return the resulting reason code
 */
int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
{
	MQTTClients* m = handle;
	MQTTResponse response = MQTTResponse_initializer;

	if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);

	return response.reasonCode;
}
2680
2681
MQTTClient_unsubscribeMany5(MQTTClient handle,int count,char * const * topic,MQTTProperties * props)2682 MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
2683 {
2684 MQTTClients* m = handle;
2685 List* topics = NULL;
2686 int i = 0;
2687 int rc = SOCKET_ERROR;
2688 MQTTResponse resp = MQTTResponse_initializer;
2689 int msgid = 0;
2690
2691 FUNC_ENTRY;
2692 Thread_lock_mutex(unsubscribe_mutex);
2693 Thread_lock_mutex(mqttclient_mutex);
2694
2695 resp.reasonCode = MQTTCLIENT_FAILURE;
2696 if (m == NULL || m->c == NULL)
2697 {
2698 rc = MQTTCLIENT_FAILURE;
2699 goto exit;
2700 }
2701 if (m->c->connected == 0)
2702 {
2703 rc = MQTTCLIENT_DISCONNECTED;
2704 goto exit;
2705 }
2706 for (i = 0; i < count; i++)
2707 {
2708 if (!UTF8_validateString(topic[i]))
2709 {
2710 rc = MQTTCLIENT_BAD_UTF8_STRING;
2711 goto exit;
2712 }
2713 }
2714 if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2715 {
2716 rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
2717 goto exit;
2718 }
2719
2720 topics = ListInitialize();
2721 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2722 if (topics == NULL)
2723 {
2724 rc = PAHO_MEMORY_ERROR;
2725 goto exit;
2726 }
2727 #endif
2728 for (i = 0; i < count; i++)
2729 ListAppend(topics, topic[i], strlen(topic[i]));
2730 rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
2731 ListFreeNoContent(topics);
2732
2733 if (rc == TCPSOCKET_COMPLETE)
2734 {
2735 MQTTPacket* pack = NULL;
2736
2737 Thread_unlock_mutex(mqttclient_mutex);
2738 pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, m->commandTimeout);
2739 Thread_lock_mutex(mqttclient_mutex);
2740 if (pack != NULL)
2741 {
2742 Unsuback* unsub = (Unsuback*)pack;
2743
2744 if (m->c->MQTTVersion == MQTTVERSION_5)
2745 {
2746 if (unsub->properties.count > 0)
2747 {
2748 if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
2749 {
2750 rc = PAHO_MEMORY_ERROR;
2751 goto exit;
2752 }
2753 *resp.properties = MQTTProperties_copy(&unsub->properties);
2754 }
2755 resp.reasonCodeCount = unsub->reasonCodes->count;
2756 resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
2757 if (unsub->reasonCodes->count > 1)
2758 {
2759 ListElement* current = NULL;
2760 int rc_count = 0;
2761
2762 if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count))) == NULL)
2763 {
2764 rc = PAHO_MEMORY_ERROR;
2765 goto exit;
2766 }
2767 while (ListNextElement(unsub->reasonCodes, ¤t))
2768 (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
2769 }
2770 }
2771 else
2772 resp.reasonCode = rc;
2773 rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
2774 m->pack = NULL;
2775 }
2776 else
2777 rc = SOCKET_ERROR;
2778 }
2779
2780 if (rc == SOCKET_ERROR)
2781 MQTTClient_disconnect_internal(handle, 0);
2782
2783 exit:
2784 if (rc < 0)
2785 resp.reasonCode = rc;
2786 Thread_unlock_mutex(mqttclient_mutex);
2787 Thread_unlock_mutex(unsubscribe_mutex);
2788 FUNC_EXIT_RC(resp.reasonCode);
2789 return resp;
2790 }
2791
2792
/**
 * Unsubscribe from a list of topics on a client that is not using MQTT 5.
 * A client whose MQTTVersion is MQTTVERSION_5 or higher gets
 * MQTTCLIENT_WRONG_MQTT_VERSION; otherwise the call is forwarded to
 * MQTTClient_unsubscribeMany5() without properties.
 * @param handle the client handle
 * @param count number of entries in topic
 * @param topic array of topic strings
 * @return the resulting reason code
 */
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
{
	MQTTClients* client = handle;
	MQTTResponse result = MQTTResponse_initializer;
	int is_v5_client = (client != NULL && client->c != NULL && client->c->MQTTVersion >= MQTTVERSION_5);

	if (is_v5_client)
	{
		result.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	}
	else
	{
		result = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
	}

	return result.reasonCode;
}
2805
2806
MQTTClient_unsubscribe5(MQTTClient handle,const char * topic,MQTTProperties * props)2807 MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
2808 {
2809 MQTTResponse rc;
2810
2811 rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
2812 return rc;
2813 }
2814
2815
/**
 * Unsubscribe from a single topic without properties.
 * @param handle the client handle
 * @param topic the topic string
 * @return the reason code of the response from MQTTClient_unsubscribe5()
 */
int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
{
	MQTTResponse result;

	result = MQTTClient_unsubscribe5(handle, topic, NULL);
	return result.reasonCode;
}
2822
2823
MQTTClient_publish5(MQTTClient handle,const char * topicName,int payloadlen,const void * payload,int qos,int retained,MQTTProperties * properties,MQTTClient_deliveryToken * deliveryToken)2824 MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
2825 int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
2826 {
2827 int rc = MQTTCLIENT_SUCCESS;
2828 MQTTClients* m = handle;
2829 Messages* msg = NULL;
2830 Publish* p = NULL;
2831 int blocked = 0;
2832 int msgid = 0;
2833 MQTTResponse resp = MQTTResponse_initializer;
2834
2835 FUNC_ENTRY;
2836 Thread_lock_mutex(mqttclient_mutex);
2837
2838 if (m == NULL || m->c == NULL)
2839 rc = MQTTCLIENT_FAILURE;
2840 else if (m->c->connected == 0)
2841 rc = MQTTCLIENT_DISCONNECTED;
2842 else if (!UTF8_validateString(topicName))
2843 rc = MQTTCLIENT_BAD_UTF8_STRING;
2844
2845 if (rc != MQTTCLIENT_SUCCESS)
2846 goto exit;
2847
2848 /* If outbound queue is full, block until it is not */
2849 while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
2850 Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
2851 {
2852 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2853 rc = MQTTCLIENT_FAILURE;
2854 goto exit;
2855 #else
2856 if (blocked == 0)
2857 {
2858 blocked = 1;
2859 Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
2860 }
2861 Thread_unlock_mutex(mqttclient_mutex);
2862 MQTTClient_yield();
2863 Thread_lock_mutex(mqttclient_mutex);
2864 if (m->c->connected == 0)
2865 {
2866 rc = MQTTCLIENT_FAILURE;
2867 goto exit;
2868 }
2869 #endif
2870 }
2871 if (blocked == 1)
2872 Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
2873 if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2874 { /* this should never happen as we've waited for spaces in the queue */
2875 rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
2876 goto exit;
2877 }
2878
2879 if ((p = malloc(sizeof(Publish))) == NULL)
2880 {
2881 rc = PAHO_MEMORY_ERROR;
2882 goto exit_and_free;
2883 }
2884 memset(p->mask, '\0', sizeof(p->mask));
2885 p->payload = NULL;
2886 p->payloadlen = payloadlen;
2887 if (payloadlen > 0)
2888 {
2889 if ((p->payload = malloc(payloadlen)) == NULL)
2890 {
2891 rc = PAHO_MEMORY_ERROR;
2892 goto exit_and_free;
2893 }
2894 memcpy(p->payload, payload, payloadlen);
2895 }
2896 if ((p->topic = MQTTStrdup(topicName)) == NULL)
2897 {
2898 rc = PAHO_MEMORY_ERROR;
2899 goto exit_and_free;
2900 }
2901 p->msgId = msgid;
2902 p->MQTTVersion = m->c->MQTTVersion;
2903 if (m->c->MQTTVersion >= MQTTVERSION_5)
2904 {
2905 if (properties)
2906 p->properties = *properties;
2907 else
2908 {
2909 MQTTProperties props = MQTTProperties_initializer;
2910 p->properties = props;
2911 }
2912 }
2913
2914 rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
2915
2916 /* If the packet was partially written to the socket, wait for it to complete.
2917 * However, if the client is disconnected during this time and qos is not 0, still return success, as
2918 * the packet has already been written to persistence and assigned a message id so will
2919 * be sent when the client next connects.
2920 */
2921 if (rc == TCPSOCKET_INTERRUPTED)
2922 {
2923 while (m->c->connected == 1)
2924 {
2925 pending_writes* writing = NULL;
2926
2927 Thread_lock_mutex(socket_mutex);
2928 writing = SocketBuffer_getWrite(m->c->net.socket);
2929 Thread_unlock_mutex(socket_mutex);
2930
2931 if (writing == NULL)
2932 break;
2933
2934 Thread_unlock_mutex(mqttclient_mutex);
2935 MQTTClient_yield();
2936 Thread_lock_mutex(mqttclient_mutex);
2937 }
2938 rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
2939 }
2940 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2941 else if (rc == PAHO_MEMORY_ERROR)
2942 {
2943 goto exit_and_free;
2944 }
2945 #endif
2946
2947 if (deliveryToken && qos > 0)
2948 *deliveryToken = msg->msgid;
2949
2950 exit_and_free:
2951 if (p)
2952 {
2953 if (p->topic)
2954 free(p->topic);
2955 if (p->payload)
2956 free(p->payload);
2957 free(p);
2958 }
2959
2960 if (rc == SOCKET_ERROR)
2961 {
2962 MQTTClient_disconnect_internal(handle, 0);
2963 /* Return success for qos > 0 as the send will be retried automatically */
2964 rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
2965 }
2966
2967 exit:
2968 Thread_unlock_mutex(mqttclient_mutex);
2969 resp.reasonCode = rc;
2970 FUNC_EXIT_RC(resp.reasonCode);
2971 return resp;
2972 }
2973
2974
/**
 * Publish a message on a client that is not using MQTT 5.
 * A client whose MQTTVersion is MQTTVERSION_5 or higher gets
 * MQTTCLIENT_WRONG_MQTT_VERSION; otherwise the call is forwarded to
 * MQTTClient_publish5() without properties. A NULL handle, or one without client
 * data, is forwarded as well so the callee reports the error instead of this
 * function dereferencing it.
 * @param handle the client handle
 * @param topicName the topic
 * @param payloadlen length of the payload in bytes
 * @param payload the payload data
 * @param qos the QoS
 * @param retained the retained flag
 * @param deliveryToken optional location for the delivery token
 * @return the resulting reason code
 */
int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
	int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
{
	MQTTClients* m = handle;
	MQTTResponse rc = MQTTResponse_initializer;

	if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
	return rc.reasonCode;
}
2987
2988
MQTTClient_publishMessage5(MQTTClient handle,const char * topicName,MQTTClient_message * message,MQTTClient_deliveryToken * deliveryToken)2989 MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
2990 MQTTClient_deliveryToken* deliveryToken)
2991 {
2992 MQTTResponse rc = MQTTResponse_initializer;
2993 MQTTProperties* props = NULL;
2994
2995 FUNC_ENTRY;
2996 if (message == NULL)
2997 {
2998 rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2999 goto exit;
3000 }
3001
3002 if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
3003 (message->struct_version != 0 && message->struct_version != 1))
3004 {
3005 rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
3006 goto exit;
3007 }
3008
3009 if (message->struct_version >= 1)
3010 props = &message->properties;
3011
3012 rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
3013 message->qos, message->retained, props, deliveryToken);
3014 exit:
3015 FUNC_EXIT_RC(rc.reasonCode);
3016 return rc;
3017 }
3018
3019
/**
 * Publish the contents of an MQTTClient_message structure on a client that is
 * not using MQTT 5.
 * A NULL message yields MQTTCLIENT_NULL_PARAMETER, matching
 * MQTTClient_publishMessage5(). A structure whose struct_id is not "MQTM" or whose
 * struct_version is neither 0 nor 1 yields MQTTCLIENT_BAD_STRUCTURE, and a client
 * whose MQTTVersion is MQTTVERSION_5 or higher yields MQTTCLIENT_WRONG_MQTT_VERSION.
 * @param handle the client handle
 * @param topicName the topic
 * @param message the message to publish
 * @param deliveryToken optional location for the delivery token
 * @return the resulting reason code
 */
int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
	MQTTClient_deliveryToken* deliveryToken)
{
	MQTTClients* m = handle;
	MQTTResponse rc = MQTTResponse_initializer;

	if (message == NULL) /* checked first: the structure validation below reads through message */
		rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
	else if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
		(message->struct_version != 0 && message->struct_version != 1))
		rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
	else if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
	return rc.reasonCode;
}
3035
3036
MQTTClient_retry(void)3037 static void MQTTClient_retry(void)
3038 {
3039 static START_TIME_TYPE last = START_TIME_ZERO;
3040 START_TIME_TYPE now;
3041
3042 FUNC_ENTRY;
3043 now = MQTTTime_now();
3044 if (MQTTTime_difftime(now, last) >= (DIFF_TIME_TYPE)(retryLoopIntervalms))
3045 {
3046 last = MQTTTime_now();
3047 MQTTProtocol_keepalive(now);
3048 MQTTProtocol_retry(now, 1, 0);
3049 }
3050 else
3051 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
3052 MQTTProtocol_retry(now, 1, 0);
3053 #else
3054 MQTTProtocol_retry(now, 0, 0);
3055 #endif
3056 FUNC_EXIT;
3057 }
3058
3059
MQTTClient_cycle(SOCKET * sock,ELAPSED_TIME_TYPE timeout,int * rc)3060 static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc)
3061 {
3062 static Ack ack;
3063 MQTTPacket* pack = NULL;
3064 int rc1 = 0;
3065 START_TIME_TYPE start;
3066
3067 FUNC_ENTRY;
3068 #if defined(OPENSSL) || defined(MBEDTLS)
3069 if ((*sock = SSLSocket_getPendingRead()) == -1)
3070 {
3071 /* 0 from getReadySocket indicates no work to do, rc -1 == error */
3072 #endif
3073 start = MQTTTime_start_clock();
3074 *sock = Socket_getReadySocket(0, (int)timeout, socket_mutex, rc);
3075 *rc = rc1;
3076 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
3077 if (*sock == SOCKET_ERROR && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
3078 #else
3079 if (*sock == 0 && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
3080 #endif
3081 MQTTTime_sleep(100L);
3082 #if defined(OPENSSL) || defined(MBEDTLS)
3083 }
3084 #endif
3085 Thread_lock_mutex(mqttclient_mutex);
3086 #if defined(ZERO_SOCK_FD_IS_INVALID)
3087 if (*sock > 0 && rc1 == 0)
3088 #else
3089 if (*sock >= 0 && rc1 == 0)
3090 #endif
3091 {
3092 MQTTClients* m = NULL;
3093 if (ListFindItem(handles, sock, clientSockCompare) != NULL)
3094 m = (MQTTClient)(handles->current->content);
3095 if (m != NULL)
3096 {
3097 if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
3098 *rc = 0; /* waiting for connect state to clear */
3099 else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
3100 *rc = WebSocket_upgrade(&m->c->net);
3101 else
3102 {
3103 pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
3104 if (*rc == TCPSOCKET_INTERRUPTED)
3105 *rc = 0;
3106 #if !defined(ZERO_SOCK_FD_IS_INVALID)
3107 #if !defined(IOT_LITEOS_ADAPT)
3108 if (*rc == EXT_SOCKET_RET_INVALID_SOCKET)
3109 *rc = SOCKET_ERROR;
3110 #endif
3111 #endif
3112 }
3113 }
3114
3115 if (pack)
3116 {
3117 int freed = 1;
3118
3119 /* Note that these handle... functions free the packet structure that they are dealing with */
3120 if (pack->header.bits.type == PUBLISH)
3121 *rc = MQTTProtocol_handlePublishes(pack, *sock);
3122 else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
3123 {
3124 int msgid;
3125
3126 ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
3127 msgid = ack.msgId;
3128 if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
3129 {
3130 Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
3131 (*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
3132 }
3133 *rc = (pack->header.bits.type == PUBCOMP) ?
3134 MQTTProtocol_handlePubcomps(pack, *sock, NULL) : MQTTProtocol_handlePubacks(pack, *sock, NULL);
3135 if (m && m->dc)
3136 {
3137 Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
3138 (*(m->dc))(m->context, msgid);
3139 }
3140 }
3141 else if (pack->header.bits.type == PUBREC)
3142 {
3143 Pubrec* pubrec = (Pubrec*)pack;
3144
3145 if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
3146 {
3147 Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
3148 (*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
3149 &pubrec->properties, pubrec->rc);
3150 }
3151 *rc = MQTTProtocol_handlePubrecs(pack, *sock, NULL);
3152 }
3153 else if (pack->header.bits.type == PUBREL)
3154 *rc = MQTTProtocol_handlePubrels(pack, *sock);
3155 else if (pack->header.bits.type == PINGRESP)
3156 *rc = MQTTProtocol_handlePingresps(pack, *sock);
3157 else
3158 freed = 0;
3159 if (freed)
3160 pack = NULL;
3161 }
3162 }
3163 MQTTClient_retry();
3164 Thread_unlock_mutex(mqttclient_mutex);
3165 FUNC_EXIT_RC(*rc);
3166 return pack;
3167 }
3168
3169
MQTTClient_waitfor(MQTTClient handle,int packet_type,int * rc,int64_t timeout)3170 static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout)
3171 {
3172 MQTTPacket* pack = NULL;
3173 MQTTClients* m = handle;
3174 START_TIME_TYPE start = MQTTTime_start_clock();
3175 int is_running = 0; /* local copy of running */
3176
3177 FUNC_ENTRY;
3178 if (((MQTTClients*)handle) == NULL || timeout <= 0L)
3179 {
3180 *rc = MQTTCLIENT_FAILURE;
3181 goto exit;
3182 }
3183
3184 Thread_lock_mutex(mqttclient_mutex);
3185 is_running = running;
3186 Thread_unlock_mutex(mqttclient_mutex);
3187
3188 if (is_running)
3189 {
3190 if (packet_type == CONNECT)
3191 {
3192 if ((*rc = Thread_wait_sem(m->connect_sem, (int)timeout)) == 0)
3193 *rc = m->rc;
3194 }
3195 else if (packet_type == CONNACK)
3196 *rc = Thread_wait_sem(m->connack_sem, (int)timeout);
3197 else if (packet_type == SUBACK)
3198 *rc = Thread_wait_sem(m->suback_sem, (int)timeout);
3199 else if (packet_type == UNSUBACK)
3200 *rc = Thread_wait_sem(m->unsuback_sem, (int)timeout);
3201 if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
3202 Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
3203 pack = m->pack;
3204 }
3205 else
3206 {
3207 *rc = TCPSOCKET_COMPLETE;
3208 while (1)
3209 {
3210 SOCKET sock = -1;
3211 pack = MQTTClient_cycle(&sock, 100L, rc);
3212 if (sock == m->c->net.socket)
3213 {
3214 if (*rc == SOCKET_ERROR)
3215 break;
3216 if (pack && (pack->header.bits.type == packet_type))
3217 break;
3218 if (m->c->connect_state == TCP_IN_PROGRESS)
3219 {
3220 int error = 0;
3221 socklen_t len = sizeof(error);
3222
3223 if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
3224 *rc = error;
3225 break;
3226 }
3227 #if defined(OPENSSL) || defined(MBEDTLS)
3228 else if (m->c->connect_state == SSL_IN_PROGRESS)
3229 {
3230
3231 *rc = m->c->sslopts->struct_version >= 3 ?
3232 SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
3233 m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
3234 SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
3235 m->c->sslopts->verify, NULL, NULL);
3236 if (*rc == SSL_FATAL)
3237 break;
3238 else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
3239 {
3240 if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
3241 #if defined(OPENSSL)
3242 m->c->session = SSL_get1_session(m->c->net.ssl);
3243 #elif defined(MBEDTLS)
3244 mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
3245 #endif
3246 break;
3247 }
3248 }
3249 #endif
3250 else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS && *rc != TCPSOCKET_INTERRUPTED)
3251 {
3252 *rc = 1;
3253 break;
3254 }
3255 else if (m->c->connect_state == PROXY_CONNECT_IN_PROGRESS )
3256 {
3257 *rc = 1;
3258 break;
3259 }
3260 else if (m->c->connect_state == WAIT_FOR_CONNACK)
3261 {
3262 int error = 0;
3263 socklen_t len = sizeof(error);
3264 if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
3265 {
3266 if (error)
3267 {
3268 *rc = error;
3269 break;
3270 }
3271 }
3272 }
3273 }
3274 if (MQTTTime_elapsed(start) > (uint64_t)timeout)
3275 {
3276 pack = NULL;
3277 break;
3278 }
3279 }
3280 }
3281
3282 exit:
3283 FUNC_EXIT_RC(*rc);
3284 return pack;
3285 }
3286
3287
/**
 * Synchronously receive a message for a client that is not using the
 * library's background thread.
 *
 * Packet handling is driven through MQTTClient_cycle() until a message is
 * queued for the client, the timeout has elapsed, or a socket error is
 * reported for this client's socket.  A queued message, if any, is handed out
 * through MQTTClient_deliverMessage().
 *
 * @param handle the client handle
 * @param topicName out: set to NULL before waiting; filled in on delivery
 * @param topicLen out: passed through to MQTTClient_deliverMessage()
 * @param message out: set to NULL before waiting; filled in on delivery
 * @param timeout the maximum time to wait; treated as 0 if a message is already queued
 * @return MQTTCLIENT_FAILURE for a bad handle or when the background thread is
 * running, MQTTCLIENT_DISCONNECTED when not connected, otherwise the result of
 * the last cycle or of the message delivery
 */
int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
		unsigned long timeout)
{
	int rc = TCPSOCKET_COMPLETE;
	START_TIME_TYPE began = MQTTTime_start_clock();
	ELAPSED_TIME_TYPE spent = 0L;
	MQTTClients* client = handle;

	FUNC_ENTRY;
	/* receive is not meant to be called in a multi-thread environment */
	if (client == NULL || client->c == NULL || running)
	{
		rc = MQTTCLIENT_FAILURE;
		goto exit;
	}
	if (client->c->connected == 0)
	{
		rc = MQTTCLIENT_DISCONNECTED;
		goto exit;
	}

	*topicName = NULL;
	*message = NULL;

	/* if there is already a message waiting, don't hang around but still do some packet handling */
	if (client->c->messageQueue->count > 0)
		timeout = 0L;

	spent = MQTTTime_elapsed(began);
	for (;;)
	{
		SOCKET sock = 0;

		MQTTClient_cycle(&sock, (timeout > spent) ? timeout - spent : 0L, &rc);

		/* only an error on the socket belonging to this client ends the wait early */
		if (rc == SOCKET_ERROR &&
				ListFindItem(handles, &sock, clientSockCompare) &&
				(MQTTClient)(handles->current->content) == handle)
			break;

		spent = MQTTTime_elapsed(began);
		if (!(spent < timeout && client->c->messageQueue->count == 0))
			break;
	}

	if (client->c->messageQueue->count > 0)
		rc = MQTTClient_deliverMessage(rc, client, topicName, topicLen, message);

	if (rc == SOCKET_ERROR)
		MQTTClient_disconnect_internal(handle, 0);

exit:
	FUNC_EXIT_RC(rc);
	return rc;
}
3342
3343
MQTTClient_yield(void)3344 void MQTTClient_yield(void)
3345 {
3346 START_TIME_TYPE start = MQTTTime_start_clock();
3347 ELAPSED_TIME_TYPE elapsed = 0L;
3348 ELAPSED_TIME_TYPE timeout = 100L;
3349 int rc = 0;
3350
3351 FUNC_ENTRY;
3352 if (running) /* yield is not meant to be called in a multi-thread environment */
3353 {
3354 MQTTTime_sleep(timeout);
3355 goto exit;
3356 }
3357
3358 elapsed = MQTTTime_elapsed(start);
3359 do
3360 {
3361 SOCKET sock = -1;
3362 MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
3363 Thread_lock_mutex(mqttclient_mutex);
3364 if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
3365 {
3366 MQTTClients* m = (MQTTClient)(handles->current->content);
3367 if (m->c->connect_state != DISCONNECTING)
3368 MQTTClient_disconnect_internal(m, 0);
3369 }
3370 Thread_unlock_mutex(mqttclient_mutex);
3371 elapsed = MQTTTime_elapsed(start);
3372 }
3373 while (elapsed < timeout);
3374 exit:
3375 FUNC_EXIT;
3376 }
3377
3378 /*
3379 static int pubCompare(void* a, void* b)
3380 {
3381 Messages* msg = (Messages*)a;
3382 return msg->publish == (Publications*)b;
3383 }*/
3384
3385
/**
 * Wait until the message identified by a delivery token is no longer in the
 * client's list of outbound messages.
 *
 * The client mutex is held while the client state is inspected and released
 * around each call to MQTTClient_yield().
 *
 * @param handle the client handle
 * @param mdt the delivery token (message id) to wait for
 * @param timeout the maximum time to wait
 * @return MQTTCLIENT_SUCCESS when the token is no longer outstanding,
 * MQTTCLIENT_DISCONNECTED if the client is not connected,
 * MQTTCLIENT_FAILURE for a bad handle or on timeout (and, on IOT_CONNECT /
 * IOT_LITEOS_ADAPT builds, when the token equals the client's retryMsgs value)
 */
int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
{
	int rc = MQTTCLIENT_FAILURE;
	START_TIME_TYPE began = MQTTTime_start_clock();
	ELAPSED_TIME_TYPE spent = 0L;
	MQTTClients* client = handle;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (client == NULL || client->c == NULL)
	{
		rc = MQTTCLIENT_FAILURE;
		goto exit;
	}

	for (spent = MQTTTime_elapsed(began); spent < timeout; spent = MQTTTime_elapsed(began))
	{
		if (client->c->connected == 0)
		{
			rc = MQTTCLIENT_DISCONNECTED;
			goto exit;
		}
		if (ListFindItem(client->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
		{
			rc = MQTTCLIENT_SUCCESS; /* well we couldn't find it */
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
			if (client->c->retryMsgs == mdt)
			{
				rc = MQTTCLIENT_FAILURE;
				Log(TRACE_MIN, -1, "msgid = %d, retry end.\n", mdt);
			}
#endif
			goto exit;
		}
		/* let the network loop make progress without holding the client mutex */
		Thread_unlock_mutex(mqttclient_mutex);
		MQTTClient_yield();
		Thread_lock_mutex(mqttclient_mutex);
	}

exit:
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
3434
3435
/**
 * Return the message ids of all messages in the client's outbound list.
 *
 * On success with at least one outbound message, *tokens points to a
 * malloc'ed array of message ids terminated by -1.  If there are no outbound
 * messages, *tokens is left NULL.
 *
 * @param handle the client handle
 * @param tokens out: receives the allocated array, or NULL; must not be NULL itself
 * @return MQTTCLIENT_SUCCESS, MQTTCLIENT_NULL_PARAMETER if tokens is NULL,
 * MQTTCLIENT_FAILURE if handle is NULL, or PAHO_MEMORY_ERROR if the
 * allocation fails
 */
int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
{
	int rc = MQTTCLIENT_SUCCESS;
	MQTTClients* m = handle;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	/* tokens is written through below, so it must be checked before first use */
	if (tokens == NULL)
	{
		rc = MQTTCLIENT_NULL_PARAMETER;
		goto exit;
	}
	*tokens = NULL;

	if (m == NULL)
	{
		rc = MQTTCLIENT_FAILURE;
		goto exit;
	}

	if (m->c && m->c->outboundMsgs->count > 0)
	{
		ListElement* current = NULL;
		int count = 0;

		/* one extra slot for the -1 terminator */
		*tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
		if (!*tokens)
		{
			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
		while (ListNextElement(m->c->outboundMsgs, &current))
		{
			Messages* msg = (Messages*)(current->content);
			(*tokens)[count++] = msg->msgid;
		}
		(*tokens)[count] = -1;
	}

exit:
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
3475
3476
MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)3477 void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
3478 {
3479 Log_setTraceLevel((enum LOG_LEVELS)level);
3480 }
3481
3482
/**
 * Register a trace callback by forwarding it, converted to the logging
 * module's callback type, to Log_setTraceCallback().
 *
 * @param callback the trace callback to install
 */
void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
{
	Log_traceCallback* log_callback = (Log_traceCallback*)callback;

	Log_setTraceCallback(log_callback);
}
3487
3488
/**
 * Set the command timeout stored in the client.
 *
 * @param handle the client handle; must not be NULL
 * @param milliSeconds the new timeout; values below 5000 are rejected
 * @return MQTTCLIENT_SUCCESS if the timeout was stored, MQTTCLIENT_FAILURE if
 * handle is NULL or milliSeconds is less than 5000
 */
int MQTTClient_setCommandTimeout(MQTTClient handle, unsigned long milliSeconds)
{
	int rc = MQTTCLIENT_SUCCESS;
	MQTTClients* m = handle;

	FUNC_ENTRY;
	/* reject a NULL handle rather than dereferencing it below */
	if (m == NULL || milliSeconds < 5000L)
		rc = MQTTCLIENT_FAILURE;
	else
		m->commandTimeout = milliSeconds;
	FUNC_EXIT_RC(rc);
	return rc;
}
3502
3503
MQTTClient_getVersionInfo(void)3504 MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
3505 {
3506 #define MAX_INFO_STRINGS 8
3507 static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
3508 int i = 0;
3509
3510 libinfo[i].name = "Product name";
3511 libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
3512
3513 libinfo[i].name = "Version";
3514 libinfo[i++].value = CLIENT_VERSION;
3515
3516 libinfo[i].name = "Build level";
3517 libinfo[i++].value = BUILD_TIMESTAMP;
3518 #if defined(OPENSSL)
3519 libinfo[i].name = "OpenSSL version";
3520 libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
3521
3522 libinfo[i].name = "OpenSSL flags";
3523 libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
3524
3525 libinfo[i].name = "OpenSSL build timestamp";
3526 libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
3527
3528 libinfo[i].name = "OpenSSL platform";
3529 libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
3530
3531 libinfo[i].name = "OpenSSL directory";
3532 libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
3533 #elif defined(MBEDTLS)
3534 libinfo[i].name = "MbedTLS Version";
3535 libinfo[i++].value = MBEDTLS_VERSION_STRING;
3536 #endif
3537 libinfo[i].name = NULL;
3538 libinfo[i].value = NULL;
3539 return libinfo;
3540 }
3541
3542
MQTTClient_strerror(int code)3543 const char* MQTTClient_strerror(int code)
3544 {
3545 static char buf[30];
3546 int chars = 0;
3547
3548 switch (code) {
3549 case MQTTCLIENT_SUCCESS:
3550 return "Success";
3551 case MQTTCLIENT_FAILURE:
3552 return "Failure";
3553 case MQTTCLIENT_DISCONNECTED:
3554 return "Disconnected";
3555 case MQTTCLIENT_MAX_MESSAGES_INFLIGHT:
3556 return "Maximum in-flight messages amount reached";
3557 case MQTTCLIENT_BAD_UTF8_STRING:
3558 return "Invalid UTF8 string";
3559 case MQTTCLIENT_NULL_PARAMETER:
3560 return "Invalid (NULL) parameter";
3561 case MQTTCLIENT_TOPICNAME_TRUNCATED:
3562 return "Topic containing NULL characters has been truncated";
3563 case MQTTCLIENT_BAD_STRUCTURE:
3564 return "Bad structure";
3565 case MQTTCLIENT_BAD_QOS:
3566 return "Invalid QoS value";
3567 case MQTTCLIENT_SSL_NOT_SUPPORTED:
3568 return "SSL is not supported";
3569 case MQTTCLIENT_BAD_MQTT_VERSION:
3570 return "Unrecognized MQTT version";
3571 case MQTTCLIENT_BAD_PROTOCOL:
3572 return "Invalid protocol scheme";
3573 case MQTTCLIENT_BAD_MQTT_OPTION:
3574 return "Options for wrong MQTT version";
3575 case MQTTCLIENT_WRONG_MQTT_VERSION:
3576 return "Client created for another version of MQTT";
3577 case MQTTCLIENT_0_LEN_WILL_TOPIC:
3578 return "Zero length will topic on connect";
3579 }
3580
3581 chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
3582 if (chars >= sizeof(buf))
3583 {
3584 buf[sizeof(buf)-1] = '\0';
3585 Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
3586 }
3587 return buf;
3588 }
3589
3590
3591 /**
3592 * See if any pending writes have been completed, and cleanup if so.
3593 * Cleaning up means removing any publication data that was stored because the write did
3594 * not originally complete.
3595 */
MQTTProtocol_checkPendingWrites(void)3596 static void MQTTProtocol_checkPendingWrites(void)
3597 {
3598 FUNC_ENTRY;
3599 if (state.pending_writes.count > 0)
3600 {
3601 ListElement* le = state.pending_writes.first;
3602 while (le)
3603 {
3604 if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
3605 {
3606 MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
3607 state.pending_writes.current = le;
3608 ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
3609 le = state.pending_writes.current;
3610 }
3611 else
3612 ListNextElement(&(state.pending_writes), &le);
3613 }
3614 }
3615 FUNC_EXIT;
3616 }
3617
3618
/**
 * Called when a partial write has completed on a socket (this will be for a
 * publish).  Cleans up any finished pending writes and records the time of
 * the send against the client that owns the socket, if one is found.
 *
 * @param socket the socket on which the write completed
 * @param rc not used by this function
 */
static void MQTTClient_writeComplete(SOCKET socket, int rc)
{
	ListElement* owner = NULL;

	FUNC_ENTRY;
	MQTTProtocol_checkPendingWrites();

	/* look up the client that uses this socket */
	owner = ListFindItem(handles, &socket, clientSockCompare);
	if (owner != NULL)
		((MQTTClients*)(owner->content))->c->net.lastSent = MQTTTime_now();
	FUNC_EXIT;
}
3637
3638
/**
 * Records the current time as the last-sent time of the client that owns the
 * given socket, if one is found.
 *
 * @param socket the socket on which the write is continuing
 */
static void MQTTClient_writeContinue(SOCKET socket)
{
	ListElement* owner = ListFindItem(handles, &socket, clientSockCompare);

	if (owner == NULL)
		return;
	((MQTTClients*)(owner->content))->c->net.lastSent = MQTTTime_now();
}
3650