• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs - bug 384016 - segv setting will message
16  *    Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
17  *    Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
18  *    Ian Craggs - multiple server connection support
19  *    Ian Craggs - fix for bug 413429 - connectionLost not called
20  *    Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
21  *    Ian Craggs - fix for bug 419233 - mutexes not reporting errors
22  *    Ian Craggs - fix for bug 420851
23  *    Ian Craggs - fix for bug 432903 - queue persistence
24  *    Ian Craggs - MQTT 3.1.1 support
25  *    Ian Craggs - fix for bug 438176 - MQTT version selection
26  *    Rong Xiang, Ian Craggs - C++ compatibility
27  *    Ian Craggs - fix for bug 443724 - stack corruption
28  *    Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
29  *    Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
30  *    Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
31  *    Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
32  *    Ian Craggs - SNI support, message queue unpersist bug
33  *    Ian Craggs - binary will message support
34  *    Ian Craggs - waitforCompletion fix #240
35  *    Ian Craggs - check for NULL SSL options #334
36  *    Ian Craggs - allocate username/password buffers #431
37  *    Ian Craggs - MQTT 5.0 support
38  *    Sven Gambel - add generic proxy support
39  *******************************************************************************/
40 
41 /**
42  * @file
43  * \brief Synchronous API implementation
44  *
45  */
46 
47 #include <stdlib.h>
48 #include <string.h>
49 #if !defined(_WIN32) && !defined(_WIN64)
50 	#include <sys/time.h>
51 #else
52 	#if defined(_MSC_VER) && _MSC_VER < 1900
53 		#define snprintf _snprintf
54 	#endif
55 #endif
56 
57 #include "MQTTClient.h"
58 #if !defined(NO_PERSISTENCE)
59 #include "MQTTPersistence.h"
60 #endif
61 
62 #include "utf-8.h"
63 #include "MQTTProtocol.h"
64 #include "MQTTProtocolOut.h"
65 #include "Thread.h"
66 #include "SocketBuffer.h"
67 #include "StackTrace.h"
68 #include "Heap.h"
69 
70 #if defined(IOT_CONNECT)
71 #include "securec.h"
72 #include "soc_socket_types.h"
73 #include "atiny_mqtt_commu.h"
74 #endif
75 
76 #if defined(OPENSSL)
77 #include <openssl/ssl.h>
78 #elif defined(MBEDTLS)
79 #include <mbedtls/version.h>
80 #include "Clients.h"
81 #else
82 #define URI_SSL   "ssl://"
83 #define URI_MQTTS "mqtts://"
84 #endif
85 
86 #include "OsWrapper.h"
87 
88 #define URI_TCP  "tcp://"
89 #define URI_MQTT "mqtt://"
90 #define URI_WS   "ws://"
91 #define URI_WSS  "wss://"
92 
93 #include "VersionInfo.h"
94 #include "WebSocket.h"
95 #include "Proxy.h"
96 
97 const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
98 const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
99 
100 struct conlost_sync_data {
101 	sem_type sem;
102 	void *m;
103 };
104 
105 int MQTTClient_init(void);
106 
/**
 * Performs the global, one-off initialization of the client library.
 *
 * Runs MQTTClient_init() and, when the library is built with a TLS backend
 * (OPENSSL or MBEDTLS), passes the caller's do_openssl_init setting on to the
 * SSL socket layer.
 *
 * @param inits the global initialization options.  May be NULL, in which case
 * only MQTTClient_init() is run and the SSL setting is left untouched.
 */
void MQTTClient_global_init(MQTTClient_init_options* inits)
{
	MQTTClient_init();
#if defined(OPENSSL) || defined(MBEDTLS)
	/* guard against a NULL options pointer instead of dereferencing it */
	if (inits != NULL)
		SSLSocket_handleOpensslInit(inits->do_openssl_init);
#endif
}
114 
115 static ClientStates ClientState =
116 {
117 	CLIENT_VERSION, /* version */
118 	NULL /* client list */
119 };
120 
121 ClientStates* bstate = &ClientState;
122 
123 MQTTProtocol state;
124 
125 #if defined(_WIN32) || defined(_WIN64)
126 static mutex_type mqttclient_mutex = NULL;
127 mutex_type socket_mutex = NULL;
128 static mutex_type subscribe_mutex = NULL;
129 static mutex_type unsubscribe_mutex = NULL;
130 static mutex_type connect_mutex = NULL;
131 #if !defined(NO_HEAP_TRACKING)
132 extern mutex_type stack_mutex;
133 extern mutex_type heap_mutex;
134 #endif
135 extern mutex_type log_mutex;
136 
MQTTClient_init(void)137 int MQTTClient_init(void)
138 {
139 	DWORD rc = 0;
140 
141 	if (mqttclient_mutex == NULL)
142 	{
143 		if ((mqttclient_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
144 		{
145 			rc = GetLastError();
146 			printf("mqttclient_mutex error %d\n", rc);
147 			goto exit;
148 		}
149 		if ((subscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
150 		{
151 			rc = GetLastError();
152 			printf("subscribe_mutex error %d\n", rc);
153 			goto exit;
154 		}
155 		if ((unsubscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
156 		{
157 			rc = GetLastError();
158 			printf("unsubscribe_mutex error %d\n", rc);
159 			goto exit;
160 		}
161 		if ((connect_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
162 		{
163 			rc = GetLastError();
164 			printf("connect_mutex error %d\n", rc);
165 			goto exit;
166 		}
167 #if !defined(NO_HEAP_TRACKING)
168 		if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
169 		{
170 			rc = GetLastError();
171 			printf("stack_mutex error %d\n", rc);
172 			goto exit;
173 		}
174 		if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
175 		{
176 			rc = GetLastError();
177 			printf("heap_mutex error %d\n", rc);
178 			goto exit;
179 		}
180 #endif
181 		if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
182 		{
183 			rc = GetLastError();
184 			printf("log_mutex error %d\n", rc);
185 			goto exit;
186 		}
187 		if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
188 		{
189 			rc = GetLastError();
190 			printf("socket_mutex error %d\n", rc);
191 			goto exit;
192 		}
193 	}
194 exit:
195 	return rc;
196 }
197 
MQTTClient_cleanup(void)198 void MQTTClient_cleanup(void)
199 {
200 	if (connect_mutex)
201 		CloseHandle(connect_mutex);
202 	if (subscribe_mutex)
203 		CloseHandle(subscribe_mutex);
204 	if (unsubscribe_mutex)
205 		CloseHandle(unsubscribe_mutex);
206 #if !defined(NO_HEAP_TRACKING)
207 	if (stack_mutex)
208 		CloseHandle(stack_mutex);
209 	if (heap_mutex)
210 		CloseHandle(heap_mutex);
211 #endif
212 	if (log_mutex)
213 		CloseHandle(log_mutex);
214 	if (socket_mutex)
215 		CloseHandle(socket_mutex);
216 	if (mqttclient_mutex)
217 		CloseHandle(mqttclient_mutex);
218 }
219 
220 #if defined(PAHO_MQTT_STATIC)
221 /* Global variable for one-time initialization structure */
222 static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Static initialization */
223 
224 /* One time initialization function */
InitOnceFunction(PINIT_ONCE InitOnce,PVOID Parameter,PVOID * lpContext)225 BOOL CALLBACK InitOnceFunction (
226     PINIT_ONCE InitOnce,        /* Pointer to one-time initialization structure */
227     PVOID Parameter,            /* Optional parameter passed by InitOnceExecuteOnce */
228     PVOID *lpContext)           /* Receives pointer to event object */
229 {
230 	int rc = MQTTClient_init();
231     return rc == 0;
232 }
233 
234 #else
DllMain(HANDLE hModule,DWORD ul_reason_for_call,LPVOID lpReserved)235 BOOL APIENTRY DllMain(HANDLE hModule,
236 					  DWORD  ul_reason_for_call,
237 					  LPVOID lpReserved)
238 {
239 	switch (ul_reason_for_call)
240 	{
241 		case DLL_PROCESS_ATTACH:
242 			MQTTClient_init();
243 			break;
244 		case DLL_THREAD_ATTACH:
245 			break;
246 		case DLL_THREAD_DETACH:
247 			break;
248 		case DLL_PROCESS_DETACH:
249 			if (lpReserved)
250 				MQTTClient_cleanup();
251 			break;
252 	}
253 	return TRUE;
254 }
255 #endif
256 #elif defined (COMPAT_CMSIS)
257 
258 static mutex_type mqttclient_mutex = NULL;
259 mutex_type socket_mutex = NULL;
260 static mutex_type subscribe_mutex = NULL;
261 static mutex_type unsubscribe_mutex = NULL;
262 static mutex_type connect_mutex = NULL;
263 #if !defined(NO_HEAP_TRACKING)
264 extern mutex_type stack_mutex;
265 extern mutex_type heap_mutex;
266 #endif
267 extern mutex_type log_mutex;
268 
MQTTClient_init(void)269 int MQTTClient_init(void)
270 {
271 	int rc = MQTTCLIENT_SUCCESS;
272 	if ((mqttclient_mutex = Thread_create_mutex(&rc)) == NULL)
273 	{
274 		Log(TRACE_MIN, -1,"MQTTClient: error initializing client_mutex\n");
275 		return rc;
276 	}
277 	if ((subscribe_mutex = Thread_create_mutex(&rc)) == NULL)
278 	{
279 		Log(TRACE_MIN, -1,"MQTTClient: error initializing subscribe_mutex\n");
280 		return rc;
281 	}
282 	if ((unsubscribe_mutex = Thread_create_mutex(&rc)) == NULL)
283 	{
284 		Log(TRACE_MIN, -1,"MQTTClient: error initializing unsubscribe_mutex\n");
285 		return rc;
286 	}
287 	if ((connect_mutex = Thread_create_mutex(&rc)) == NULL)
288 	{
289 		Log(TRACE_MIN, -1,"MQTTClient: error initializing connect_mutex\n");
290 		return rc;
291 	}
292 #if !defined(NO_HEAP_TRACKING)
293 	if ((stack_mutex = Thread_create_mutex(&rc)) == NULL)
294 	{
295 		Log(TRACE_MIN, -1,"MQTTClient: error initializing stack_mutex\n");
296 		return rc;
297 	}
298 	if ((heap_mutex = Thread_create_mutex(&rc)) == NULL)
299 	{
300 		Log(TRACE_MIN, -1,"MQTTClient: error initializing heap_mutex\n");
301 		return rc;
302 	}
303 #endif
304 	if ((log_mutex = Thread_create_mutex(&rc)) == NULL)
305 	{
306 		Log(TRACE_MIN, -1,"MQTTClient: error initializing log_mutex\n");
307 		return rc;
308 	}
309 	if ((socket_mutex = Thread_create_mutex(&rc)) == NULL)
310 	{
311 		Log(TRACE_MIN, -1,"MQTTClient: error initializing socket_mutex\n");
312 		return rc;
313 	}
314 	return rc;
315 }
316 
MQTTClient_cleanup(void)317 void MQTTClient_cleanup(void)
318 {
319 	Thread_destroy_mutex(connect_mutex);
320 	connect_mutex = NULL;
321 	Thread_destroy_mutex(subscribe_mutex);
322 	subscribe_mutex = NULL;
323 	Thread_destroy_mutex(unsubscribe_mutex);
324 	unsubscribe_mutex = NULL;
325 #if !defined(NO_HEAP_TRACKING)
326 	Thread_destroy_mutex(stack_mutex);
327 	stack_mutex = NULL;
328 	Thread_destroy_mutex(heap_mutex);
329 	heap_mutex = NULL;
330 #endif
331 	Thread_destroy_mutex(log_mutex);
332 	log_mutex = NULL;
333 	Thread_destroy_mutex(socket_mutex);
334 	socket_mutex = NULL;
335 	Thread_destroy_mutex(mqttclient_mutex);
336 	mqttclient_mutex = NULL;
337 }
338 
339 #define WINAPI
340 #else
341 static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
342 static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
343 
344 static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
345 mutex_type socket_mutex = &socket_mutex_store;
346 
347 static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
348 static mutex_type subscribe_mutex = &subscribe_mutex_store;
349 
350 static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
351 static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
352 
353 static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
354 static mutex_type connect_mutex = &connect_mutex_store;
355 
MQTTClient_init(void)356 int MQTTClient_init(void)
357 {
358 	pthread_mutexattr_t attr;
359 	int rc;
360 
361 	pthread_mutexattr_init(&attr);
362 #if !defined(_WRS_KERNEL)
363 	pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
364 #else
365 	/* #warning "no pthread_mutexattr_settype" */
366 #endif /* !defined(_WRS_KERNEL) */
367 	if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
368 		printf("MQTTClient: error %d initializing client_mutex\n", rc);
369 	else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
370 		printf("MQTTClient: error %d initializing socket_mutex\n", rc);
371 	else if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
372 		printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
373 	else if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
374 		printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
375 	else if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
376 		printf("MQTTClient: error %d initializing connect_mutex\n", rc);
377 
378 	return rc;
379 }
380 
381 #define WINAPI
382 #endif
383 
384 static volatile int library_initialized = 0;
385 static List* handles = NULL;
386 static int running = 0;
387 static int tostop = 0;
388 static thread_id_type run_id = 0;
389 
390 typedef struct
391 {
392 	MQTTClient_message* msg;
393 	char* topicName;
394 	int topicLen;
395 	unsigned int seqno; /* only used on restore */
396 } qEntry;
397 
398 
399 typedef struct
400 {
401 	char* serverURI;
402 	const char* currentServerURI; /* when using HA options, set the currently used serverURI */
403 #if defined(OPENSSL) || defined(MBEDTLS)
404 	int ssl;
405 #endif
406 	int websocket;
407 	Clients* c;
408 	MQTTClient_connectionLost* cl;
409 	MQTTClient_messageArrived* ma;
410 	MQTTClient_deliveryComplete* dc;
411 	void* context;
412 
413 	MQTTClient_disconnected* disconnected;
414 	void* disconnected_context; /* the context to be associated with the disconnected callback*/
415 
416 	MQTTClient_published* published;
417 	void* published_context; /* the context to be associated with the disconnected callback*/
418 
419 #if 0
420 	MQTTClient_authHandle* auth_handle;
421 	void* auth_handle_context; /* the context to be associated with the authHandle callback*/
422 #endif
423 
424 	sem_type connect_sem;
425 	int rc; /* getsockopt return code in connect */
426 	sem_type connack_sem;
427 	sem_type suback_sem;
428 	sem_type unsuback_sem;
429 	MQTTPacket* pack;
430 
431 	unsigned long commandTimeout;
432 } MQTTClients;
433 
434 struct props_rc_parms
435 {
436 	MQTTClients* m;
437 	MQTTProperties* properties;
438 	enum MQTTReasonCodes reasonCode;
439 };
440 
441 static void MQTTClient_terminate(void);
442 static void MQTTClient_emptyMessageQueue(Clients* client);
443 static int MQTTClient_deliverMessage(
444 		int rc, MQTTClients* m,
445 		char** topicName, int* topicLen,
446 		MQTTClient_message** message);
447 static int clientSockCompare(void* a, void* b);
448 static thread_return_type WINAPI connectionLost_call(void* context);
449 static thread_return_type WINAPI MQTTClient_run(void* n);
450 static int MQTTClient_stop(void);
451 static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
452 static int MQTTClient_cleanSession(Clients* client);
453 static MQTTResponse MQTTClient_connectURIVersion(
454 	MQTTClient handle, MQTTClient_connectOptions* options,
455 	const char* serverURI, int MQTTVersion,
456 	START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout,
457 	MQTTProperties* connectProperties, MQTTProperties* willProperties);
458 static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
459 	MQTTProperties* connectProperties, MQTTProperties* willProperties);
460 static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
461 static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
462 static void MQTTClient_retry(void);
463 static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc);
464 static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout);
465 /*static int pubCompare(void* a, void* b); */
466 static void MQTTProtocol_checkPendingWrites(void);
467 static void MQTTClient_writeComplete(SOCKET socket, int rc);
468 static void MQTTClient_writeContinue(SOCKET socket);
469 
470 
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
/**
 * Releases everything owned by a client whose creation failed after its
 * Clients structure had been allocated, and shuts the library down again if
 * no client is left.
 * @param m the partially created client; it is freed by this call
 */
static void MQTTClient_discardPartialClient(MQTTClients* m)
{
#if !defined(NO_PERSISTENCE)
	/* close unconditionally: persistence may have been opened successfully */
	MQTTPersistence_close(m->c);
#endif
	if (m->c->outboundMsgs != NULL)
		ListFree(m->c->outboundMsgs);
	if (m->c->inboundMsgs != NULL)
		ListFree(m->c->inboundMsgs);
	if (m->c->messageQueue != NULL)
		ListFree(m->c->messageQueue);
	if (m->c->outboundQueue != NULL)
		ListFree(m->c->outboundQueue);
	if (m->c->clientID != NULL)
		free(m->c->clientID);
	if (m->connect_sem != NULL)
		Thread_destroy_sem(m->connect_sem);
	if (m->connack_sem != NULL)
		Thread_destroy_sem(m->connack_sem);
	if (m->suback_sem != NULL)
		Thread_destroy_sem(m->suback_sem);
	if (m->unsuback_sem != NULL)
		Thread_destroy_sem(m->unsuback_sem);

	free(m->c);
	free(m->serverURI);
	ListRemove(handles, m); /* m is freed by ListRemove */
	if (bstate->clients->count == 0)
		MQTTClient_terminate();
}
#endif


/**
 * Creates an MQTT client, ready to be connected to the given server.
 *
 * The first successful call also initializes the library (heap tracking,
 * logging, the socket layer and the client lists).  The URI scheme selects
 * plain TCP (tcp://, mqtt://), websockets (ws://) or, when the library is
 * built with a TLS backend, TLS (ssl://, mqtts://, wss://).
 *
 * @param handle receives the new client handle
 * @param serverURI the URI of the server, with or without a scheme prefix
 * @param clientId the client identifier, a valid UTF-8 string; it may only be
 * empty when persistence_type is not MQTTCLIENT_PERSISTENCE_DEFAULT
 * @param persistence_type the type of persistence to use
 * @param persistence_context passed to MQTTPersistence_create() together with
 * persistence_type
 * @param options optional creation options (struct_id "MQCO", version 0),
 * or NULL to use MQTTVERSION_DEFAULT
 * @return 0 on success.  Otherwise the mutex lock error,
 * MQTTCLIENT_NULL_PARAMETER, MQTTCLIENT_BAD_UTF8_STRING,
 * MQTTCLIENT_PERSISTENCE_ERROR, MQTTCLIENT_BAD_PROTOCOL,
 * MQTTCLIENT_BAD_STRUCTURE, MQTTCLIENT_SSL_NOT_SUPPORTED, PAHO_MEMORY_ERROR
 * or, in non-IoT builds, the error returned by the persistence layer.
 */
int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
{
	int rc = 0;
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	int mem_ret = -1;
#endif
	MQTTClients *m = NULL;

#if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
	/* initializes mutexes once.  Must come before FUNC_ENTRY */
	BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitOnceFunction, NULL, NULL);
#endif
	FUNC_ENTRY;
	if ((rc = Thread_lock_mutex(mqttclient_mutex)) != 0)
		goto nounlock_exit;

	/* handle is written through below, so it is mandatory in every build */
	if (handle == NULL || serverURI == NULL || clientId == NULL)
	{
		rc = MQTTCLIENT_NULL_PARAMETER;
		goto exit;
	}

	if (!UTF8_validateString(clientId))
	{
		rc = MQTTCLIENT_BAD_UTF8_STRING;
		goto exit;
	}

	if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
	{
		rc = MQTTCLIENT_PERSISTENCE_ERROR;
		goto exit;
	}

	/* a URI with a scheme must use one of the schemes this build supports */
	if (strstr(serverURI, "://") != NULL)
	{
		if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
		 && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
		 && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
#if defined(OPENSSL) || defined(MBEDTLS)
		 && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
		 && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
		 && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
#endif
			)
		{
			rc = MQTTCLIENT_BAD_PROTOCOL;
			goto exit;
		}
	}

	if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
	{
		rc = MQTTCLIENT_BAD_STRUCTURE;
		goto exit;
	}

	if (!library_initialized)
	{
		#if !defined(NO_HEAP_TRACKING)
			Heap_initialize();
		#endif
#if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
		Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
#endif
		bstate->clients = ListInitialize();
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		rc = Socket_outInitialize();
#else
		Socket_outInitialize();
#endif
		Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
		Socket_setWriteContinueCallback(MQTTClient_writeContinue);
		Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
		handles = ListInitialize();
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		if (bstate->clients == NULL || rc != 0 || handles == NULL)
		{
			/* undo whichever parts of the initialization succeeded */
			if (bstate->clients != NULL)
			{
				ListFree(bstate->clients);
				bstate->clients = NULL;
			}
			if (rc == 0)
				Socket_outTerminate();
			if (handles != NULL)
			{
				ListFree(handles);
				handles = NULL;
			}
			#if !defined(NO_HEAP_TRACKING)
				Heap_terminate();
			#endif

			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
#elif defined(OPENSSL)
		SSLSocket_initialize();
#endif
		library_initialized = 1;
#if (defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)) && (defined(OPENSSL) || defined(MBEDTLS))
#if defined(OPENSSL)
		if (SSLSocket_initialize() != 1)
#else
		if (SSLSocket_initialize() != 0)
#endif
		{
			MQTTClient_terminate();
			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
#endif
	}

	if ((m = malloc(sizeof(MQTTClients))) == NULL)
	{
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
#endif
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
	*handle = m;
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	mem_ret = memset_s(m, sizeof(MQTTClients), '\0', sizeof(MQTTClients));
	if (mem_ret != 0)
	{
		free(m);
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	memset(m, '\0', sizeof(MQTTClients));
#endif
	m->commandTimeout = 10000L;

	/* strip the scheme prefix, remembering what kind of connection it asks for */
	if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
		serverURI += strlen(URI_TCP);
	else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
		serverURI += strlen(URI_MQTT);
	else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
	{
		serverURI += strlen(URI_WS);
		m->websocket = 1;
	}
	else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
	{
#if defined(OPENSSL) || defined(MBEDTLS)
		serverURI += strlen(URI_SSL);
		m->ssl = 1;
#else
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		free(m);
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
#endif
		rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
		goto exit;
#endif
	}
	else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
	{
		/* same TLS backends as ssl:// and wss://, matching the scheme validation above */
#if defined(OPENSSL) || defined(MBEDTLS)
		serverURI += strlen(URI_MQTTS);
		m->ssl = 1;
#else
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		free(m);
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
#endif
		rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
		goto exit;
#endif
	}
	else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
	{
#if defined(OPENSSL) || defined(MBEDTLS)
		serverURI += strlen(URI_WSS);
		m->ssl = 1;
		m->websocket = 1;
#else
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		free(m);
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
#endif
		rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
		goto exit;
#endif
	}
	m->serverURI = MQTTStrdup(serverURI);
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (m->serverURI == NULL)
	{
		free(m);
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
	if (ListAppend(handles, m, sizeof(MQTTClients)) == NULL)
	{
		free(m->serverURI);
		free(m);
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	ListAppend(handles, m, sizeof(MQTTClients));
#endif

	if ((m->c = malloc(sizeof(Clients))) == NULL)
	{
		/* ListRemove frees m, so release what m owns first */
		if (m->serverURI != NULL)
			free(m->serverURI);
		ListRemove(handles, m);
		/* do not leave the caller with a pointer to freed memory */
		*handle = NULL;
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
#endif
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	mem_ret = memset_s(m->c, sizeof(Clients), '\0', sizeof(Clients));
	if (mem_ret != 0)
	{
		free(m->c);
		free(m->serverURI);
		ListRemove(handles, m); /* m is freed by ListRemove */
		if (bstate->clients->count == 0)
			MQTTClient_terminate();
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	memset(m->c, '\0', sizeof(Clients));
#endif
	m->c->context = m;
	m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
	m->c->outboundMsgs = ListInitialize();
	m->c->inboundMsgs = ListInitialize();
	m->c->messageQueue = ListInitialize();
	m->c->outboundQueue = ListInitialize();
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	m->c->retryMsgs = -1;
#endif
	m->c->clientID = MQTTStrdup(clientId);
#if defined(ZERO_SOCK_FD_IS_INVALID)
	m->c->net.socket = 0;
#else
	m->c->net.socket = -1;
#endif
	m->connect_sem = Thread_create_sem(&rc);
	m->connack_sem = Thread_create_sem(&rc);
	m->suback_sem = Thread_create_sem(&rc);
	m->unsuback_sem = Thread_create_sem(&rc);

#if !defined(NO_PERSISTENCE)
	rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
	if (rc == 0)
	{
		rc = MQTTPersistence_initialize(m->c, m->serverURI);
		if (rc == 0)
			MQTTPersistence_restoreMessageQueue(m->c);
	}
#endif
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (m->c->outboundMsgs == NULL ||
		m->c->inboundMsgs == NULL  ||
		m->c->messageQueue == NULL ||
		m->c->outboundQueue == NULL ||
		m->c->clientID == NULL ||
		m->connect_sem == NULL ||
		m->connack_sem == NULL ||
		m->suback_sem  == NULL ||
		m->unsuback_sem == NULL
#if !defined(NO_PERSISTENCE)
		|| rc != 0
#endif
		)
	{
		MQTTClient_discardPartialClient(m);
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}

	if (!ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List)))
	{
		MQTTClient_discardPartialClient(m);
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
#else
	ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
#endif

exit:
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	/* every IoT failure path has already released the client */
	if (handle != NULL && rc != 0)
		*handle = NULL;
#endif
	Thread_unlock_mutex(mqttclient_mutex);
nounlock_exit:
	FUNC_EXIT_RC(rc);
	return rc;
}
844 
845 
/**
 * Creates an MQTT client with the default creation options.
 *
 * Equivalent to MQTTClient_createWithOptions() with a NULL options pointer.
 *
 * @param handle receives the new client handle
 * @param serverURI the URI of the server to connect to
 * @param clientId the client identifier
 * @param persistence_type the type of persistence to use
 * @param persistence_context the context for the chosen persistence type
 * @return the return code of MQTTClient_createWithOptions()
 */
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context)
{
	MQTTClient_createOptions* const no_options = NULL;

	return MQTTClient_createWithOptions(handle, serverURI, clientId,
		persistence_type, persistence_context, no_options);
}
852 
853 
MQTTClient_terminate(void)854 static void MQTTClient_terminate(void)
855 {
856 	FUNC_ENTRY;
857 	MQTTClient_stop();
858 	if (library_initialized)
859 	{
860 		ListFree(bstate->clients);
861 		ListFree(handles);
862 		handles = NULL;
863 		WebSocket_terminate();
864 		#if !defined(NO_HEAP_TRACKING)
865 			Heap_terminate();
866 		#endif
867 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
868 		Log_terminate();
869 #endif
870 		library_initialized = 0;
871 	}
872 	FUNC_EXIT;
873 }
874 
875 
/**
 * Frees every message held in a client's message queue, along with its topic
 * name, properties and payload, and then empties the queue.
 * @param client the client whose message queue is to be emptied
 */
static void MQTTClient_emptyMessageQueue(Clients* client)
{
	List* queue = client->messageQueue;

	FUNC_ENTRY;
	if (queue->count > 0)
	{
		ListElement* elem = NULL;

		/* release what each entry owns; ListEmpty deals with the entries themselves */
		while (ListNextElement(queue, &elem) != NULL)
		{
			qEntry* entry = (qEntry*)(elem->content);

			free(entry->topicName);
			MQTTProperties_free(&entry->msg->properties);
			free(entry->msg->payload);
			free(entry->msg);
		}
		ListEmpty(queue);
	}
	FUNC_EXIT;
}
895 
896 
/**
 * Destroys a client and releases everything it owns.
 *
 * Closes the client's persistence, empties its message queue, frees the
 * protocol client, removes the client from the library lists and destroys
 * its semaphores.  When the last client has gone the library is terminated.
 *
 * @param handle pointer to the handle of the client to destroy; the handle is
 * set to NULL.  A NULL pointer, or a pointer to a NULL handle, is ignored.
 */
void MQTTClient_destroy(MQTTClient* handle)
{
	/* a NULL handle pointer is treated like a handle that is already destroyed */
	MQTTClients* m = (handle != NULL) ? *handle : NULL;

	FUNC_ENTRY;
	Thread_lock_mutex(connect_mutex);
	Thread_lock_mutex(mqttclient_mutex);

	if (m == NULL)
		goto exit;

	if (m->c)
	{
		/* saved for the trace message, as the client is freed before it is written */
		SOCKET saved_socket = m->c->net.socket;
		char* saved_clientid = NULL;

		if (m->c->clientID != NULL)
			saved_clientid = MQTTStrdup(m->c->clientID);
#if !defined(NO_PERSISTENCE)
		MQTTPersistence_close(m->c);
#endif
		MQTTClient_emptyMessageQueue(m->c);
		MQTTProtocol_freeClient(m->c);
		if (bstate != NULL && bstate->clients != NULL)
		{
			if (!ListRemove(bstate->clients, m->c))
				Log(LOG_ERROR, 0, NULL);
			else
				Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
		}
		if (saved_clientid != NULL)
			free(saved_clientid);
	}
	if (m->serverURI)
		free(m->serverURI);
	Thread_destroy_sem(m->connect_sem);
	Thread_destroy_sem(m->connack_sem);
	Thread_destroy_sem(m->suback_sem);
	Thread_destroy_sem(m->unsuback_sem);
	if (handles != NULL && ListRemove(handles, m) == 0)
		Log(LOG_ERROR, -1, "free error");
	*handle = NULL;
	if (bstate != NULL && bstate->clients != NULL && bstate->clients->count == 0)
		MQTTClient_terminate();

exit:
	Thread_unlock_mutex(mqttclient_mutex);
	Thread_unlock_mutex(connect_mutex);
	FUNC_EXIT;
}
967 
968 
/**
 * Frees a message, including its properties and payload.
 * @param message pointer to the message pointer; the message pointer is set
 * to NULL.  A NULL argument, or a pointer to a NULL message, is ignored.
 */
void MQTTClient_freeMessage(MQTTClient_message** message)
{
	FUNC_ENTRY;
	if (message != NULL && *message != NULL)
	{
		MQTTProperties_free(&(*message)->properties);
		free((*message)->payload);
		free(*message);
		*message = NULL;
	}
	FUNC_EXIT;
}
978 
979 
MQTTClient_free(void * memory)980 void MQTTClient_free(void* memory)
981 {
982 	FUNC_ENTRY;
983 	free(memory);
984 	FUNC_EXIT;
985 }
986 
987 
/**
 * Frees the memory attached to an MQTTResponse: its reason codes array and
 * its properties.
 * @param response the response whose memory is to be freed
 */
void MQTTResponse_free(MQTTResponse response)
{
	FUNC_ENTRY;
	/* the reason codes are only released when the count says there are some */
	if (response.reasonCodes != NULL && response.reasonCodeCount > 0)
		free(response.reasonCodes);
	if (response.properties != NULL)
	{
		MQTTProperties_free(response.properties);
		free(response.properties);
	}
	FUNC_EXIT;
}
1000 
1001 
/**
 * Hands the first message in a client's message queue over to the caller and
 * removes its entry from the queue.
 *
 * The queue must not be empty: its first entry is used without a check.
 *
 * @param rc the return code to pass back when the topic name is not truncated
 * @param m the client whose queue is to be read
 * @param topicName receives the topic name of the message
 * @param topicLen receives the topic length of the message
 * @param message receives the message
 * @return MQTTCLIENT_TOPICNAME_TRUNCATED if the string length of the topic
 * name differs from the topic length, otherwise rc
 */
static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
{
	List* queue = m->c->messageQueue;
	qEntry* head = (qEntry*)(queue->first->content);

	FUNC_ENTRY;
	*message = head->msg;
	*topicName = head->topicName;
	*topicLen = head->topicLen;
	if (strlen(*topicName) != *topicLen)
		rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
#if !defined(NO_PERSISTENCE)
	if (m->c->persistence)
		MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)head);
#endif
	/* only the queue entry is removed here; the message and topic name now belong to the caller */
	ListRemove(queue, queue->first->content);
	FUNC_EXIT_RC(rc);
	return rc;
}
1020 
1021 
1022 /**
1023  * List callback function for comparing clients by socket
1024  * @param a first integer value
1025  * @param b second integer value
1026  * @return boolean indicating whether a and b are equal
1027  */
clientSockCompare(void * a,void * b)1028 static int clientSockCompare(void* a, void* b)
1029 {
1030 	MQTTClients* m = (MQTTClients*)a;
1031 	return m->c->net.socket == *(int*)b;
1032 }
1033 
1034 
1035 /**
1036  * Wrapper function to call connection lost on a separate thread.  A separate thread is needed to allow the
1037  * connectionLost function to make API calls (e.g. connect)
1038  * @param context a pointer to the relevant client
1039  * @return thread_return_type standard thread return value - not used here
1040  */
connectionLost_call(void * context)1041 static thread_return_type WINAPI connectionLost_call(void* context)
1042 {
1043 	struct conlost_sync_data *data = (struct conlost_sync_data *)context;
1044 	MQTTClients* m = (MQTTClients *)data->m;
1045 
1046 	(*(m->cl))(m->context, NULL);
1047 
1048 	Thread_post_sem(data->sem);
1049 	return 0;
1050 }
1051 
1052 
/**
 * Registers the callback (and its context) to be invoked when a disconnect
 * packet is received for this client.
 *
 * The registration is refused when the handle is NULL or while a connect is
 * in progress.  The client mutex is held while the fields are updated.
 *
 * @param handle the client to configure
 * @param context application pointer passed back to the callback
 * @param disconnected the callback function
 * @return MQTTCLIENT_SUCCESS if the callback was stored, MQTTCLIENT_FAILURE
 * otherwise
 */
int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* disconnected)
{
	MQTTClients* client = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (client != NULL && client->c->connect_state == NOT_IN_PROGRESS)
	{
		client->disconnected_context = context;
		client->disconnected = disconnected;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
1073 
1074 
1075 
1076 /**
1077  * Wrapper function to call disconnected on a separate thread.  A separate thread is needed to allow the
1078  * disconnected function to make API calls (e.g. connect)
1079  * @param context a pointer to the relevant client
1080  * @return thread_return_type standard thread return value - not used here
1081  */
call_disconnected(void * context)1082 static thread_return_type WINAPI call_disconnected(void* context)
1083 {
1084 	struct props_rc_parms* pr = (struct props_rc_parms*)context;
1085 
1086 	(*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
1087 	MQTTProperties_free(pr->properties);
1088 	free(pr->properties);
1089 	free(pr);
1090 	return 0;
1091 }
1092 
1093 
/**
 * Registers the "published" callback (and its context) for this client.
 *
 * The registration is refused when the handle is NULL or while a connect is
 * in progress.  The client mutex is held while the fields are updated.
 *
 * @param handle the client to configure
 * @param context application pointer stored alongside the callback
 * @param published the callback function
 * @return MQTTCLIENT_SUCCESS if the callback was stored, MQTTCLIENT_FAILURE
 * otherwise
 */
int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
{
	MQTTClients* client = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (client != NULL && client->c->connect_state == NOT_IN_PROGRESS)
	{
		client->published_context = context;
		client->published = published;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
1114 
1115 
/* Everything up to the matching #endif is compiled out: it is the AUTH packet
 * callback support (registration function plus thread wrapper), kept for
 * reference only. */
#if 0
/**
 * Registers the callback (and its context) to be invoked when an AUTH packet
 * is received.  Refused when the handle is NULL or a connect is in progress.
 * @param handle the client to configure
 * @param context application pointer passed back to the callback
 * @param auth_handle the callback function
 * @return MQTTCLIENT_SUCCESS if the callback was stored, MQTTCLIENT_FAILURE
 * otherwise
 */
int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
{
	int rc = MQTTCLIENT_SUCCESS;
	MQTTClients* m = handle;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
		rc = MQTTCLIENT_FAILURE;
	else
	{
		m->auth_handle_context = context;
		m->auth_handle = auth_handle;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}


/**
 * Wrapper function to call authHandle on a separate thread.  A separate thread is needed to allow the
 * auth_handle function to make API calls (e.g. MQTTClient_auth).
 * The parameter block and its properties are freed after the callback returns.
 * @param context a pointer to a struct props_rc_parms for the relevant client
 * @return thread_return_type standard thread return value - not used here
 */
static thread_return_type WINAPI call_auth_handle(void* context)
{
	struct props_rc_parms* pr = (struct props_rc_parms*)context;

	(*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode);
	MQTTProperties_free(pr->properties);
	free(pr->properties);
	free(pr);
	return 0;
}
#endif
1156 
1157 
1158 /* This is the thread function that handles the calling of callback functions if set */
MQTTClient_run(void * n)1159 static thread_return_type WINAPI MQTTClient_run(void* n)
1160 {
1161 	long timeout = 10L; /* first time in we have a small timeout.  Gets things started more quickly */
1162 
1163 	FUNC_ENTRY;
1164 	Thread_set_name("MQTTClient_run");
1165 	Thread_lock_mutex(mqttclient_mutex);
1166 
1167 	run_id = Thread_getid();
1168 	running = 1;
1169 	while (!tostop)
1170 	{
1171 		int rc = SOCKET_ERROR;
1172 		SOCKET sock = -1;
1173 		MQTTClients* m = NULL;
1174 		MQTTPacket* pack = NULL;
1175 
1176 		Thread_unlock_mutex(mqttclient_mutex);
1177 		pack = MQTTClient_cycle(&sock, timeout, &rc);
1178 		Thread_lock_mutex(mqttclient_mutex);
1179 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1180 		if (tostop != 0 || handles == NULL)
1181 #else
1182 		if (tostop)
1183 #endif
1184 			break;
1185 		timeout = 100L;
1186 
1187 		/* find client corresponding to socket */
1188 		if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
1189 		{
1190 			/* assert: should not happen */
1191 			continue;
1192 		}
1193 		m = (MQTTClient)(handles->current->content);
1194 		if (m == NULL)
1195 		{
1196 			/* assert: should not happen */
1197 			continue;
1198 		}
1199 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1200 		if (m->c != NULL && m->c->messageQueue != NULL && m->c->messageQueue->count > 0)
1201 			timeout = 0L;
1202 #endif
1203 		if (rc == SOCKET_ERROR)
1204 		{
1205 			if (m->c->connected)
1206 				MQTTClient_disconnect_internal(m, 0);
1207 			else
1208 			{
1209 				if (m->c->connect_state == SSL_IN_PROGRESS)
1210 				{
1211 					Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
1212 					m->c->connect_state = NOT_IN_PROGRESS;
1213 					Thread_post_sem(m->connect_sem);
1214 				}
1215 				if (m->c->connect_state == WAIT_FOR_CONNACK)
1216 				{
1217 					Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
1218 					m->c->connect_state = NOT_IN_PROGRESS;
1219 					Thread_post_sem(m->connack_sem);
1220 				}
1221 			}
1222 		}
1223 		else
1224 		{
1225 			if (m->c->messageQueue->count > 0 && m->ma)
1226 			{
1227 				qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
1228 				int topicLen = qe->topicLen;
1229 
1230 				if (strlen(qe->topicName) == topicLen)
1231 					topicLen = 0;
1232 
1233 				Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
1234 					m->c->clientID, m->c->messageQueue->count);
1235 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1236 				uapi_watchdog_kick();
1237 #endif
1238 				Thread_unlock_mutex(mqttclient_mutex);
1239 				rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
1240 				Thread_lock_mutex(mqttclient_mutex);
1241 				/* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
1242 				 * the queue, and it will be retried later.  If 1 is returned then the message data may have been freed,
1243 				 * so we must be careful how we use it.
1244 				 */
1245 				if (rc)
1246 				{
1247 					#if !defined(NO_PERSISTENCE)
1248 					if (m->c->persistence)
1249 						MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
1250 					#endif
1251 					ListRemove(m->c->messageQueue, qe);
1252 				}
1253 				else
1254 					Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
1255 						m->c->clientID);
1256 			}
1257 			if (pack)
1258 			{
1259 				if (pack->header.bits.type == CONNACK)
1260 				{
1261 					Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
1262 					m->pack = pack;
1263 					Thread_post_sem(m->connack_sem);
1264 				}
1265 				else if (pack->header.bits.type == SUBACK)
1266 				{
1267 					Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
1268 					m->pack = pack;
1269 					Thread_post_sem(m->suback_sem);
1270 				}
1271 				else if (pack->header.bits.type == UNSUBACK)
1272 				{
1273 					Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
1274 					m->pack = pack;
1275 					Thread_post_sem(m->unsuback_sem);
1276 				}
1277 				else if (m->c->MQTTVersion >= MQTTVERSION_5)
1278 				{
1279 					if (pack->header.bits.type == DISCONNECT && m->disconnected)
1280 					{
1281 						struct props_rc_parms* dp;
1282 						Ack* disc = (Ack*)pack;
1283 
1284 						dp = malloc(sizeof(struct props_rc_parms));
1285 						if (dp)
1286 						{
1287 							dp->m = m;
1288 							dp->reasonCode = disc->rc;
1289 							dp->properties = malloc(sizeof(MQTTProperties));
1290 							if (dp->properties)
1291 							{
1292 								*(dp->properties) = disc->properties;
1293 								MQTTClient_disconnect1(m, 10, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
1294 								Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
1295 								Thread_start(call_disconnected, dp);
1296 							}
1297 							else
1298 								free(dp);
1299 						}
1300 						free(disc);
1301 					}
1302 #if 0
1303 					if (pack->header.bits.type == AUTH && m->auth_handle)
1304 					{
1305 						struct props_rc_parms dp;
1306 						Ack* disc = (Ack*)pack;
1307 
1308 						dp.m = m;
1309 						dp.properties = &disc->properties;
1310 						dp.reasonCode = disc->rc;
1311 						free(pack);
1312 						Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
1313 						Thread_start(call_auth_handle, &dp);
1314 					}
1315 #endif
1316 				}
1317 			}
1318 			else if (m->c->connect_state == TCP_IN_PROGRESS)
1319 			{
1320 				int error = 0;
1321 				socklen_t len = sizeof(error);
1322 
1323 				if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
1324 					m->rc = error;
1325 				Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
1326 				m->c->connect_state = NOT_IN_PROGRESS;
1327 				Thread_post_sem(m->connect_sem);
1328 			}
1329 #if defined(OPENSSL) || defined(MBEDTLS)
1330 			else if (m->c->connect_state == SSL_IN_PROGRESS)
1331 			{
1332 				rc = m->c->sslopts->struct_version >= 3 ?
1333 					SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
1334 						m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
1335 					SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
1336 						m->c->sslopts->verify, NULL, NULL);
1337 				if (rc == 1 || rc == SSL_FATAL)
1338 				{
1339 					if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1340 #if defined(OPENSSL)
1341 						m->c->session = SSL_get1_session(m->c->net.ssl);
1342 #elif defined(MBEDTLS)
1343 						mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
1344 #endif
1345 					m->rc = rc;
1346 					Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
1347 					m->c->connect_state = NOT_IN_PROGRESS;
1348 					Thread_post_sem(m->connect_sem);
1349 				}
1350 			}
1351 #endif
1352 			else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
1353 			{
1354 				if (rc != TCPSOCKET_INTERRUPTED)
1355 				{
1356 					Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
1357 					m->c->connect_state = WAIT_FOR_CONNACK;
1358 					Thread_post_sem(m->connect_sem);
1359 				}
1360 			}
1361 		}
1362 	}
1363 	run_id = 0;
1364 	running = tostop = 0;
1365 	Thread_unlock_mutex(mqttclient_mutex);
1366 	FUNC_EXIT;
1367 #if defined(_WIN32) || defined(_WIN64)
1368 	ExitThread(0);
1369 #endif
1370 	return 0;
1371 }
1372 
1373 
MQTTClient_stop(void)1374 static int MQTTClient_stop(void)
1375 {
1376 	int rc = 0;
1377 
1378 	FUNC_ENTRY;
1379 	if (running == 1 && tostop == 0)
1380 	{
1381 		int conn_count = 0;
1382 		ListElement* current = NULL;
1383 
1384 		if (handles != NULL)
1385 		{
1386 			/* find out how many handles are still connected */
1387 			while (ListNextElement(handles, &current))
1388 			{
1389 				if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
1390 						((MQTTClients*)(current->content))->c->connected)
1391 					++conn_count;
1392 			}
1393 		}
1394 		Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
1395 		/* stop the background thread, if we are the last one to be using it */
1396 		if (conn_count == 0)
1397 		{
1398 			int count = 0;
1399 			tostop = 1;
1400 			if (Thread_getid() != run_id)
1401 			{
1402 				while (running && ++count < 100)
1403 				{
1404 					Thread_unlock_mutex(mqttclient_mutex);
1405 					Log(TRACE_MIN, -1, "sleeping");
1406 					MQTTTime_sleep(100L);
1407 					Thread_lock_mutex(mqttclient_mutex);
1408 				}
1409 			}
1410 			rc = 1;
1411 		}
1412 	}
1413 	FUNC_EXIT_RC(rc);
1414 	return rc;
1415 }
1416 
1417 
/**
 * Registers the application callbacks for a client.
 *
 * The registration is refused when the handle is NULL, when no
 * messageArrived callback is supplied, or while a connect is in progress.
 * The client mutex is held while the fields are updated.
 *
 * @param handle the client to configure
 * @param context application pointer stored with the callbacks
 * @param cl connection lost callback
 * @param ma message arrived callback - must not be NULL
 * @param dc delivery complete callback
 * @return MQTTCLIENT_SUCCESS if the callbacks were stored, MQTTCLIENT_FAILURE
 * otherwise
 */
int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connectionLost* cl,
														MQTTClient_messageArrived* ma, MQTTClient_deliveryComplete* dc)
{
	MQTTClients* client = handle;
	int rc = MQTTCLIENT_FAILURE;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (client != NULL && ma != NULL && client->c->connect_state == NOT_IN_PROGRESS)
	{
		client->context = context;
		client->cl = cl;
		client->ma = ma;
		client->dc = dc;
		rc = MQTTCLIENT_SUCCESS;
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
1441 
1442 
/**
 * Tears down a client's network session and resets its connection state.
 *
 * If the client still has a socket, an MQTT DISCONNECT is sent first (only
 * when the client is marked connected), then - under the socket mutex - the
 * websocket, any TLS session and the socket itself are closed, and the socket
 * field is reset to the "no socket" value.  Finally the client is marked as
 * not connected, and for pre-v5 clients using cleansession the session state
 * is cleared.
 *
 * @param client the client whose session is closed
 * @param reason reason code to send in the DISCONNECT packet
 * @param props properties to send in the DISCONNECT packet
 */
static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
{
	int has_socket = 0;

	FUNC_ENTRY;
	client->good = 0;
	client->ping_outstanding = 0;
	client->ping_due = 0;

	/* which descriptor values count as "open" depends on the platform */
#if defined(ZERO_SOCK_FD_IS_INVALID)
	has_socket = (client->net.socket > 0);
#else
	has_socket = (client->net.socket >= 0);
#endif

	if (has_socket)
	{
		if (client->connected)
			MQTTPacket_send_disconnect(client, reason, props);

		Thread_lock_mutex(socket_mutex);
		WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);

#if defined(OPENSSL)
		SSL_SESSION_free(client->session); /* is a no-op if session is NULL */
#elif defined(MBEDTLS)
		mbedtls_ssl_session_free(client->session);
#endif
#if defined(OPENSSL) || defined(MBEDTLS)
		client->session = NULL; /* show the session has been freed */
		SSLSocket_close(&client->net);
#endif
		Socket_close(client->net.socket);
		Thread_unlock_mutex(socket_mutex);

		/* mark the socket as gone */
#if defined(ZERO_SOCK_FD_IS_INVALID)
		client->net.socket = 0;
#else
		client->net.socket = -1;
#endif
#if defined(OPENSSL) || defined(MBEDTLS)
		client->net.ssl = NULL;
#endif
	}

	client->connected = 0;
	client->connect_state = NOT_IN_PROGRESS;

	if (client->cleansession && client->MQTTVersion < MQTTVERSION_5)
		MQTTClient_cleanSession(client);
	FUNC_EXIT;
}
1487 
1488 
/**
 * Discards all session state held for a client: persisted data (when
 * persistence is compiled in), the inbound and outbound message lists, the
 * queue of received messages, and the message id counter.
 *
 * @param client the client whose session state is discarded
 * @return the result of MQTTPersistence_clear(), or 0 when persistence is
 * compiled out
 */
static int MQTTClient_cleanSession(Clients* client)
{
	int result = 0;

	FUNC_ENTRY;
#if !defined(NO_PERSISTENCE)
	result = MQTTPersistence_clear(client);
#endif

	/* in-memory state is emptied whatever the persistence result was */
	MQTTProtocol_emptyMessageList(client->inboundMsgs);
	MQTTProtocol_emptyMessageList(client->outboundMsgs);
	MQTTClient_emptyMessageQueue(client);
	client->msgID = 0;

	FUNC_EXIT_RC(result);
	return result;
}
1504 
1505 
/**
 * Turns a received PUBLISH packet into an application message and appends it
 * to the client's queue of received messages (persisting the queue entry when
 * the client has persistence).
 *
 * Ownership: on success the topic string is moved from the packet into the
 * queue entry and publish->topic is set to NULL.  If any allocation fails the
 * message is dropped and the packet is left untouched, so the topic still
 * belongs to the packet.
 *
 * @param publish the received publish packet
 * @param client the client the publication is for
 * @param allocatePayload non-zero to give the message its own copy of the
 * payload; zero to make the message point at the packet's payload buffer
 */
void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload)
{
	qEntry* qe = NULL;
	MQTTClient_message* mm = NULL;
	MQTTClient_message initialized = MQTTClient_message_initializer;

	FUNC_ENTRY;
	qe = malloc(sizeof(qEntry));
	if (!qe)
		goto exit;
	mm = malloc(sizeof(MQTTClient_message));
	if (!mm)
	{
		free(qe);
		goto exit;
	}
	memcpy(mm, &initialized, sizeof(MQTTClient_message));

	if (allocatePayload)
	{
		/* malloc(0) is allowed to return NULL, which would be mistaken for an
		 * allocation failure and drop a perfectly valid empty message, so
		 * always ask for at least one byte */
		mm->payload = malloc(publish->payloadlen > 0 ? publish->payloadlen : 1);
		if (mm->payload == NULL)
		{
			free(mm);
			free(qe);
			goto exit;
		}
		if (publish->payloadlen > 0)
			memcpy(mm->payload, publish->payload, publish->payloadlen);
	}
	else
		mm->payload = publish->payload;

	/* Take the topic over from the packet only now that nothing can fail any
	 * more; doing it before the payload allocation leaked the topic string
	 * whenever that allocation failed. */
	qe->msg = mm;
	qe->topicName = publish->topic;
	qe->topicLen = publish->topiclen;
	publish->topic = NULL;

	mm->payloadlen = publish->payloadlen;
	mm->qos = publish->header.bits.qos;
	mm->retained = publish->header.bits.retain;
	if (publish->header.bits.qos == 2)
		mm->dup = 0;  /* ensure that a QoS2 message is not passed to the application with dup = 1 */
	else
		mm->dup = publish->header.bits.dup;
	mm->msgid = publish->msgId;

	if (publish->MQTTVersion >= 5)
		mm->properties = MQTTProperties_copy(&publish->properties);

	/* NOTE(review): sizeof(qe) and sizeof(mm) are pointer sizes, not structure
	 * sizes - confirm what the list size figure is meant to account for */
	ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
#if !defined(NO_PERSISTENCE)
	if (client->persistence)
		MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
#endif
exit:
	FUNC_EXIT;
}
1561 
1562 
MQTTClient_connectURIVersion(MQTTClient handle,MQTTClient_connectOptions * options,const char * serverURI,int MQTTVersion,START_TIME_TYPE start,ELAPSED_TIME_TYPE millisecsTimeout,MQTTProperties * connectProperties,MQTTProperties * willProperties)1563 static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
1564 	START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
1565 {
1566 	MQTTClients* m = handle;
1567 	int rc = SOCKET_ERROR;
1568 	int sessionPresent = 0;
1569 	MQTTResponse resp = MQTTResponse_initializer;
1570 
1571 	FUNC_ENTRY;
1572 	resp.reasonCode = SOCKET_ERROR;
1573 	if (m->ma && !running)
1574 	{
1575 		int count = 0;
1576 
1577 		Thread_start(MQTTClient_run, handle);
1578 #if defined(IOT_CONNECT)
1579 		MQTTTime_sleep(100L);
1580 #endif
1581 		if (MQTTTime_elapsed(start) >= millisecsTimeout)
1582 		{
1583 			rc = SOCKET_ERROR;
1584 			goto exit;
1585 		}
1586 		while (!running && ++count < 5)
1587 		{
1588 			Thread_unlock_mutex(mqttclient_mutex);
1589 			MQTTTime_sleep(100L);
1590 			Thread_lock_mutex(mqttclient_mutex);
1591 		}
1592 		if (!running)
1593 		{
1594 			rc = SOCKET_ERROR;
1595 			goto exit;
1596 		}
1597 	}
1598 
1599 	Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
1600 #if defined(OPENSSL) || defined(MBEDTLS)
1601 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
1602 	rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties,
1603 			millisecsTimeout - MQTTTime_elapsed(start));
1604 #else
1605 	rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
1606 #endif
1607 #else
1608 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
1609 	rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties,
1610 			millisecsTimeout - MQTTTime_elapsed(start));
1611 #else
1612 	rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
1613 #endif
1614 #endif
1615 	if (rc == SOCKET_ERROR)
1616 		goto exit;
1617 
1618 	if (m->c->connect_state == NOT_IN_PROGRESS)
1619 	{
1620 		rc = SOCKET_ERROR;
1621 		goto exit;
1622 	}
1623 
1624 	if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
1625 	{
1626 		Thread_unlock_mutex(mqttclient_mutex);
1627 		MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1628 		Thread_lock_mutex(mqttclient_mutex);
1629 		if (rc != 0)
1630 		{
1631 			rc = SOCKET_ERROR;
1632 			goto exit;
1633 		}
1634 #if defined(OPENSSL) || defined(MBEDTLS)
1635 		if (m->ssl)
1636 		{
1637 			int port1;
1638 			size_t hostname_len;
1639 			const char *topic;
1640 			int setSocketForSSLrc = 0;
1641 
1642 			if (m->c->net.https_proxy) {
1643 				m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
1644 				if ((rc = Proxy_connect( &m->c->net, 1, serverURI)) == SOCKET_ERROR )
1645 					goto exit;
1646 			}
1647 
1648 			hostname_len = MQTTProtocol_addressPort(serverURI, &port1, &topic, MQTT_DEFAULT_PORT);
1649 			setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
1650 				serverURI, hostname_len);
1651 
1652 			if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
1653 			{
1654 				if (m->c->session != NULL)
1655 #if defined(OPENSSL)
1656 					if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
1657 #elif defined(MBEDTLS)
1658 					if ((rc = mbedtls_ssl_set_session(m->c->net.ssl, m->c->session)) != 0)
1659 #endif
1660 						Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
1661 				rc = m->c->sslopts->struct_version >= 3 ?
1662 					SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
1663 						m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
1664 					SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
1665 						m->c->sslopts->verify, NULL, NULL);
1666 				if (rc == TCPSOCKET_INTERRUPTED)
1667 					m->c->connect_state = SSL_IN_PROGRESS;  /* the connect is still in progress */
1668 				else if (rc == SSL_FATAL)
1669 				{
1670 					rc = SOCKET_ERROR;
1671 					goto exit;
1672 				}
1673 				else if (rc == 1)
1674 				{
1675 					if (m->websocket)
1676 					{
1677 						m->c->connect_state = WEBSOCKET_IN_PROGRESS;
1678 						rc = WebSocket_connect(&m->c->net, 1, serverURI);
1679 						if ( rc == SOCKET_ERROR )
1680 							goto exit;
1681 					}
1682 					else
1683 					{
1684 						rc = MQTTCLIENT_SUCCESS;
1685 						m->c->connect_state = WAIT_FOR_CONNACK;
1686 						if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1687 						{
1688 							rc = SOCKET_ERROR;
1689 							goto exit;
1690 						}
1691 						if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1692 #if defined(OPENSSL)
1693 							m->c->session = SSL_get1_session(m->c->net.ssl);
1694 #elif defined(MBEDTLS)
1695 							mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
1696 #endif
1697 					}
1698 				}
1699 			}
1700 			else
1701 			{
1702 				rc = SOCKET_ERROR;
1703 				goto exit;
1704 			}
1705 		}
1706 #endif
1707 		else
1708 		{
1709 			if (m->c->net.http_proxy) {
1710 				m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
1711 				if ((rc = Proxy_connect( &m->c->net, 0, serverURI)) == SOCKET_ERROR )
1712 					goto exit;
1713 			}
1714 
1715 			if (m->websocket)
1716 			{
1717 				m->c->connect_state = WEBSOCKET_IN_PROGRESS;
1718 				if ( WebSocket_connect(&m->c->net, 0, serverURI) == SOCKET_ERROR )
1719 				{
1720 					rc = SOCKET_ERROR;
1721 					goto exit;
1722 				}
1723 			}
1724 			else
1725 			{
1726 				m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
1727 				if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1728 				{
1729 					rc = SOCKET_ERROR;
1730 					goto exit;
1731 				}
1732 			}
1733 		}
1734 	}
1735 
1736 #if defined(OPENSSL) || defined(MBEDTLS)
1737 	if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
1738 	{
1739 		Thread_unlock_mutex(mqttclient_mutex);
1740 		MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1741 		Thread_lock_mutex(mqttclient_mutex);
1742 		if (rc != 1)
1743 		{
1744 			rc = SOCKET_ERROR;
1745 			goto exit;
1746 		}
1747 		if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1748 #if defined(OPENSSL)
1749 			m->c->session = SSL_get1_session(m->c->net.ssl);
1750 #elif defined(MBEDTLS)
1751 			mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
1752 #endif
1753 
1754 		if ( m->websocket )
1755 		{
1756 			/* wait for websocket connect */
1757 			m->c->connect_state = WEBSOCKET_IN_PROGRESS;
1758 			rc = WebSocket_connect( &m->c->net, 1, serverURI);
1759 			if ( rc != 1 )
1760 			{
1761 				rc = SOCKET_ERROR;
1762 				goto exit;
1763 			}
1764 		}
1765 		else
1766 		{
1767 			m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
1768 			if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1769 			{
1770 				rc = SOCKET_ERROR;
1771 				goto exit;
1772 			}
1773 		}
1774 	}
1775 #endif
1776 
1777 	if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
1778 	{
1779 		Thread_unlock_mutex(mqttclient_mutex);
1780 		MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1781 		Thread_lock_mutex(mqttclient_mutex);
1782 		m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
1783 		if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1784 		{
1785 			rc = SOCKET_ERROR;
1786 			goto exit;
1787 		}
1788 	}
1789 
1790 	if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
1791 	{
1792 		MQTTPacket* pack = NULL;
1793 		Thread_unlock_mutex(mqttclient_mutex);
1794 		pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1795 		Thread_lock_mutex(mqttclient_mutex);
1796 		if (pack == NULL)
1797 			rc = SOCKET_ERROR;
1798 		else
1799 		{
1800 			Connack* connack = (Connack*)pack;
1801 			Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
1802 			if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
1803 			{
1804 				m->c->connected = 1;
1805 				m->c->good = 1;
1806 				m->c->connect_state = NOT_IN_PROGRESS;
1807 				if (MQTTVersion == 4)
1808 					sessionPresent = connack->flags.bits.sessionPresent;
1809 				if (m->c->cleansession || m->c->cleanstart)
1810 					rc = MQTTClient_cleanSession(m->c);
1811 				if (m->c->outboundMsgs->count > 0)
1812 				{
1813 					ListElement* outcurrent = NULL;
1814 					START_TIME_TYPE zero = START_TIME_ZERO;
1815 
1816 					while (ListNextElement(m->c->outboundMsgs, &outcurrent))
1817 					{
1818 						Messages* m = (Messages*)(outcurrent->content);
1819 						memset(&m->lastTouch, '\0', sizeof(m->lastTouch));
1820 					}
1821 					MQTTProtocol_retry(zero, 1, 1);
1822 					if (m->c->connected != 1)
1823 						rc = MQTTCLIENT_DISCONNECTED;
1824 				}
1825 				if (m->c->MQTTVersion == MQTTVERSION_5)
1826 				{
1827 					if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
1828 					{
1829 						rc = PAHO_MEMORY_ERROR;
1830 						goto exit;
1831 					}
1832 					*resp.properties = MQTTProperties_copy(&connack->properties);
1833 				}
1834 			}
1835 			MQTTPacket_freeConnack(connack);
1836 			m->pack = NULL;
1837 		}
1838 	}
1839 exit:
1840 	if (rc == MQTTCLIENT_SUCCESS)
1841 	{
1842 		if (options->struct_version >= 4) /* means we have to fill out return values */
1843 		{
1844 			options->returned.serverURI = serverURI;
1845 			options->returned.MQTTVersion = MQTTVersion;
1846 			options->returned.sessionPresent = sessionPresent;
1847 		}
1848 	}
1849 	else
1850 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1851 		MQTTClient_disconnect1(handle, 0, 0, MQTTVersion == 3, MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
1852 #else
1853 		MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
1854 #endif
1855 	resp.reasonCode = rc;
1856 	FUNC_EXIT_RC(resp.reasonCode);
1857 	return resp;
1858 }
1859 
1860 
/* Retry loop interval in milliseconds; always kept within 100..5000. */
static int retryLoopIntervalms = 5000;

/**
 * Derives the retry loop interval from the keep alive interval.
 *
 * The interval is one tenth of the keep alive time, limited to the range
 * 100 ms to 5000 ms.  The limits are applied before multiplying, so that a
 * very large (or very negative) keep alive value cannot overflow the
 * arithmetic and end up at the wrong limit.
 *
 * @param keepalive the keep alive interval in seconds
 */
void setRetryLoopInterval(int keepalive)
{
	if (keepalive < 1)            /* keepalive * 100 would be below 100 ms */
		retryLoopIntervalms = 100;
	else if (keepalive > 50)      /* keepalive * 100 would be above 5000 ms */
		retryLoopIntervalms = 5000;
	else
		retryLoopIntervalms = keepalive * 100;  /* seconds -> a tenth, in milliseconds */
}
1872 
1873 
MQTTClient_connectURI(MQTTClient handle,MQTTClient_connectOptions * options,const char * serverURI,MQTTProperties * connectProperties,MQTTProperties * willProperties)1874 static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
1875 		MQTTProperties* connectProperties, MQTTProperties* willProperties)
1876 {
1877 	MQTTClients* m = handle;
1878 	START_TIME_TYPE start;
1879 	ELAPSED_TIME_TYPE millisecsTimeout = 30000L;
1880 	MQTTResponse rc = MQTTResponse_initializer;
1881 	int MQTTVersion = 0;
1882 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1883 	int mem_ret = -1;
1884 #endif
1885 
1886 	FUNC_ENTRY;
1887 	rc.reasonCode = SOCKET_ERROR;
1888 	millisecsTimeout = options->connectTimeout * 1000;
1889 	start = MQTTTime_start_clock();
1890 
1891 	m->currentServerURI = serverURI;
1892 	m->c->keepAliveInterval = options->keepAliveInterval;
1893 	m->c->retryInterval = options->retryInterval;
1894 	setRetryLoopInterval(options->keepAliveInterval);
1895 	m->c->MQTTVersion = options->MQTTVersion;
1896 	m->c->cleanstart = m->c->cleansession = 0;
1897 	if (m->c->MQTTVersion >= MQTTVERSION_5)
1898 		m->c->cleanstart = options->cleanstart;
1899 	else
1900 		m->c->cleansession = options->cleansession;
1901 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1902 	m->c->maxInflightMessages = (options->reliable) ? 1 : 2;
1903 #else
1904 	m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
1905 #endif
1906 	if (options->struct_version >= 6)
1907 	{
1908 		if (options->maxInflightMessages > 0)
1909 			m->c->maxInflightMessages = options->maxInflightMessages;
1910 	}
1911 
1912 	if (options->struct_version >= 7)
1913 	{
1914 		m->c->net.httpHeaders = options->httpHeaders;
1915 	}
1916 	if (options->struct_version >= 8)
1917 	{
1918 		if (options->httpProxy)
1919 			m->c->httpProxy = MQTTStrdup(options->httpProxy);
1920 		if (options->httpsProxy)
1921 			m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
1922 	}
1923 
1924 	if (m->c->will)
1925 	{
1926 		free(m->c->will->payload);
1927 		free(m->c->will->topic);
1928 		free(m->c->will);
1929 		m->c->will = NULL;
1930 	}
1931 
1932 	if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
1933 	{
1934 		const void* source = NULL;
1935 
1936 		if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
1937 		{
1938 			rc.reasonCode = PAHO_MEMORY_ERROR;
1939 			goto exit;
1940 		}
1941 		if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
1942 		{
1943 			if (options->will->struct_version == 1 && options->will->payload.data)
1944 			{
1945 				m->c->will->payloadlen = options->will->payload.len;
1946 				source = options->will->payload.data;
1947 			}
1948 			else
1949 			{
1950 				m->c->will->payloadlen = (int)strlen(options->will->message);
1951 				source = (void*)options->will->message;
1952 			}
1953 			if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
1954 			{
1955 				free(m->c->will);
1956 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1957 				m->c->will = NULL;
1958 #endif
1959 				rc.reasonCode = PAHO_MEMORY_ERROR;
1960 				goto exit;
1961 			}
1962 			memcpy(m->c->will->payload, source, m->c->will->payloadlen);
1963 		}
1964 		else
1965 		{
1966 			m->c->will->payload = NULL;
1967 			m->c->will->payloadlen = 0;
1968 		}
1969 		m->c->will->qos = options->will->qos;
1970 		m->c->will->retained = options->will->retained;
1971 		m->c->will->topic = MQTTStrdup(options->will->topicName);
1972 	}
1973 
1974 #if defined(OPENSSL) || defined(MBEDTLS)
1975 	if (m->c->sslopts)
1976 	{
1977 		if (m->c->sslopts->trustStore)
1978 			free((void*)m->c->sslopts->trustStore);
1979 		if (m->c->sslopts->keyStore)
1980 			free((void*)m->c->sslopts->keyStore);
1981 		if (m->c->sslopts->privateKey)
1982 			free((void*)m->c->sslopts->privateKey);
1983 		if (m->c->sslopts->privateKeyPassword)
1984 			free((void*)m->c->sslopts->privateKeyPassword);
1985 		if (m->c->sslopts->enabledCipherSuites)
1986 			free((void*)m->c->sslopts->enabledCipherSuites);
1987 		if (m->c->sslopts->struct_version >= 2)
1988 		{
1989 			if (m->c->sslopts->CApath)
1990 				free((void*)m->c->sslopts->CApath);
1991 		}
1992 		free(m->c->sslopts);
1993 		m->c->sslopts = NULL;
1994 	}
1995 
1996 	if (options->struct_version != 0 && options->ssl)
1997 	{
1998 		if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
1999 		{
2000 			rc.reasonCode = PAHO_MEMORY_ERROR;
2001 			goto exit;
2002 		}
2003 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2004 		mem_ret = memset_s(m->c->sslopts, sizeof(MQTTClient_SSLOptions), '\0', sizeof(MQTTClient_SSLOptions));
2005 		if (mem_ret != 0)
2006 		{
2007 			free(m->c->sslopts);
2008 			m->c->sslopts = NULL;
2009 			rc.reasonCode = PAHO_MEMORY_ERROR;
2010 			goto exit;
2011 		}
2012 #else
2013 		memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
2014 #endif
2015 		m->c->sslopts->struct_version = options->ssl->struct_version;
2016 		if (options->ssl->trustStore)
2017 			m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
2018 		if (options->ssl->keyStore)
2019 			m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
2020 		if (options->ssl->privateKey)
2021 			m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
2022 		if (options->ssl->privateKeyPassword)
2023 			m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
2024 		if (options->ssl->enabledCipherSuites)
2025 			m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
2026 		m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
2027 		if (m->c->sslopts->struct_version >= 1)
2028 			m->c->sslopts->sslVersion = options->ssl->sslVersion;
2029 		if (m->c->sslopts->struct_version >= 2)
2030 		{
2031 			m->c->sslopts->verify = options->ssl->verify;
2032 			if (options->ssl->CApath)
2033 				m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
2034 		}
2035 		if (m->c->sslopts->struct_version >= 3)
2036 		{
2037 			m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
2038 			m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
2039 		}
2040 		if (m->c->sslopts->struct_version >= 4)
2041 		{
2042 			m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
2043 			m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
2044 			m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
2045 		}
2046 		if (m->c->sslopts->struct_version >= 5)
2047 		{
2048 		    m->c->sslopts->protos = options->ssl->protos;
2049 		    m->c->sslopts->protos_len = options->ssl->protos_len;
2050 		}
2051 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2052 #if defined(MBEDTLS_USE_CRT)
2053 		if (options->ssl->los_trustStore != NULL)
2054 			m->c->sslopts->los_trustStore = options->ssl->los_trustStore;
2055 		if (options->ssl->los_keyStore != NULL)
2056 			m->c->sslopts->los_keyStore = options->ssl->los_keyStore;
2057 		if (options->ssl->los_privateKey != NULL)
2058 			m->c->sslopts->los_privateKey = options->ssl->los_privateKey;
2059 #endif /* MBEDTLS_USE_CRT */
2060 #if defined(MBEDTLS_USE_PSK)
2061 		if (options->ssl->psk != NULL)
2062 		{
2063 			m->c->sslopts->psk = options->ssl->psk;
2064 			m->c->sslopts->psk_len = options->ssl->psk_len;
2065 		}
2066 		if (options->ssl->psk_id != NULL)
2067 		{
2068 			m->c->sslopts->psk_id = options->ssl->psk_id;
2069 			m->c->sslopts->psk_id_len = options->ssl->psk_id_len;
2070 		}
2071 #endif /* MBEDTLS_USE_PSK */
2072 #endif
2073 	}
2074 #endif
2075 
2076 	if (m->c->username)
2077 	{
2078 		free((void*)m->c->username);
2079 		m->c->username = NULL;
2080 	}
2081 	if (options->username)
2082 		m->c->username = MQTTStrdup(options->username);
2083 	if (m->c->password)
2084 	{
2085 		free((void*)m->c->password);
2086 		m->c->password = NULL;
2087 	}
2088 	if (options->password)
2089 	{
2090 		m->c->password = MQTTStrdup(options->password);
2091 		m->c->passwordlen = (int)strlen(options->password);
2092 	}
2093 	else if (options->struct_version >= 5 && options->binarypwd.data)
2094 	{
2095 		m->c->passwordlen = options->binarypwd.len;
2096 		if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
2097 		{
2098 			rc.reasonCode = PAHO_MEMORY_ERROR;
2099 			goto exit;
2100 		}
2101 		memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
2102 	}
2103 
2104 	if (options->struct_version >= 3)
2105 		MQTTVersion = options->MQTTVersion;
2106 	else
2107 		MQTTVersion = MQTTVERSION_DEFAULT;
2108 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2109 	if (MQTTVersion == MQTTVERSION_DEFAULT)
2110 		MQTTVersion = MQTTVERSION_3_1_1;
2111 
2112 	if (MQTTVersion != MQTTVERSION_3_1)
2113 	{
2114 		rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
2115 				connectProperties, willProperties);
2116 		if (rc.reasonCode == MQTTCLIENT_SUCCESS)
2117 			goto exit;
2118 	}
2119 	MQTTTime_sleep(50L); // wait for the disconnect to complete.
2120 	start = MQTTTime_start_clock();
2121 	rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVERSION_3_1, start, millisecsTimeout,
2122 			connectProperties, willProperties);
2123 #else
2124 	if (MQTTVersion == MQTTVERSION_DEFAULT)
2125 	{
2126 		rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
2127 				connectProperties, willProperties);
2128 		if (rc.reasonCode != MQTTCLIENT_SUCCESS)
2129 		{
2130 			rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
2131 					connectProperties, willProperties);
2132 		}
2133 	}
2134 	else
2135 		rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
2136 				connectProperties, willProperties);
2137 #endif
2138 exit:
2139 	FUNC_EXIT_RC(rc.reasonCode);
2140 	return rc;
2141 }
2142 
2143 MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
2144 		MQTTProperties* connectProperties, MQTTProperties* willProperties);
2145 
/**
 * Connect without MQTT 5 properties.
 * The call is refused when the client's stored MQTTVersion is MQTT 5 or later;
 * otherwise it is forwarded to MQTTClient_connectAll() with NULL connect and
 * will properties.
 * @param handle the client handle
 * @param options the connect options, passed through unchanged
 * @return MQTTCLIENT_WRONG_MQTT_VERSION for an MQTT 5 client, otherwise the
 * reasonCode of the response returned by MQTTClient_connectAll()
 */
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
	MQTTClients* client = handle;
	int is_v5_client = (client != NULL && client->c != NULL &&
			client->c->MQTTVersion >= MQTTVERSION_5);

	if (is_v5_client)
		return MQTTCLIENT_WRONG_MQTT_VERSION;

	/* a NULL or incomplete handle is left for MQTTClient_connectAll() to reject */
	return MQTTClient_connectAll(handle, options, NULL, NULL).reasonCode;
}
2158 
2159 
MQTTClient_connect5(MQTTClient handle,MQTTClient_connectOptions * options,MQTTProperties * connectProperties,MQTTProperties * willProperties)2160 MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
2161 		MQTTProperties* connectProperties, MQTTProperties* willProperties)
2162 {
2163 	MQTTClients* m = handle;
2164 	MQTTResponse response = MQTTResponse_initializer;
2165 
2166 	if (m != NULL && m->c != NULL && m->c->MQTTVersion < MQTTVERSION_5)
2167 	{
2168 		response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
2169 		return response;
2170 	}
2171 
2172 	return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
2173 }
2174 
2175 
MQTTClient_connectAll(MQTTClient handle,MQTTClient_connectOptions * options,MQTTProperties * connectProperties,MQTTProperties * willProperties)2176 MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
2177 		MQTTProperties* connectProperties, MQTTProperties* willProperties)
2178 {
2179 	MQTTClients* m = handle;
2180 	MQTTResponse rc = MQTTResponse_initializer;
2181 
2182 	FUNC_ENTRY;
2183 	Thread_lock_mutex(connect_mutex);
2184 	Thread_lock_mutex(mqttclient_mutex);
2185 
2186 	rc.reasonCode = SOCKET_ERROR;
2187 	if (!library_initialized)
2188 	{
2189 		rc.reasonCode = MQTTCLIENT_FAILURE;
2190 		goto exit;
2191 	}
2192 
2193 	if (options == NULL || m == NULL || m->c == NULL)
2194 	{
2195 		rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2196 		goto exit;
2197 	}
2198 
2199 	if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
2200 	{
2201 		rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
2202 		goto exit;
2203 	}
2204 
2205 #if defined(OPENSSL) || defined(MBEDTLS)
2206 	if (m->ssl && options->ssl == NULL)
2207 	{
2208 		rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2209 		goto exit;
2210 	}
2211 #endif
2212 
2213 	if (options->will) /* check validity of will options structure */
2214 	{
2215 		if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
2216 		{
2217 			rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
2218 			goto exit;
2219 		}
2220 		if (options->will->qos < 0 || options->will->qos > 2)
2221 		{
2222 			rc.reasonCode = MQTTCLIENT_BAD_QOS;
2223 			goto exit;
2224 		}
2225 		if (options->will->topicName == NULL)
2226 		{
2227 			rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2228 			goto exit;
2229 		} else if (strlen(options->will->topicName) == 0)
2230 		{
2231 			rc.reasonCode = MQTTCLIENT_0_LEN_WILL_TOPIC;
2232 			goto exit;
2233 		}
2234 	}
2235 
2236 
2237 #if defined(OPENSSL) || defined(MBEDTLS)
2238 	if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
2239 	{
2240 		if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
2241 		{
2242 			rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
2243 			goto exit;
2244 		}
2245 	}
2246 #endif
2247 
2248 	if ((options->username && !UTF8_validateString(options->username)) ||
2249 		(options->password && !UTF8_validateString(options->password)))
2250 	{
2251 		rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
2252 		goto exit;
2253 	}
2254 
2255 	if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
2256 			(options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
2257 	{
2258 		rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
2259 		goto exit;
2260 	}
2261 
2262 	if (options->MQTTVersion >= MQTTVERSION_5)
2263 	{
2264 		if (options->cleansession != 0)
2265 		{
2266 			rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
2267 			goto exit;
2268 		}
2269 	}
2270 	else if (options->cleanstart != 0)
2271 	{
2272 		rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
2273 		goto exit;
2274 	}
2275 
2276 	if (options->struct_version < 2 || options->serverURIcount == 0)
2277 	{
2278 		if ( !m )
2279 		{
2280 			rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2281 			goto exit;
2282 		}
2283 		rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
2284 	}
2285 	else
2286 	{
2287 		int i;
2288 
2289 		for (i = 0; i < options->serverURIcount; ++i)
2290 		{
2291 			char* serverURI = options->serverURIs[i];
2292 
2293 			if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
2294 				serverURI += strlen(URI_TCP);
2295 			else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
2296 				serverURI += strlen(URI_TCP);
2297 			else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
2298 			{
2299 				serverURI += strlen(URI_WS);
2300 				m->websocket = 1;
2301 			}
2302 #if defined(OPENSSL) || defined(MBEDTLS)
2303 			else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
2304 			{
2305 				serverURI += strlen(URI_SSL);
2306 				m->ssl = 1;
2307 			}
2308 			else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
2309 			{
2310 				serverURI += strlen(URI_MQTTS);
2311 				m->ssl = 1;
2312 			}
2313 			else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
2314 			{
2315 				serverURI += strlen(URI_WSS);
2316 				m->ssl = 1;
2317 				m->websocket = 1;
2318 			}
2319 #endif
2320 			rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
2321 			if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
2322 				break;
2323 		}
2324 	}
2325 	if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
2326 	{
2327 		if (rc.properties && MQTTProperties_hasProperty(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
2328 		{
2329 			int recv_max = MQTTProperties_getNumericValue(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
2330 			if (m->c->maxInflightMessages > recv_max)
2331 				m->c->maxInflightMessages = recv_max;
2332 		}
2333 	}
2334 
2335 exit:
2336 	if (m && m->c && m->c->will)
2337 	{
2338 		if (m->c->will->payload)
2339 			free(m->c->will->payload);
2340 		if (m->c->will->topic)
2341 			free(m->c->will->topic);
2342 		free(m->c->will);
2343 		m->c->will = NULL;
2344 	}
2345 	Thread_unlock_mutex(mqttclient_mutex);
2346 	Thread_unlock_mutex(connect_mutex);
2347 	FUNC_EXIT_RC(rc.reasonCode);
2348 	return rc;
2349 }
2350 
2351 
2352 /**
2353  * mqttclient_mutex must be locked when you call this function, if multi threaded
2354  */
MQTTClient_disconnect1(MQTTClient handle,int timeout,int call_connection_lost,int stop,enum MQTTReasonCodes reason,MQTTProperties * props)2355 static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
2356 		enum MQTTReasonCodes reason, MQTTProperties* props)
2357 {
2358 	MQTTClients* m = handle;
2359 	START_TIME_TYPE start;
2360 	int rc = MQTTCLIENT_SUCCESS;
2361 	int was_connected = 0;
2362 	struct conlost_sync_data sync = {
2363 		NULL, m
2364 	};
2365 
2366 	FUNC_ENTRY;
2367 	if (m == NULL || m->c == NULL)
2368 	{
2369 		rc = MQTTCLIENT_FAILURE;
2370 		goto exit;
2371 	}
2372 	was_connected = m->c->connected; /* should be 1 */
2373 	if (m->c->connected != 0)
2374 	{
2375 		start = MQTTTime_start_clock();
2376 		m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
2377 		while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
2378 		{ /* wait for all inflight message flows to finish, up to timeout */
2379 			if (MQTTTime_elapsed(start) >= (ELAPSED_TIME_TYPE)timeout)
2380 				break;
2381 			Thread_unlock_mutex(mqttclient_mutex);
2382 			MQTTClient_yield();
2383 			Thread_lock_mutex(mqttclient_mutex);
2384 		}
2385 	}
2386 
2387 	MQTTClient_closeSession(m->c, reason, props);
2388 
2389 exit:
2390 	if (stop)
2391 		MQTTClient_stop();
2392 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2393 	if (call_connection_lost && was_connected && m->cl != NULL)
2394 #else
2395 	if (call_connection_lost && m->cl && was_connected)
2396 #endif
2397 	{
2398 		sync.sem = Thread_create_sem(&rc);
2399 		Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
2400 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2401 		connectionLost_call(&sync);
2402 #else
2403 		Thread_start(connectionLost_call, &sync);
2404 #endif
2405 		Thread_wait_sem(sync.sem, 5000);
2406 		Thread_destroy_sem(sync.sem);
2407 	}
2408 	FUNC_EXIT_RC(rc);
2409 	return rc;
2410 }
2411 
2412 
2413 /**
2414  * mqttclient_mutex must be locked when you call this function, if multi threaded
2415  */
MQTTClient_disconnect_internal(MQTTClient handle,int timeout)2416 static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
2417 {
2418 	return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
2419 }
2420 
2421 
2422 /**
2423  * mqttclient_mutex must be locked when you call this function, if multi threaded
2424  */
MQTTProtocol_closeSession(Clients * c,int sendwill)2425 void MQTTProtocol_closeSession(Clients* c, int sendwill)
2426 {
2427 	MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
2428 }
2429 
2430 
/**
 * Disconnect the client with a success reason code and no properties.
 * mqttclient_mutex is held around the call to MQTTClient_disconnect1(); the
 * connectionLost callback is not requested and MQTTClient_stop() is.
 * @param handle the client handle
 * @param timeout passed through to MQTTClient_disconnect1()
 * @return the return code of MQTTClient_disconnect1()
 */
int MQTTClient_disconnect(MQTTClient handle, int timeout)
{
	int result;

	Thread_lock_mutex(mqttclient_mutex);
	result = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
	Thread_unlock_mutex(mqttclient_mutex);

	return result;
}
2440 
2441 
/**
 * Disconnect the client, supplying a reason code and properties.
 * mqttclient_mutex is held around the call to MQTTClient_disconnect1(); the
 * connectionLost callback is not requested and MQTTClient_stop() is.
 * @param handle the client handle
 * @param timeout passed through to MQTTClient_disconnect1()
 * @param reason the reason code passed through to MQTTClient_disconnect1()
 * @param props the properties passed through to MQTTClient_disconnect1()
 * @return the return code of MQTTClient_disconnect1()
 */
int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
{
	int result;

	Thread_lock_mutex(mqttclient_mutex);
	result = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
	Thread_unlock_mutex(mqttclient_mutex);

	return result;
}
2451 
2452 
/**
 * Report the client's connected flag, read while holding mqttclient_mutex.
 * @param handle the client handle
 * @return the value of the client's connected field, or 0 when the handle or
 * its client structure is NULL
 */
int MQTTClient_isConnected(MQTTClient handle)
{
	MQTTClients* client = handle;
	int connected = 0;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);
	if (client != NULL && client->c != NULL)
		connected = client->c->connected;
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(connected);
	return connected;
}
2466 
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
/**
 * Report the client's connect_state, read while holding mqttclient_mutex.
 * Only built when IOT_CONNECT or IOT_LITEOS_ADAPT is defined.
 * @param handle the client handle
 * @return the value of the client's connect_state field, or 0 when the handle
 * or its client structure is NULL
 */
int MQTTClient_getConnectState(MQTTClient handle)
{
	MQTTClients* client = handle;
	int state = 0;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);
	if (client && client->c)
		state = client->c->connect_state;
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(state);
	return state;
}
#endif
2482 
MQTTClient_subscribeMany5(MQTTClient handle,int count,char * const * topic,int * qos,MQTTSubscribe_options * opts,MQTTProperties * props)2483 MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
2484 		int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
2485 {
2486 	MQTTClients* m = handle;
2487 	List* topics = NULL;
2488 	List* qoss = NULL;
2489 	int i = 0;
2490 	int rc = MQTTCLIENT_FAILURE;
2491 	MQTTResponse resp = MQTTResponse_initializer;
2492 	int msgid = 0;
2493 
2494 	FUNC_ENTRY;
2495 	Thread_lock_mutex(subscribe_mutex);
2496 	Thread_lock_mutex(mqttclient_mutex);
2497 
2498 	resp.reasonCode = MQTTCLIENT_FAILURE;
2499 	if (m == NULL || m->c == NULL)
2500 	{
2501 		rc = MQTTCLIENT_FAILURE;
2502 		goto exit;
2503 	}
2504 	if (m->c->connected == 0)
2505 	{
2506 		rc = MQTTCLIENT_DISCONNECTED;
2507 		goto exit;
2508 	}
2509 	for (i = 0; i < count; i++)
2510 	{
2511 		if (!UTF8_validateString(topic[i]))
2512 		{
2513 			rc = MQTTCLIENT_BAD_UTF8_STRING;
2514 			goto exit;
2515 		}
2516 
2517 		if (qos[i] < 0 || qos[i] > 2)
2518 		{
2519 			rc = MQTTCLIENT_BAD_QOS;
2520 			goto exit;
2521 		}
2522 	}
2523 	if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2524 	{
2525 		rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
2526 		goto exit;
2527 	}
2528 
2529 	topics = ListInitialize();
2530 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2531 	if (topics == NULL)
2532 	{
2533 		rc = PAHO_MEMORY_ERROR;
2534 		goto exit;
2535 	}
2536 #endif
2537 	qoss = ListInitialize();
2538 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2539 	if (qoss == NULL)
2540 	{
2541 		ListFreeNoContent(topics);
2542 		rc = PAHO_MEMORY_ERROR;
2543 		goto exit;
2544 	}
2545 #endif
2546 
2547 	for (i = 0; i < count; i++)
2548 	{
2549 		ListAppend(topics, topic[i], strlen(topic[i]));
2550 		ListAppend(qoss, &qos[i], sizeof(int));
2551 	}
2552 
2553 	rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
2554 	ListFreeNoContent(topics);
2555 	ListFreeNoContent(qoss);
2556 
2557 	if (rc == TCPSOCKET_COMPLETE)
2558 	{
2559 		MQTTPacket* pack = NULL;
2560 
2561 		Thread_unlock_mutex(mqttclient_mutex);
2562 		pack = MQTTClient_waitfor(handle, SUBACK, &rc, m->commandTimeout);
2563 		Thread_lock_mutex(mqttclient_mutex);
2564 		if (pack != NULL)
2565 		{
2566 			Suback* sub = (Suback*)pack;
2567 
2568 			if (m->c->MQTTVersion == MQTTVERSION_5)
2569 			{
2570 				if (sub->properties.count > 0)
2571 				{
2572 					if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
2573 					{
2574 						rc = PAHO_MEMORY_ERROR;
2575 						goto exit;
2576 					}
2577 					*resp.properties = MQTTProperties_copy(&sub->properties);
2578 				}
2579 				resp.reasonCodeCount = sub->qoss->count;
2580 				resp.reasonCode = *(int*)sub->qoss->first->content;
2581 				if (sub->qoss->count > 1)
2582 				{
2583 					ListElement* current = NULL;
2584 					int rc_count = 0;
2585 
2586 					if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count))) == NULL)
2587 					{
2588 						rc = PAHO_MEMORY_ERROR;
2589 						goto exit;
2590 					}
2591 					while (ListNextElement(sub->qoss, &current))
2592 						(resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
2593 				}
2594 			}
2595 			else
2596 			{
2597 				ListElement *current = NULL;
2598 
2599 				/* if the returned count is greater than requested, it's an error*/
2600 				if (sub->qoss->count > count)
2601 					rc = MQTTCLIENT_FAILURE;
2602 				else
2603 				{
2604 					i = 0;
2605 					while (ListNextElement(sub->qoss, &current))
2606 					{
2607 						int *reqqos = (int*) (current->content);
2608 						qos[i++] = *reqqos;
2609 					}
2610 				}
2611 				resp.reasonCode = rc;
2612 			}
2613 			rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
2614 			m->pack = NULL;
2615 		}
2616 		else
2617 			rc = SOCKET_ERROR;
2618 	}
2619 
2620 	if (rc == SOCKET_ERROR)
2621 		MQTTClient_disconnect_internal(handle, 0);
2622 	else if (rc == TCPSOCKET_COMPLETE)
2623 		rc = MQTTCLIENT_SUCCESS;
2624 
2625 exit:
2626 	if (rc < 0)
2627 		resp.reasonCode = rc;
2628 	Thread_unlock_mutex(mqttclient_mutex);
2629 	Thread_unlock_mutex(subscribe_mutex);
2630 	FUNC_EXIT_RC(resp.reasonCode);
2631 	return resp;
2632 }
2633 
2634 
2635 
/**
 * Subscribe to a set of topics without MQTT 5 options or properties.
 * The call is refused when the client's stored MQTTVersion is MQTT 5 or later;
 * otherwise it is forwarded to MQTTClient_subscribeMany5().
 * @param handle the client handle
 * @param count number of entries in topic and qos
 * @param topic the topic filters
 * @param qos the requested QoS values, passed through to MQTTClient_subscribeMany5()
 * @return MQTTCLIENT_WRONG_MQTT_VERSION for an MQTT 5 client, otherwise the
 * reasonCode of the MQTTClient_subscribeMany5() response; in IOT_CONNECT and
 * IOT_LITEOS_ADAPT builds, MQTT_BAD_SUBSCRIBE when the first qos entry holds
 * that value on return
 */
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
{
	MQTTClients* client = handle;
	MQTTResponse result = MQTTResponse_initializer;
	int is_v5_client = (client != NULL && client->c != NULL &&
			client->c->MQTTVersion >= MQTTVERSION_5);

	if (!is_v5_client)
		result = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
	else
		result.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
	if (qos[0] == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
		result.reasonCode = MQTT_BAD_SUBSCRIBE;
#endif
	return result.reasonCode;
}
2651 
2652 
2653 
MQTTClient_subscribe5(MQTTClient handle,const char * topic,int qos,MQTTSubscribe_options * opts,MQTTProperties * props)2654 MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
2655 		MQTTSubscribe_options* opts, MQTTProperties* props)
2656 {
2657 	MQTTResponse rc;
2658 
2659 	FUNC_ENTRY;
2660 	rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
2661 	if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
2662 		rc.reasonCode = MQTT_BAD_SUBSCRIBE;
2663 	FUNC_EXIT_RC(rc.reasonCode);
2664 	return rc;
2665 }
2666 
2667 
/**
 * Subscribe to a single topic without MQTT 5 options or properties.
 * The call is refused when the client's stored MQTTVersion is MQTT 5 or later;
 * otherwise it is forwarded to MQTTClient_subscribe5().
 * @param handle the client handle
 * @param topic the topic filter
 * @param qos the requested QoS
 * @return MQTTCLIENT_WRONG_MQTT_VERSION for an MQTT 5 client, otherwise the
 * reasonCode of the MQTTClient_subscribe5() response
 */
int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
{
	MQTTClients* m = handle;
	MQTTResponse response = MQTTResponse_initializer;

	/* guard the handle as the other wrappers do; a NULL handle is then rejected
	 * by MQTTClient_subscribeMany5() instead of being dereferenced here */
	if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);

	return response.reasonCode;
}
2680 
2681 
MQTTClient_unsubscribeMany5(MQTTClient handle,int count,char * const * topic,MQTTProperties * props)2682 MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
2683 {
2684 	MQTTClients* m = handle;
2685 	List* topics = NULL;
2686 	int i = 0;
2687 	int rc = SOCKET_ERROR;
2688 	MQTTResponse resp = MQTTResponse_initializer;
2689 	int msgid = 0;
2690 
2691 	FUNC_ENTRY;
2692 	Thread_lock_mutex(unsubscribe_mutex);
2693 	Thread_lock_mutex(mqttclient_mutex);
2694 
2695 	resp.reasonCode = MQTTCLIENT_FAILURE;
2696 	if (m == NULL || m->c == NULL)
2697 	{
2698 		rc = MQTTCLIENT_FAILURE;
2699 		goto exit;
2700 	}
2701 	if (m->c->connected == 0)
2702 	{
2703 		rc = MQTTCLIENT_DISCONNECTED;
2704 		goto exit;
2705 	}
2706 	for (i = 0; i < count; i++)
2707 	{
2708 		if (!UTF8_validateString(topic[i]))
2709 		{
2710 			rc = MQTTCLIENT_BAD_UTF8_STRING;
2711 			goto exit;
2712 		}
2713 	}
2714 	if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2715 	{
2716 		rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
2717 		goto exit;
2718 	}
2719 
2720 	topics = ListInitialize();
2721 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2722 	if (topics == NULL)
2723 	{
2724 		rc = PAHO_MEMORY_ERROR;
2725 		goto exit;
2726 	}
2727 #endif
2728 	for (i = 0; i < count; i++)
2729 		ListAppend(topics, topic[i], strlen(topic[i]));
2730 	rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
2731 	ListFreeNoContent(topics);
2732 
2733 	if (rc == TCPSOCKET_COMPLETE)
2734 	{
2735 		MQTTPacket* pack = NULL;
2736 
2737 		Thread_unlock_mutex(mqttclient_mutex);
2738 		pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, m->commandTimeout);
2739 		Thread_lock_mutex(mqttclient_mutex);
2740 		if (pack != NULL)
2741 		{
2742 			Unsuback* unsub = (Unsuback*)pack;
2743 
2744 			if (m->c->MQTTVersion == MQTTVERSION_5)
2745 			{
2746 				if (unsub->properties.count > 0)
2747 				{
2748 					if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
2749 					{
2750 						rc = PAHO_MEMORY_ERROR;
2751 						goto exit;
2752 					}
2753 					*resp.properties = MQTTProperties_copy(&unsub->properties);
2754 				}
2755 				resp.reasonCodeCount = unsub->reasonCodes->count;
2756 				resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
2757 				if (unsub->reasonCodes->count > 1)
2758 				{
2759 					ListElement* current = NULL;
2760 					int rc_count = 0;
2761 
2762 					if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count))) == NULL)
2763 					{
2764 						rc = PAHO_MEMORY_ERROR;
2765 						goto exit;
2766 					}
2767 					while (ListNextElement(unsub->reasonCodes, &current))
2768 						(resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
2769 				}
2770 			}
2771 			else
2772 				resp.reasonCode = rc;
2773 			rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
2774 			m->pack = NULL;
2775 		}
2776 		else
2777 			rc = SOCKET_ERROR;
2778 	}
2779 
2780 	if (rc == SOCKET_ERROR)
2781 		MQTTClient_disconnect_internal(handle, 0);
2782 
2783 exit:
2784 	if (rc < 0)
2785 		resp.reasonCode = rc;
2786 	Thread_unlock_mutex(mqttclient_mutex);
2787 	Thread_unlock_mutex(unsubscribe_mutex);
2788 	FUNC_EXIT_RC(resp.reasonCode);
2789 	return resp;
2790 }
2791 
2792 
/**
 * Unsubscribe from a set of topics without MQTT 5 properties.
 * The call is refused when the client's stored MQTTVersion is MQTT 5 or later;
 * otherwise it is forwarded to MQTTClient_unsubscribeMany5().
 * @param handle the client handle
 * @param count number of entries in topic
 * @param topic the topic filters
 * @return MQTTCLIENT_WRONG_MQTT_VERSION for an MQTT 5 client, otherwise the
 * reasonCode of the MQTTClient_unsubscribeMany5() response
 */
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
{
	MQTTClients* client = handle;
	MQTTResponse result = MQTTResponse_initializer;
	int is_v5_client = (client != NULL && client->c != NULL &&
			client->c->MQTTVersion >= MQTTVERSION_5);

	if (!is_v5_client)
		result = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
	else
		result.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;

	return result.reasonCode;
}
2805 
2806 
MQTTClient_unsubscribe5(MQTTClient handle,const char * topic,MQTTProperties * props)2807 MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
2808 {
2809 	MQTTResponse rc;
2810 
2811 	rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
2812 	return rc;
2813 }
2814 
2815 
/**
 * Unsubscribe from a single topic with no properties.
 * @param handle the client handle
 * @param topic the topic filter
 * @return the reasonCode of the MQTTClient_unsubscribe5() response
 */
int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
{
	MQTTResponse result;

	result = MQTTClient_unsubscribe5(handle, topic, NULL);
	return result.reasonCode;
}
2822 
2823 
MQTTClient_publish5(MQTTClient handle,const char * topicName,int payloadlen,const void * payload,int qos,int retained,MQTTProperties * properties,MQTTClient_deliveryToken * deliveryToken)2824 MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
2825 		int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
2826 {
2827 	int rc = MQTTCLIENT_SUCCESS;
2828 	MQTTClients* m = handle;
2829 	Messages* msg = NULL;
2830 	Publish* p = NULL;
2831 	int blocked = 0;
2832 	int msgid = 0;
2833 	MQTTResponse resp = MQTTResponse_initializer;
2834 
2835 	FUNC_ENTRY;
2836 	Thread_lock_mutex(mqttclient_mutex);
2837 
2838 	if (m == NULL || m->c == NULL)
2839 		rc = MQTTCLIENT_FAILURE;
2840 	else if (m->c->connected == 0)
2841 		rc = MQTTCLIENT_DISCONNECTED;
2842 	else if (!UTF8_validateString(topicName))
2843 		rc = MQTTCLIENT_BAD_UTF8_STRING;
2844 
2845 	if (rc != MQTTCLIENT_SUCCESS)
2846 		goto exit;
2847 
2848 	/* If outbound queue is full, block until it is not */
2849 	while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
2850          Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
2851 	{
2852 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2853 		rc = MQTTCLIENT_FAILURE;
2854 		goto exit;
2855 #else
2856 		if (blocked == 0)
2857 		{
2858 			blocked = 1;
2859 			Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
2860 		}
2861 		Thread_unlock_mutex(mqttclient_mutex);
2862 		MQTTClient_yield();
2863 		Thread_lock_mutex(mqttclient_mutex);
2864 		if (m->c->connected == 0)
2865 		{
2866 			rc = MQTTCLIENT_FAILURE;
2867 			goto exit;
2868 		}
2869 #endif
2870 	}
2871 	if (blocked == 1)
2872 		Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
2873 	if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2874 	{	/* this should never happen as we've waited for spaces in the queue */
2875 		rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
2876 		goto exit;
2877 	}
2878 
2879 	if ((p = malloc(sizeof(Publish))) == NULL)
2880 	{
2881 		rc = PAHO_MEMORY_ERROR;
2882 		goto exit_and_free;
2883 	}
2884 	memset(p->mask, '\0', sizeof(p->mask));
2885 	p->payload = NULL;
2886 	p->payloadlen = payloadlen;
2887 	if (payloadlen > 0)
2888 	{
2889 		if ((p->payload = malloc(payloadlen)) == NULL)
2890 		{
2891 			rc = PAHO_MEMORY_ERROR;
2892 			goto exit_and_free;
2893 		}
2894 		memcpy(p->payload, payload, payloadlen);
2895 	}
2896 	if ((p->topic = MQTTStrdup(topicName)) == NULL)
2897 	{
2898 		rc = PAHO_MEMORY_ERROR;
2899 		goto exit_and_free;
2900 	}
2901 	p->msgId = msgid;
2902 	p->MQTTVersion = m->c->MQTTVersion;
2903 	if (m->c->MQTTVersion >= MQTTVERSION_5)
2904 	{
2905 		if (properties)
2906 			p->properties = *properties;
2907 		else
2908 		{
2909 			MQTTProperties props = MQTTProperties_initializer;
2910 			p->properties = props;
2911 		}
2912 	}
2913 
2914 	rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
2915 
2916 	/* If the packet was partially written to the socket, wait for it to complete.
2917 	 * However, if the client is disconnected during this time and qos is not 0, still return success, as
2918 	 * the packet has already been written to persistence and assigned a message id so will
2919 	 * be sent when the client next connects.
2920 	 */
2921 	if (rc == TCPSOCKET_INTERRUPTED)
2922 	{
2923 		while (m->c->connected == 1)
2924 		{
2925 			pending_writes* writing = NULL;
2926 
2927 			Thread_lock_mutex(socket_mutex);
2928 			writing = SocketBuffer_getWrite(m->c->net.socket);
2929 			Thread_unlock_mutex(socket_mutex);
2930 
2931 			if (writing == NULL)
2932 				break;
2933 
2934 			Thread_unlock_mutex(mqttclient_mutex);
2935 			MQTTClient_yield();
2936 			Thread_lock_mutex(mqttclient_mutex);
2937 		}
2938 		rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
2939 	}
2940 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
2941 	else if (rc == PAHO_MEMORY_ERROR)
2942 	{
2943 		goto exit_and_free;
2944 	}
2945 #endif
2946 
2947 	if (deliveryToken && qos > 0)
2948 		*deliveryToken = msg->msgid;
2949 
2950 exit_and_free:
2951 	if (p)
2952 	{
2953 		if (p->topic)
2954 			free(p->topic);
2955 		if (p->payload)
2956 			free(p->payload);
2957 		free(p);
2958 	}
2959 
2960 	if (rc == SOCKET_ERROR)
2961 	{
2962 		MQTTClient_disconnect_internal(handle, 0);
2963 		/* Return success for qos > 0 as the send will be retried automatically */
2964 		rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
2965 	}
2966 
2967 exit:
2968 	Thread_unlock_mutex(mqttclient_mutex);
2969 	resp.reasonCode = rc;
2970 	FUNC_EXIT_RC(resp.reasonCode);
2971 	return resp;
2972 }
2973 
2974 
/**
 * Publish a message using the pre-MQTT-5 API.
 *
 * Thin wrapper around MQTTClient_publish5 that passes no properties (NULL)
 * and reduces the MQTTResponse to its reason code.
 *
 * @param handle the client to publish with
 * @param topicName the topic to publish to
 * @param payloadlen length in bytes of the payload
 * @param payload the message payload
 * @param qos the quality of service to publish at
 * @param retained the retained flag of the message
 * @param deliveryToken forwarded unchanged to MQTTClient_publish5
 * @return MQTTCLIENT_WRONG_MQTT_VERSION when the client was created for
 * MQTT 5 or later, otherwise the reason code from MQTTClient_publish5
 */
int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
							 int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
{
	MQTTClients* m = handle;
	MQTTResponse rc = MQTTResponse_initializer;

	/* Only look at the MQTT version when the handle can be dereferenced; the
	 * same guard is used by MQTTClient_publishMessage. An unusable handle is
	 * passed on unchanged.
	 * NOTE(review): this relies on MQTTClient_publish5 rejecting a NULL
	 * handle - confirm. */
	if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
	return rc.reasonCode;
}
2987 
2988 
MQTTClient_publishMessage5(MQTTClient handle,const char * topicName,MQTTClient_message * message,MQTTClient_deliveryToken * deliveryToken)2989 MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
2990 								MQTTClient_deliveryToken* deliveryToken)
2991 {
2992 	MQTTResponse rc = MQTTResponse_initializer;
2993 	MQTTProperties* props = NULL;
2994 
2995 	FUNC_ENTRY;
2996 	if (message == NULL)
2997 	{
2998 		rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
2999 		goto exit;
3000 	}
3001 
3002 	if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
3003 			(message->struct_version != 0 && message->struct_version != 1))
3004 	{
3005 		rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
3006 		goto exit;
3007 	}
3008 
3009 	if (message->struct_version >= 1)
3010 		props = &message->properties;
3011 
3012 	rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
3013 								message->qos, message->retained, props, deliveryToken);
3014 exit:
3015 	FUNC_EXIT_RC(rc.reasonCode);
3016 	return rc;
3017 }
3018 
3019 
/**
 * Publish the contents of an MQTTClient_message structure using the
 * pre-MQTT-5 API.
 *
 * @param handle the client to publish with
 * @param topicName the topic to publish to
 * @param message the message to publish; a NULL pointer is reported as an
 * error by MQTTClient_publishMessage5 instead of being dereferenced here
 * @param deliveryToken forwarded unchanged to MQTTClient_publishMessage5
 * @return MQTTCLIENT_BAD_STRUCTURE when the eyecatcher or struct_version is
 * wrong, MQTTCLIENT_WRONG_MQTT_VERSION when the client was created for
 * MQTT 5 or later, otherwise the reason code from MQTTClient_publishMessage5
 */
int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
															 MQTTClient_deliveryToken* deliveryToken)
{
	MQTTClients* m = handle;
	MQTTResponse rc = MQTTResponse_initializer;

	/* A NULL message skips both checks below and falls through to
	 * MQTTClient_publishMessage5, which returns MQTTCLIENT_NULL_PARAMETER
	 * for it. */
	if (message != NULL &&
			(strncmp(message->struct_id, "MQTM", 4) != 0 ||
			(message->struct_version != 0 && message->struct_version != 1)))
		rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
	else if (message != NULL && m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
		rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
	else
		rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
	return rc.reasonCode;
}
3035 
3036 
MQTTClient_retry(void)3037 static void MQTTClient_retry(void)
3038 {
3039 	static START_TIME_TYPE last = START_TIME_ZERO;
3040 	START_TIME_TYPE now;
3041 
3042 	FUNC_ENTRY;
3043 	now = MQTTTime_now();
3044 	if (MQTTTime_difftime(now, last) >= (DIFF_TIME_TYPE)(retryLoopIntervalms))
3045 	{
3046 		last = MQTTTime_now();
3047 		MQTTProtocol_keepalive(now);
3048 		MQTTProtocol_retry(now, 1, 0);
3049 	}
3050 	else
3051 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
3052 		MQTTProtocol_retry(now, 1, 0);
3053 #else
3054 		MQTTProtocol_retry(now, 0, 0);
3055 #endif
3056 	FUNC_EXIT;
3057 }
3058 
3059 
MQTTClient_cycle(SOCKET * sock,ELAPSED_TIME_TYPE timeout,int * rc)3060 static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc)
3061 {
3062 	static Ack ack;
3063 	MQTTPacket* pack = NULL;
3064 	int rc1 = 0;
3065 	START_TIME_TYPE start;
3066 
3067 	FUNC_ENTRY;
3068 #if defined(OPENSSL) || defined(MBEDTLS)
3069 	if ((*sock = SSLSocket_getPendingRead()) == -1)
3070 	{
3071 		/* 0 from getReadySocket indicates no work to do, rc -1 == error */
3072 #endif
3073 		start = MQTTTime_start_clock();
3074 		*sock = Socket_getReadySocket(0, (int)timeout, socket_mutex, rc);
3075 		*rc = rc1;
3076 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
3077 		if (*sock == SOCKET_ERROR && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
3078 #else
3079 		if (*sock == 0 && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
3080 #endif
3081 			MQTTTime_sleep(100L);
3082 #if defined(OPENSSL) || defined(MBEDTLS)
3083 	}
3084 #endif
3085 	Thread_lock_mutex(mqttclient_mutex);
3086 #if defined(ZERO_SOCK_FD_IS_INVALID)
3087 	if (*sock > 0 && rc1 == 0)
3088 #else
3089 	if (*sock >= 0 && rc1 == 0)
3090 #endif
3091 	{
3092 		MQTTClients* m = NULL;
3093 		if (ListFindItem(handles, sock, clientSockCompare) != NULL)
3094 			m = (MQTTClient)(handles->current->content);
3095 		if (m != NULL)
3096 		{
3097 			if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
3098 				*rc = 0;  /* waiting for connect state to clear */
3099 			else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
3100 				*rc = WebSocket_upgrade(&m->c->net);
3101 			else
3102 			{
3103 				pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
3104 				if (*rc == TCPSOCKET_INTERRUPTED)
3105 					*rc = 0;
3106 #if !defined(ZERO_SOCK_FD_IS_INVALID)
3107 #if !defined(IOT_LITEOS_ADAPT)
3108 				if (*rc == EXT_SOCKET_RET_INVALID_SOCKET)
3109 					*rc = SOCKET_ERROR;
3110 #endif
3111 #endif
3112 			}
3113 		}
3114 
3115 		if (pack)
3116 		{
3117 			int freed = 1;
3118 
3119 			/* Note that these handle... functions free the packet structure that they are dealing with */
3120 			if (pack->header.bits.type == PUBLISH)
3121 				*rc = MQTTProtocol_handlePublishes(pack, *sock);
3122 			else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
3123 			{
3124 				int msgid;
3125 
3126 				ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
3127 				msgid = ack.msgId;
3128 				if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
3129 				{
3130 					Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
3131 					(*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
3132 				}
3133 				*rc = (pack->header.bits.type == PUBCOMP) ?
3134 					MQTTProtocol_handlePubcomps(pack, *sock, NULL) : MQTTProtocol_handlePubacks(pack, *sock, NULL);
3135 				if (m && m->dc)
3136 				{
3137 					Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
3138 					(*(m->dc))(m->context, msgid);
3139 				}
3140 			}
3141 			else if (pack->header.bits.type == PUBREC)
3142 			{
3143 				Pubrec* pubrec = (Pubrec*)pack;
3144 
3145 				if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
3146 				{
3147 					Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
3148 					(*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
3149 							&pubrec->properties, pubrec->rc);
3150 				}
3151 				*rc = MQTTProtocol_handlePubrecs(pack, *sock, NULL);
3152 			}
3153 			else if (pack->header.bits.type == PUBREL)
3154 				*rc = MQTTProtocol_handlePubrels(pack, *sock);
3155 			else if (pack->header.bits.type == PINGRESP)
3156 				*rc = MQTTProtocol_handlePingresps(pack, *sock);
3157 			else
3158 				freed = 0;
3159 			if (freed)
3160 				pack = NULL;
3161 		}
3162 	}
3163 	MQTTClient_retry();
3164 	Thread_unlock_mutex(mqttclient_mutex);
3165 	FUNC_EXIT_RC(*rc);
3166 	return pack;
3167 }
3168 
3169 
MQTTClient_waitfor(MQTTClient handle,int packet_type,int * rc,int64_t timeout)3170 static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout)
3171 {
3172 	MQTTPacket* pack = NULL;
3173 	MQTTClients* m = handle;
3174 	START_TIME_TYPE start = MQTTTime_start_clock();
3175 	int is_running = 0; /* local copy of running */
3176 
3177 	FUNC_ENTRY;
3178 	if (((MQTTClients*)handle) == NULL || timeout <= 0L)
3179 	{
3180 		*rc = MQTTCLIENT_FAILURE;
3181 		goto exit;
3182 	}
3183 
3184 	Thread_lock_mutex(mqttclient_mutex);
3185 	is_running = running;
3186 	Thread_unlock_mutex(mqttclient_mutex);
3187 
3188 	if (is_running)
3189 	{
3190 		if (packet_type == CONNECT)
3191 		{
3192 			if ((*rc = Thread_wait_sem(m->connect_sem, (int)timeout)) == 0)
3193 				*rc = m->rc;
3194 		}
3195 		else if (packet_type == CONNACK)
3196 			*rc = Thread_wait_sem(m->connack_sem, (int)timeout);
3197 		else if (packet_type == SUBACK)
3198 			*rc = Thread_wait_sem(m->suback_sem, (int)timeout);
3199 		else if (packet_type == UNSUBACK)
3200 			*rc = Thread_wait_sem(m->unsuback_sem, (int)timeout);
3201 		if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
3202 			Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
3203 		pack = m->pack;
3204 	}
3205 	else
3206 	{
3207 		*rc = TCPSOCKET_COMPLETE;
3208 		while (1)
3209 		{
3210 			SOCKET sock = -1;
3211 			pack = MQTTClient_cycle(&sock, 100L, rc);
3212 			if (sock == m->c->net.socket)
3213 			{
3214 				if (*rc == SOCKET_ERROR)
3215 					break;
3216 				if (pack && (pack->header.bits.type == packet_type))
3217 					break;
3218 				if (m->c->connect_state == TCP_IN_PROGRESS)
3219 				{
3220 					int error = 0;
3221 					socklen_t len = sizeof(error);
3222 
3223 					if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
3224 						*rc = error;
3225 					break;
3226 				}
3227 #if defined(OPENSSL) || defined(MBEDTLS)
3228 				else if (m->c->connect_state == SSL_IN_PROGRESS)
3229 				{
3230 
3231 					*rc = m->c->sslopts->struct_version >= 3 ?
3232 						SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
3233 							m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
3234 						SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
3235 							m->c->sslopts->verify, NULL, NULL);
3236 					if (*rc == SSL_FATAL)
3237 						break;
3238 					else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
3239 					{
3240 						if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
3241 #if defined(OPENSSL)
3242 							m->c->session = SSL_get1_session(m->c->net.ssl);
3243 #elif defined(MBEDTLS)
3244 							mbedtls_ssl_get_session(m->c->net.ssl, m->c->session);
3245 #endif
3246 						break;
3247 					}
3248 				}
3249 #endif
3250 				else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS && *rc != TCPSOCKET_INTERRUPTED)
3251 				{
3252 					*rc = 1;
3253 					break;
3254 				}
3255 				else if (m->c->connect_state == PROXY_CONNECT_IN_PROGRESS )
3256 				{
3257 					*rc = 1;
3258 					break;
3259 				}
3260 				else if (m->c->connect_state == WAIT_FOR_CONNACK)
3261 				{
3262 					int error = 0;
3263 					socklen_t len = sizeof(error);
3264 					if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
3265 					{
3266 						if (error)
3267 						{
3268 							*rc = error;
3269 							break;
3270 						}
3271 					}
3272 				}
3273 			}
3274 			if (MQTTTime_elapsed(start) > (uint64_t)timeout)
3275 			{
3276 				pack = NULL;
3277 				break;
3278 			}
3279 		}
3280 	}
3281 
3282 exit:
3283 	FUNC_EXIT_RC(*rc);
3284 	return pack;
3285 }
3286 
3287 
/**
 * Synchronously wait for an incoming message.
 *
 * Repeatedly runs MQTTClient_cycle until a message is queued for this client,
 * the timeout expires, or a socket error is reported for this client's own
 * socket. If a message is already queued on entry the wait time is reduced
 * to zero, but one cycle of packet handling is still performed.
 *
 * Not intended for multi-threaded use: fails at once when the background
 * thread is running.
 *
 * @param handle the client to receive for
 * @param topicName out: set to NULL, then filled in by
 * MQTTClient_deliverMessage when a message is available
 * @param topicLen out: passed on to MQTTClient_deliverMessage
 * @param message out: set to NULL, then filled in by
 * MQTTClient_deliverMessage when a message is available
 * @param timeout maximum time to wait
 * @return MQTTCLIENT_FAILURE for an unusable handle or when the background
 * thread is running, MQTTCLIENT_DISCONNECTED when not connected, otherwise
 * the result of the last cycle or of MQTTClient_deliverMessage
 */
int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
											 unsigned long timeout)
{
	MQTTClients* client = handle;
	START_TIME_TYPE began = MQTTTime_start_clock();
	ELAPSED_TIME_TYPE waited = 0L;
	int rc = TCPSOCKET_COMPLETE;

	FUNC_ENTRY;
	if (client == NULL || client->c == NULL
			|| running) /* receive is not meant to be called in a multi-thread environment */
		rc = MQTTCLIENT_FAILURE;
	else if (client->c->connected == 0)
		rc = MQTTCLIENT_DISCONNECTED;
	else
	{
		*topicName = NULL;
		*message = NULL;

		/* a message is already waiting: don't hang around, but still do one round of packet handling */
		if (client->c->messageQueue->count > 0)
			timeout = 0L;

		waited = MQTTTime_elapsed(began);
		for (;;)
		{
			SOCKET sock = 0;

			MQTTClient_cycle(&sock, (timeout > waited) ? timeout - waited : 0L, &rc);

			/* stop only when the error is on the socket belonging to this client */
			if (rc == SOCKET_ERROR &&
				ListFindItem(handles, &sock, clientSockCompare) &&
				(MQTTClient)(handles->current->content) == handle)
				break;

			waited = MQTTTime_elapsed(began);
			if (waited >= timeout || client->c->messageQueue->count != 0)
				break;
		}

		if (client->c->messageQueue->count > 0)
			rc = MQTTClient_deliverMessage(rc, client, topicName, topicLen, message);

		if (rc == SOCKET_ERROR)
			MQTTClient_disconnect_internal(handle, 0);
	}

	FUNC_EXIT_RC(rc);
	return rc;
}
3342 
3343 
MQTTClient_yield(void)3344 void MQTTClient_yield(void)
3345 {
3346 	START_TIME_TYPE start = MQTTTime_start_clock();
3347 	ELAPSED_TIME_TYPE elapsed = 0L;
3348 	ELAPSED_TIME_TYPE timeout = 100L;
3349 	int rc = 0;
3350 
3351 	FUNC_ENTRY;
3352 	if (running) /* yield is not meant to be called in a multi-thread environment */
3353 	{
3354 		MQTTTime_sleep(timeout);
3355 		goto exit;
3356 	}
3357 
3358 	elapsed = MQTTTime_elapsed(start);
3359 	do
3360 	{
3361 		SOCKET sock = -1;
3362 		MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
3363 		Thread_lock_mutex(mqttclient_mutex);
3364 		if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
3365 		{
3366 			MQTTClients* m = (MQTTClient)(handles->current->content);
3367 			if (m->c->connect_state != DISCONNECTING)
3368 				MQTTClient_disconnect_internal(m, 0);
3369 		}
3370 		Thread_unlock_mutex(mqttclient_mutex);
3371 		elapsed = MQTTTime_elapsed(start);
3372 	}
3373 	while (elapsed < timeout);
3374 exit:
3375 	FUNC_EXIT;
3376 }
3377 
3378 /*
3379 static int pubCompare(void* a, void* b)
3380 {
3381 	Messages* msg = (Messages*)a;
3382 	return msg->publish == (Publications*)b;
3383 }*/
3384 
3385 
/**
 * Wait until a published message is no longer in the outbound message list.
 *
 * While the timeout has not expired the outbound list is searched for the
 * delivery token; the wait ends as soon as the token cannot be found or the
 * client is no longer connected. Between checks mqttclient_mutex is released
 * and MQTTClient_yield is called.
 *
 * @param handle the client that published the message
 * @param mdt the delivery token (message id) to wait for
 * @param timeout maximum time to wait
 * @return MQTTCLIENT_SUCCESS when the token is not in the outbound list,
 * MQTTCLIENT_DISCONNECTED when the client is not connected, and
 * MQTTCLIENT_FAILURE for an unusable handle or when the timeout expires
 * (on IOT_CONNECT / IOT_LITEOS_ADAPT builds also when the token equals
 * the client's retryMsgs value)
 */
int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
{
	int rc = MQTTCLIENT_FAILURE;
	START_TIME_TYPE began = MQTTTime_start_clock();
	ELAPSED_TIME_TYPE waited = 0L;
	MQTTClients* client = handle;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	if (client != NULL && client->c != NULL)
	{
		for (waited = MQTTTime_elapsed(began); waited < timeout; waited = MQTTTime_elapsed(began))
		{
			if (client->c->connected == 0)
			{
				rc = MQTTCLIENT_DISCONNECTED;
				break;
			}
			if (ListFindItem(client->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
			{
#if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
				if (client->c->retryMsgs == mdt)
				{
					rc = MQTTCLIENT_FAILURE;
					Log(TRACE_MIN, -1, "msgid = %d, retry end.\n", mdt);
					break;
				}
#endif
				rc = MQTTCLIENT_SUCCESS; /* no longer in the outbound list */
				break;
			}
			/* let the network loop run without holding the client mutex */
			Thread_unlock_mutex(mqttclient_mutex);
			MQTTClient_yield();
			Thread_lock_mutex(mqttclient_mutex);
		}
	}

	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
3434 
3435 
/**
 * Return the ids of the messages currently in the client's outbound list.
 *
 * @param handle the client to query
 * @param tokens out: set to NULL when there are no outbound messages,
 * otherwise to an array allocated with malloc that holds one message id per
 * outbound message followed by a terminating -1; must not be NULL
 * @return MQTTCLIENT_SUCCESS, MQTTCLIENT_FAILURE when handle or tokens is
 * NULL, or PAHO_MEMORY_ERROR when the array cannot be allocated
 */
int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
{
	int rc = MQTTCLIENT_SUCCESS;
	MQTTClients* m = handle;

	FUNC_ENTRY;
	Thread_lock_mutex(mqttclient_mutex);

	/* check the output pointer before writing through it */
	if (tokens == NULL)
	{
		rc = MQTTCLIENT_FAILURE;
		goto exit;
	}
	*tokens = NULL;

	if (m == NULL)
	{
		rc = MQTTCLIENT_FAILURE;
		goto exit;
	}

	if (m->c && m->c->outboundMsgs->count > 0)
	{
		ListElement* current = NULL;
		int count = 0;

		/* one slot per outbound message plus one for the -1 terminator */
		*tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
		if (!*tokens)
		{
			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
		while (ListNextElement(m->c->outboundMsgs, &current))
		{
			/* named "msg" so that it does not shadow the client pointer m */
			Messages* msg = (Messages*)(current->content);
			(*tokens)[count++] = msg->msgid;
		}
		(*tokens)[count] = -1;
	}

exit:
	Thread_unlock_mutex(mqttclient_mutex);
	FUNC_EXIT_RC(rc);
	return rc;
}
3475 
3476 
MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)3477 void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
3478 {
3479 	Log_setTraceLevel((enum LOG_LEVELS)level);
3480 }
3481 
3482 
/**
 * Set the callback that receives trace output.
 * The pointer is converted to Log_traceCallback* and handed to
 * Log_setTraceCallback.
 *
 * @param callback the trace callback function
 */
void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
{
	Log_traceCallback* logCallback = (Log_traceCallback*)callback;

	Log_setTraceCallback(logCallback);
}
3487 
3488 
/**
 * Set the command timeout of a client.
 *
 * @param handle the client to configure
 * @param milliSeconds the new timeout in milliseconds; values below 5000 are
 * rejected
 * @return MQTTCLIENT_SUCCESS when the timeout was stored, MQTTCLIENT_FAILURE
 * when handle is NULL or the value is below 5000
 */
int MQTTClient_setCommandTimeout(MQTTClient handle, unsigned long milliSeconds)
{
	int rc = MQTTCLIENT_SUCCESS;
	MQTTClients* m = handle;

	FUNC_ENTRY;
	/* a NULL handle is rejected rather than dereferenced */
	if (m == NULL || milliSeconds < 5000L)
		rc = MQTTCLIENT_FAILURE;
	else
		m->commandTimeout = milliSeconds;
	FUNC_EXIT_RC(rc);
	return rc;
}
3502 
3503 
MQTTClient_getVersionInfo(void)3504 MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
3505 {
3506 	#define MAX_INFO_STRINGS 8
3507 	static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
3508 	int i = 0;
3509 
3510 	libinfo[i].name = "Product name";
3511 	libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
3512 
3513 	libinfo[i].name = "Version";
3514 	libinfo[i++].value = CLIENT_VERSION;
3515 
3516 	libinfo[i].name = "Build level";
3517 	libinfo[i++].value = BUILD_TIMESTAMP;
3518 #if defined(OPENSSL)
3519 	libinfo[i].name = "OpenSSL version";
3520 	libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
3521 
3522 	libinfo[i].name = "OpenSSL flags";
3523 	libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
3524 
3525 	libinfo[i].name = "OpenSSL build timestamp";
3526 	libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
3527 
3528 	libinfo[i].name = "OpenSSL platform";
3529 	libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
3530 
3531 	libinfo[i].name = "OpenSSL directory";
3532 	libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
3533 #elif defined(MBEDTLS)
3534 	libinfo[i].name = "MbedTLS Version";
3535 	libinfo[i++].value = MBEDTLS_VERSION_STRING;
3536 #endif
3537 	libinfo[i].name = NULL;
3538 	libinfo[i].value = NULL;
3539 	return libinfo;
3540 }
3541 
3542 
MQTTClient_strerror(int code)3543 const char* MQTTClient_strerror(int code)
3544 {
3545   static char buf[30];
3546   int chars = 0;
3547 
3548   switch (code) {
3549     case MQTTCLIENT_SUCCESS:
3550       return "Success";
3551     case MQTTCLIENT_FAILURE:
3552       return "Failure";
3553     case MQTTCLIENT_DISCONNECTED:
3554       return "Disconnected";
3555     case MQTTCLIENT_MAX_MESSAGES_INFLIGHT:
3556       return "Maximum in-flight messages amount reached";
3557     case MQTTCLIENT_BAD_UTF8_STRING:
3558       return "Invalid UTF8 string";
3559     case MQTTCLIENT_NULL_PARAMETER:
3560       return "Invalid (NULL) parameter";
3561     case MQTTCLIENT_TOPICNAME_TRUNCATED:
3562       return "Topic containing NULL characters has been truncated";
3563     case MQTTCLIENT_BAD_STRUCTURE:
3564       return "Bad structure";
3565     case MQTTCLIENT_BAD_QOS:
3566       return "Invalid QoS value";
3567     case MQTTCLIENT_SSL_NOT_SUPPORTED:
3568       return "SSL is not supported";
3569     case MQTTCLIENT_BAD_MQTT_VERSION:
3570       return "Unrecognized MQTT version";
3571     case MQTTCLIENT_BAD_PROTOCOL:
3572       return "Invalid protocol scheme";
3573     case MQTTCLIENT_BAD_MQTT_OPTION:
3574       return "Options for wrong MQTT version";
3575     case MQTTCLIENT_WRONG_MQTT_VERSION:
3576       return "Client created for another version of MQTT";
3577     case MQTTCLIENT_0_LEN_WILL_TOPIC:
3578       return "Zero length will topic on connect";
3579   }
3580 
3581   chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
3582   if (chars >= sizeof(buf))
3583   {
3584 	buf[sizeof(buf)-1] = '\0';
3585 	Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
3586   }
3587   return buf;
3588 }
3589 
3590 
3591 /**
3592  * See if any pending writes have been completed, and cleanup if so.
3593  * Cleaning up means removing any publication data that was stored because the write did
3594  * not originally complete.
3595  */
MQTTProtocol_checkPendingWrites(void)3596 static void MQTTProtocol_checkPendingWrites(void)
3597 {
3598 	FUNC_ENTRY;
3599 	if (state.pending_writes.count > 0)
3600 	{
3601 		ListElement* le = state.pending_writes.first;
3602 		while (le)
3603 		{
3604 			if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
3605 			{
3606 				MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
3607 				state.pending_writes.current = le;
3608 				ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
3609 				le = state.pending_writes.current;
3610 			}
3611 			else
3612 				ListNextElement(&(state.pending_writes), &le);
3613 		}
3614 	}
3615 	FUNC_EXIT;
3616 }
3617 
3618 
/**
 * Handle completion of a partial write on a socket (this will be on a
 * publish).
 * Cleans up finished pending writes, then records the current time as the
 * time of the last send for the client that uses the socket.
 *
 * @param socket the socket whose write has completed
 * @param rc not used by this function
 */
static void MQTTClient_writeComplete(SOCKET socket, int rc)
{
	ListElement* owner = NULL;

	FUNC_ENTRY;
	MQTTProtocol_checkPendingWrites();

	/* look up the client using this socket */
	owner = ListFindItem(handles, &socket, clientSockCompare);
	if (owner != NULL)
	{
		MQTTClients* client = (MQTTClients*)(owner->content);

		client->c->net.lastSent = MQTTTime_now();
	}
	FUNC_EXIT;
}
3637 
3638 
/**
 * Record the current time as the time of the last send for the client that
 * uses the given socket, if there is one.
 *
 * @param socket the socket on which a write is continuing
 */
static void MQTTClient_writeContinue(SOCKET socket)
{
	ListElement* owner = ListFindItem(handles, &socket, clientSockCompare);

	if (owner != NULL)
	{
		MQTTClients* client = (MQTTClients*)(owner->content);

		client->c->net.lastSent = MQTTTime_now();
	}
}
3650