• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs, Allan Stockdill-Mander - SSL updates
16  *    Ian Craggs - fix for buffer overflow in addressPort bug #433290
17  *    Ian Craggs - MQTT 3.1.1 support
18  *    Rong Xiang, Ian Craggs - C++ compatibility
19  *    Ian Craggs - fix for bug 479376
20  *    Ian Craggs - SNI support
21  *    Ian Craggs - fix for issue #164
22  *    Ian Craggs - fix for issue #179
23  *    Ian Craggs - MQTT 5.0 support
24  *    Sven Gambel - add generic proxy support
25  *******************************************************************************/
26 
27 /**
28  * @file
29  * \brief Functions dealing with the MQTT protocol exchanges
30  *
31  * Some other related functions are in the MQTTProtocolClient module
32  */
33 
34 #include <stdlib.h>
35 #include <string.h>
36 #include <ctype.h>
37 
38 #include "MQTTProtocolOut.h"
39 #include "StackTrace.h"
40 #include "Heap.h"
41 #include "WebSocket.h"
42 #include "Proxy.h"
43 #include "Base64.h"
44 
45 extern ClientStates* bstate;
46 
47 
48 
49 /**
50  * Separates an address:port into two separate values
51  * @param[in] uri the input string - hostname:port
52  * @param[out] port the returned port integer
53  * @param[out] topic optional topic portion of the address starting with '/'
54  * @return the address string
55  */
MQTTProtocol_addressPort(const char * uri,int * port,const char ** topic,int default_port)56 size_t MQTTProtocol_addressPort(const char* uri, int* port, const char **topic, int default_port)
57 {
58 	char* buf = (char*)uri;
59 	char* colon_pos;
60 	size_t len;
61 	char* topic_pos;
62 
63 	FUNC_ENTRY;
64 	colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
65 
66 	if (uri[0] == '[')
67 	{  /* ip v6 */
68 		if (colon_pos < strrchr(uri, ']'))
69 			colon_pos = NULL;  /* means it was an IPv6 separator, not for host:port */
70 	}
71 
72 	if (colon_pos) /* have to strip off the port */
73 	{
74 		len = colon_pos - uri;
75 		*port = atoi(colon_pos + 1);
76 	}
77 	else
78 	{
79 		len = strlen(buf);
80 		*port = default_port;
81 	}
82 
83 	/* find any topic portion */
84 	topic_pos = (char*)uri;
85 	if (colon_pos)
86 		topic_pos = colon_pos;
87 	topic_pos = strchr(topic_pos, '/');
88 	if (topic_pos)
89 	{
90 		if (topic)
91 			*topic = topic_pos;
92 		if (!colon_pos)
93 			len = topic_pos - uri;
94 	}
95 
96 	if (buf[len - 1] == ']')
97 	{
98 		/* we are stripping off the final ], so length is 1 shorter */
99 		--len;
100 	}
101 	FUNC_EXIT;
102 	return len;
103 }
104 
105 
106 /**
107  * Allow user or password characters to be expressed in the form of %XX, XX being the
108  * hexadecimal value of the character. This will avoid problems when a user code or a password
109  * contains a '@' or another special character ('%' included)
110  * @param p0 output string
111  * @param p1 input string
112  * @param basic_auth_in_len
113  */
MQTTProtocol_specialChars(char * p0,char * p1,b64_size_t * basic_auth_in_len)114 void MQTTProtocol_specialChars(char* p0, char* p1, b64_size_t *basic_auth_in_len)
115 {
116 	while (*p1 != '@')
117 	{
118 		if (*p1 != '%')
119 		{
120 			*p0++ = *p1++;
121 		}
122 		else if (isxdigit(*(p1 + 1)) && isxdigit(*(p1 + 2)))
123 		{
124 			/* next 2 characters are hexa digits */
125 			char hex[3];
126 			p1++;
127 			hex[0] = *p1++;
128 			hex[1] = *p1++;
129 			hex[2] = '\0';
130 			*p0++ = (char)strtol(hex, 0, 16);
131 			/* 3 input char => 1 output char */
132 			*basic_auth_in_len -= 2;
133 		}
134 	}
135 	*p0 = 0x0;
136 }
137 
138 
139 /*
140  * Examples of proxy settings:
141  *   http://your.proxy.server:8080/
142  *   http://user:pass@my.proxy.server:8080/
143  */
MQTTProtocol_setHTTPProxy(Clients * aClient,char * source,char ** dest,char ** auth_dest,char * prefix)144 int MQTTProtocol_setHTTPProxy(Clients* aClient, char* source, char** dest, char** auth_dest, char* prefix)
145 {
146 	b64_size_t basic_auth_in_len, basic_auth_out_len;
147 	b64_data_t *basic_auth;
148 	char *p1;
149 	int rc = 0;
150 
151 	if (*auth_dest)
152 	{
153 		free(*auth_dest);
154 		*auth_dest = NULL;
155 	}
156 
157 	if (source)
158 	{
159 		if ((p1 = strstr(source, prefix)) != NULL) /* skip http:// prefix, if any */
160 			source += strlen(prefix);
161 		*dest = source;
162 		if ((p1 = strchr(source, '@')) != NULL) /* find user.pass separator */
163 			*dest = p1 + 1;
164 
165 		if (p1)
166 		{
167 			/* basic auth len is string between http:// and @ */
168 			basic_auth_in_len = (b64_size_t)(p1 - source);
169 			if (basic_auth_in_len > 0)
170 			{
171 				basic_auth = (b64_data_t *)malloc(sizeof(char)*(basic_auth_in_len+1));
172 				if (!basic_auth)
173 				{
174 					rc = PAHO_MEMORY_ERROR;
175 					goto exit;
176 				}
177 				MQTTProtocol_specialChars((char*)basic_auth, source, &basic_auth_in_len);
178 				basic_auth_out_len = Base64_encodeLength(basic_auth, basic_auth_in_len) + 1; /* add 1 for trailing NULL */
179 				if ((*auth_dest = (char *)malloc(sizeof(char)*basic_auth_out_len)) == NULL)
180 				{
181 					free(basic_auth);
182 					rc = PAHO_MEMORY_ERROR;
183 					goto exit;
184 				}
185 				Base64_encode(*auth_dest, basic_auth_out_len, basic_auth, basic_auth_in_len);
186 				free(basic_auth);
187 			}
188 		}
189 	}
190 exit:
191 	return rc;
192 }
193 
194 
195 /**
196  * MQTT outgoing connect processing for a client
197  * @param ip_address the TCP address:port to connect to
198  * @param aClient a structure with all MQTT data needed
199  * @param int ssl
200  * @param int MQTTVersion the MQTT version to connect with (3 or 4)
201  * @param long timeout how long to wait for a new socket to be created
202  * @return return code
203  */
204 #if defined(OPENSSL) || defined(MBEDTLS)
205 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
MQTTProtocol_connect(const char * ip_address,Clients * aClient,int ssl,int websocket,int MQTTVersion,MQTTProperties * connectProperties,MQTTProperties * willProperties,long timeout)206 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
207 		MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
208 #else
209 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
210 		MQTTProperties* connectProperties, MQTTProperties* willProperties)
211 #endif
212 #else
213 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
214 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
215 		MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
216 #else
217 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
218 		MQTTProperties* connectProperties, MQTTProperties* willProperties)
219 #endif
220 #endif
221 {
222 	int rc = 0,
223 		port;
224 	size_t addr_len;
225 	char* p0;
226 
227 	FUNC_ENTRY;
228 	aClient->good = 1;
229 
230 	if (aClient->httpProxy)
231 		p0 = aClient->httpProxy;
232 	else
233 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
234 		p0 = 0;
235 #else
236 		p0 = getenv("http_proxy");
237 #endif
238 
239 	if (p0)
240 	{
241 		if ((rc = MQTTProtocol_setHTTPProxy(aClient, p0, &aClient->net.http_proxy, &aClient->net.http_proxy_auth, "http://")) != 0)
242 			goto exit;
243 		Log(TRACE_PROTOCOL, -1, "Setting http proxy to %s", aClient->net.http_proxy);
244 		if (aClient->net.http_proxy_auth)
245 			Log(TRACE_PROTOCOL, -1, "Setting http proxy auth to %s", aClient->net.http_proxy_auth);
246 	}
247 
248 #if defined(OPENSSL) || defined(MBEDTLS)
249 	if (aClient->httpsProxy)
250 		p0 = aClient->httpsProxy;
251 	else
252 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
253 		p0 = 0;
254 #else
255 		p0 = getenv("https_proxy");
256 #endif
257 
258 	if (p0)
259 	{
260 		if ((rc = MQTTProtocol_setHTTPProxy(aClient, p0, &aClient->net.https_proxy, &aClient->net.https_proxy_auth, "https://")) != 0)
261 			goto exit;
262 		Log(TRACE_PROTOCOL, -1, "Setting https proxy to %s", aClient->net.https_proxy);
263 		if (aClient->net.https_proxy_auth)
264 			Log(TRACE_PROTOCOL, -1, "Setting https proxy auth to %s", aClient->net.https_proxy_auth);
265 	}
266 
267 	if (!ssl && aClient->net.http_proxy) {
268 #else
269 	if (aClient->net.http_proxy) {
270 #endif
271 		addr_len = MQTTProtocol_addressPort(aClient->net.http_proxy, &port, NULL, PROXY_DEFAULT_PORT);
272 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
273 		if (timeout < 0)
274 			rc = -1;
275 		else
276 			rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
277 #else
278 		rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
279 #endif
280 	}
281 #if defined(OPENSSL) || defined(MBEDTLS)
282 	else if (ssl && aClient->net.https_proxy) {
283 		addr_len = MQTTProtocol_addressPort(aClient->net.https_proxy, &port, NULL, PROXY_DEFAULT_PORT);
284 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
285 		if (timeout < 0)
286 			rc = -1;
287 		else
288 			rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
289 #else
290 		rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
291 #endif
292 	}
293 #endif
294 	else {
295 #if defined(OPENSSL) || defined(MBEDTLS)
296 		addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, ssl ?
297 				(websocket ? WSS_DEFAULT_PORT : SECURE_MQTT_DEFAULT_PORT) :
298 				(websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT) );
299 #else
300 		addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT);
301 #endif
302 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
303 		if (timeout < 0)
304 			rc = -1;
305 		else
306 			rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout);
307 #else
308 		rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
309 #endif
310 	}
311 	if (rc == EINPROGRESS || rc == EWOULDBLOCK)
312 		aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
313 	else if (rc == 0)
314 	{	/* TCP connect completed. If SSL, send SSL connect */
315 #if defined(OPENSSL) || defined(MBEDTLS)
316 		if (ssl)
317 		{
318 			if (aClient->net.https_proxy) {
319 				aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
320 				rc = Proxy_connect( &aClient->net, 1, ip_address);
321 			}
322 			if (rc == 0 && SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, ip_address, addr_len) == 1)
323 			{
324 				rc = aClient->sslopts->struct_version >= 3 ?
325 					SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
326 						aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context) :
327 					SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
328 						aClient->sslopts->verify, NULL, NULL);
329 				if (rc == TCPSOCKET_INTERRUPTED)
330 					aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
331 			}
332 			else
333 				rc = SOCKET_ERROR;
334 		}
335 		else if (aClient->net.http_proxy) {
336 #else
337 		if (aClient->net.http_proxy) {
338 #endif
339 			aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
340 			rc = Proxy_connect( &aClient->net, 0, ip_address);
341 		}
342 		if ( websocket )
343 		{
344 #if defined(OPENSSL)
345 			rc = WebSocket_connect(&aClient->net, ssl, ip_address);
346 #endif
347 			rc = WebSocket_connect(&aClient->net, 0, ip_address);
348 			if ( rc == TCPSOCKET_INTERRUPTED )
349 				aClient->connect_state = WEBSOCKET_IN_PROGRESS; /* Websocket connect called - wait for completion */
350 		}
351 		if (rc == 0)
352 		{
353 			/* Now send the MQTT connect packet */
354 			if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
355 				aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
356 			else
357 				aClient->connect_state = NOT_IN_PROGRESS;
358 		}
359 	}
360 
361 exit:
362 	FUNC_EXIT_RC(rc);
363 	return rc;
364 }
365 
366 
367 /**
368  * Process an incoming pingresp packet for a socket
369  * @param pack pointer to the publish packet
370  * @param sock the socket on which the packet was received
371  * @return completion code
372  */
373 int MQTTProtocol_handlePingresps(void* pack, SOCKET sock)
374 {
375 	Clients* client = NULL;
376 	int rc = TCPSOCKET_COMPLETE;
377 
378 	FUNC_ENTRY;
379 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
380 	Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
381 	client->ping_outstanding = 0;
382 	FUNC_EXIT_RC(rc);
383 	return rc;
384 }
385 
386 
387 /**
388  * MQTT outgoing subscribe processing for a client
389  * @param client the client structure
390  * @param topics list of topics
391  * @param qoss corresponding list of QoSs
392  * @param opts MQTT 5.0 subscribe options
393  * @param props MQTT 5.0 subscribe properties
394  * @return completion code
395  */
396 int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
397 		MQTTSubscribe_options* opts, MQTTProperties* props)
398 {
399 	int rc = 0;
400 
401 	FUNC_ENTRY;
402 	rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
403 	FUNC_EXIT_RC(rc);
404 	return rc;
405 }
406 
407 
408 /**
409  * Process an incoming suback packet for a socket
410  * @param pack pointer to the publish packet
411  * @param sock the socket on which the packet was received
412  * @return completion code
413  */
414 int MQTTProtocol_handleSubacks(void* pack, SOCKET sock)
415 {
416 	Suback* suback = (Suback*)pack;
417 	Clients* client = NULL;
418 	int rc = TCPSOCKET_COMPLETE;
419 
420 	FUNC_ENTRY;
421 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
422 	Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
423 	MQTTPacket_freeSuback(suback);
424 	FUNC_EXIT_RC(rc);
425 	return rc;
426 }
427 
428 
429 /**
430  * MQTT outgoing unsubscribe processing for a client
431  * @param client the client structure
432  * @param topics list of topics
433  * @return completion code
434  */
435 int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props)
436 {
437 	int rc = 0;
438 
439 	FUNC_ENTRY;
440 	rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
441 	FUNC_EXIT_RC(rc);
442 	return rc;
443 }
444 
445 
446 /**
447  * Process an incoming unsuback packet for a socket
448  * @param pack pointer to the publish packet
449  * @param sock the socket on which the packet was received
450  * @return completion code
451  */
452 int MQTTProtocol_handleUnsubacks(void* pack, SOCKET sock)
453 {
454 	Unsuback* unsuback = (Unsuback*)pack;
455 	Clients* client = NULL;
456 	int rc = TCPSOCKET_COMPLETE;
457 
458 	FUNC_ENTRY;
459 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
460 	Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
461 	MQTTPacket_freeUnsuback(unsuback);
462 	FUNC_EXIT_RC(rc);
463 	return rc;
464 }
465 
466 
467 /**
468  * Process an incoming disconnect packet for a socket
469  * @param pack pointer to the disconnect packet
470  * @param sock the socket on which the packet was received
471  * @return completion code
472  */
473 int MQTTProtocol_handleDisconnects(void* pack, SOCKET sock)
474 {
475 	Ack* disconnect = (Ack*)pack;
476 	Clients* client = NULL;
477 	int rc = TCPSOCKET_COMPLETE;
478 
479 	FUNC_ENTRY;
480 	client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
481 	Log(LOG_PROTOCOL, 30, NULL, sock, client->clientID, disconnect->rc);
482 	MQTTPacket_freeAck(disconnect);
483 	FUNC_EXIT_RC(rc);
484 	return rc;
485 }
486 
487