1 /*******************************************************************************
2 * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs, Allan Stockdill-Mander - SSL updates
16 * Ian Craggs - fix for buffer overflow in addressPort bug #433290
17 * Ian Craggs - MQTT 3.1.1 support
18 * Rong Xiang, Ian Craggs - C++ compatibility
19 * Ian Craggs - fix for bug 479376
20 * Ian Craggs - SNI support
21 * Ian Craggs - fix for issue #164
22 * Ian Craggs - fix for issue #179
23 * Ian Craggs - MQTT 5.0 support
24 * Sven Gambel - add generic proxy support
25 *******************************************************************************/
26
27 /**
28 * @file
29 * \brief Functions dealing with the MQTT protocol exchanges
30 *
31 * Some other related functions are in the MQTTProtocolClient module
32 */
33
34 #include <stdlib.h>
35 #include <string.h>
36 #include <ctype.h>
37
38 #include "MQTTProtocolOut.h"
39 #include "StackTrace.h"
40 #include "Heap.h"
41 #include "WebSocket.h"
42 #include "Proxy.h"
43 #include "Base64.h"
44
45 extern ClientStates* bstate;
46
47
48
49 /**
50 * Separates an address:port into two separate values
51 * @param[in] uri the input string - hostname:port
52 * @param[out] port the returned port integer
53 * @param[out] topic optional topic portion of the address starting with '/'
54 * @return the address string
55 */
MQTTProtocol_addressPort(const char * uri,int * port,const char ** topic,int default_port)56 size_t MQTTProtocol_addressPort(const char* uri, int* port, const char **topic, int default_port)
57 {
58 char* buf = (char*)uri;
59 char* colon_pos;
60 size_t len;
61 char* topic_pos;
62
63 FUNC_ENTRY;
64 colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
65
66 if (uri[0] == '[')
67 { /* ip v6 */
68 if (colon_pos < strrchr(uri, ']'))
69 colon_pos = NULL; /* means it was an IPv6 separator, not for host:port */
70 }
71
72 if (colon_pos) /* have to strip off the port */
73 {
74 len = colon_pos - uri;
75 *port = atoi(colon_pos + 1);
76 }
77 else
78 {
79 len = strlen(buf);
80 *port = default_port;
81 }
82
83 /* find any topic portion */
84 topic_pos = (char*)uri;
85 if (colon_pos)
86 topic_pos = colon_pos;
87 topic_pos = strchr(topic_pos, '/');
88 if (topic_pos)
89 {
90 if (topic)
91 *topic = topic_pos;
92 if (!colon_pos)
93 len = topic_pos - uri;
94 }
95
96 if (buf[len - 1] == ']')
97 {
98 /* we are stripping off the final ], so length is 1 shorter */
99 --len;
100 }
101 FUNC_EXIT;
102 return len;
103 }
104
105
106 /**
107 * Allow user or password characters to be expressed in the form of %XX, XX being the
108 * hexadecimal value of the character. This will avoid problems when a user code or a password
109 * contains a '@' or another special character ('%' included)
110 * @param p0 output string
111 * @param p1 input string
112 * @param basic_auth_in_len
113 */
MQTTProtocol_specialChars(char * p0,char * p1,b64_size_t * basic_auth_in_len)114 void MQTTProtocol_specialChars(char* p0, char* p1, b64_size_t *basic_auth_in_len)
115 {
116 while (*p1 != '@')
117 {
118 if (*p1 != '%')
119 {
120 *p0++ = *p1++;
121 }
122 else if (isxdigit(*(p1 + 1)) && isxdigit(*(p1 + 2)))
123 {
124 /* next 2 characters are hexa digits */
125 char hex[3];
126 p1++;
127 hex[0] = *p1++;
128 hex[1] = *p1++;
129 hex[2] = '\0';
130 *p0++ = (char)strtol(hex, 0, 16);
131 /* 3 input char => 1 output char */
132 *basic_auth_in_len -= 2;
133 }
134 }
135 *p0 = 0x0;
136 }
137
138
139 /*
140 * Examples of proxy settings:
141 * http://your.proxy.server:8080/
142 * http://user:pass@my.proxy.server:8080/
143 */
MQTTProtocol_setHTTPProxy(Clients * aClient,char * source,char ** dest,char ** auth_dest,char * prefix)144 int MQTTProtocol_setHTTPProxy(Clients* aClient, char* source, char** dest, char** auth_dest, char* prefix)
145 {
146 b64_size_t basic_auth_in_len, basic_auth_out_len;
147 b64_data_t *basic_auth;
148 char *p1;
149 int rc = 0;
150
151 if (*auth_dest)
152 {
153 free(*auth_dest);
154 *auth_dest = NULL;
155 }
156
157 if (source)
158 {
159 if ((p1 = strstr(source, prefix)) != NULL) /* skip http:// prefix, if any */
160 source += strlen(prefix);
161 *dest = source;
162 if ((p1 = strchr(source, '@')) != NULL) /* find user.pass separator */
163 *dest = p1 + 1;
164
165 if (p1)
166 {
167 /* basic auth len is string between http:// and @ */
168 basic_auth_in_len = (b64_size_t)(p1 - source);
169 if (basic_auth_in_len > 0)
170 {
171 basic_auth = (b64_data_t *)malloc(sizeof(char)*(basic_auth_in_len+1));
172 if (!basic_auth)
173 {
174 rc = PAHO_MEMORY_ERROR;
175 goto exit;
176 }
177 MQTTProtocol_specialChars((char*)basic_auth, source, &basic_auth_in_len);
178 basic_auth_out_len = Base64_encodeLength(basic_auth, basic_auth_in_len) + 1; /* add 1 for trailing NULL */
179 if ((*auth_dest = (char *)malloc(sizeof(char)*basic_auth_out_len)) == NULL)
180 {
181 free(basic_auth);
182 rc = PAHO_MEMORY_ERROR;
183 goto exit;
184 }
185 Base64_encode(*auth_dest, basic_auth_out_len, basic_auth, basic_auth_in_len);
186 free(basic_auth);
187 }
188 }
189 }
190 exit:
191 return rc;
192 }
193
194
195 /**
196 * MQTT outgoing connect processing for a client
197 * @param ip_address the TCP address:port to connect to
198 * @param aClient a structure with all MQTT data needed
199 * @param int ssl
200 * @param int MQTTVersion the MQTT version to connect with (3 or 4)
201 * @param long timeout how long to wait for a new socket to be created
202 * @return return code
203 */
204 #if defined(OPENSSL) || defined(MBEDTLS)
205 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
MQTTProtocol_connect(const char * ip_address,Clients * aClient,int ssl,int websocket,int MQTTVersion,MQTTProperties * connectProperties,MQTTProperties * willProperties,long timeout)206 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
207 MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
208 #else
209 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
210 MQTTProperties* connectProperties, MQTTProperties* willProperties)
211 #endif
212 #else
213 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
214 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
215 MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
216 #else
217 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
218 MQTTProperties* connectProperties, MQTTProperties* willProperties)
219 #endif
220 #endif
221 {
222 int rc = 0,
223 port;
224 size_t addr_len;
225 char* p0;
226
227 FUNC_ENTRY;
228 aClient->good = 1;
229
230 if (aClient->httpProxy)
231 p0 = aClient->httpProxy;
232 else
233 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
234 p0 = 0;
235 #else
236 p0 = getenv("http_proxy");
237 #endif
238
239 if (p0)
240 {
241 if ((rc = MQTTProtocol_setHTTPProxy(aClient, p0, &aClient->net.http_proxy, &aClient->net.http_proxy_auth, "http://")) != 0)
242 goto exit;
243 Log(TRACE_PROTOCOL, -1, "Setting http proxy to %s", aClient->net.http_proxy);
244 if (aClient->net.http_proxy_auth)
245 Log(TRACE_PROTOCOL, -1, "Setting http proxy auth to %s", aClient->net.http_proxy_auth);
246 }
247
248 #if defined(OPENSSL) || defined(MBEDTLS)
249 if (aClient->httpsProxy)
250 p0 = aClient->httpsProxy;
251 else
252 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
253 p0 = 0;
254 #else
255 p0 = getenv("https_proxy");
256 #endif
257
258 if (p0)
259 {
260 if ((rc = MQTTProtocol_setHTTPProxy(aClient, p0, &aClient->net.https_proxy, &aClient->net.https_proxy_auth, "https://")) != 0)
261 goto exit;
262 Log(TRACE_PROTOCOL, -1, "Setting https proxy to %s", aClient->net.https_proxy);
263 if (aClient->net.https_proxy_auth)
264 Log(TRACE_PROTOCOL, -1, "Setting https proxy auth to %s", aClient->net.https_proxy_auth);
265 }
266
267 if (!ssl && aClient->net.http_proxy) {
268 #else
269 if (aClient->net.http_proxy) {
270 #endif
271 addr_len = MQTTProtocol_addressPort(aClient->net.http_proxy, &port, NULL, PROXY_DEFAULT_PORT);
272 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
273 if (timeout < 0)
274 rc = -1;
275 else
276 rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
277 #else
278 rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
279 #endif
280 }
281 #if defined(OPENSSL) || defined(MBEDTLS)
282 else if (ssl && aClient->net.https_proxy) {
283 addr_len = MQTTProtocol_addressPort(aClient->net.https_proxy, &port, NULL, PROXY_DEFAULT_PORT);
284 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
285 if (timeout < 0)
286 rc = -1;
287 else
288 rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
289 #else
290 rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
291 #endif
292 }
293 #endif
294 else {
295 #if defined(OPENSSL) || defined(MBEDTLS)
296 addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, ssl ?
297 (websocket ? WSS_DEFAULT_PORT : SECURE_MQTT_DEFAULT_PORT) :
298 (websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT) );
299 #else
300 addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT);
301 #endif
302 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
303 if (timeout < 0)
304 rc = -1;
305 else
306 rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout);
307 #else
308 rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
309 #endif
310 }
311 if (rc == EINPROGRESS || rc == EWOULDBLOCK)
312 aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
313 else if (rc == 0)
314 { /* TCP connect completed. If SSL, send SSL connect */
315 #if defined(OPENSSL) || defined(MBEDTLS)
316 if (ssl)
317 {
318 if (aClient->net.https_proxy) {
319 aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
320 rc = Proxy_connect( &aClient->net, 1, ip_address);
321 }
322 if (rc == 0 && SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, ip_address, addr_len) == 1)
323 {
324 rc = aClient->sslopts->struct_version >= 3 ?
325 SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
326 aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context) :
327 SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
328 aClient->sslopts->verify, NULL, NULL);
329 if (rc == TCPSOCKET_INTERRUPTED)
330 aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
331 }
332 else
333 rc = SOCKET_ERROR;
334 }
335 else if (aClient->net.http_proxy) {
336 #else
337 if (aClient->net.http_proxy) {
338 #endif
339 aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
340 rc = Proxy_connect( &aClient->net, 0, ip_address);
341 }
342 if ( websocket )
343 {
344 #if defined(OPENSSL)
345 rc = WebSocket_connect(&aClient->net, ssl, ip_address);
346 #endif
347 rc = WebSocket_connect(&aClient->net, 0, ip_address);
348 if ( rc == TCPSOCKET_INTERRUPTED )
349 aClient->connect_state = WEBSOCKET_IN_PROGRESS; /* Websocket connect called - wait for completion */
350 }
351 if (rc == 0)
352 {
353 /* Now send the MQTT connect packet */
354 if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
355 aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
356 else
357 aClient->connect_state = NOT_IN_PROGRESS;
358 }
359 }
360
361 exit:
362 FUNC_EXIT_RC(rc);
363 return rc;
364 }
365
366
367 /**
368 * Process an incoming pingresp packet for a socket
369 * @param pack pointer to the publish packet
370 * @param sock the socket on which the packet was received
371 * @return completion code
372 */
373 int MQTTProtocol_handlePingresps(void* pack, SOCKET sock)
374 {
375 Clients* client = NULL;
376 int rc = TCPSOCKET_COMPLETE;
377
378 FUNC_ENTRY;
379 client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
380 Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
381 client->ping_outstanding = 0;
382 FUNC_EXIT_RC(rc);
383 return rc;
384 }
385
386
387 /**
388 * MQTT outgoing subscribe processing for a client
389 * @param client the client structure
390 * @param topics list of topics
391 * @param qoss corresponding list of QoSs
392 * @param opts MQTT 5.0 subscribe options
393 * @param props MQTT 5.0 subscribe properties
394 * @return completion code
395 */
396 int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
397 MQTTSubscribe_options* opts, MQTTProperties* props)
398 {
399 int rc = 0;
400
401 FUNC_ENTRY;
402 rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
403 FUNC_EXIT_RC(rc);
404 return rc;
405 }
406
407
408 /**
409 * Process an incoming suback packet for a socket
410 * @param pack pointer to the publish packet
411 * @param sock the socket on which the packet was received
412 * @return completion code
413 */
414 int MQTTProtocol_handleSubacks(void* pack, SOCKET sock)
415 {
416 Suback* suback = (Suback*)pack;
417 Clients* client = NULL;
418 int rc = TCPSOCKET_COMPLETE;
419
420 FUNC_ENTRY;
421 client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
422 Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
423 MQTTPacket_freeSuback(suback);
424 FUNC_EXIT_RC(rc);
425 return rc;
426 }
427
428
429 /**
430 * MQTT outgoing unsubscribe processing for a client
431 * @param client the client structure
432 * @param topics list of topics
433 * @return completion code
434 */
435 int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props)
436 {
437 int rc = 0;
438
439 FUNC_ENTRY;
440 rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
441 FUNC_EXIT_RC(rc);
442 return rc;
443 }
444
445
446 /**
447 * Process an incoming unsuback packet for a socket
448 * @param pack pointer to the publish packet
449 * @param sock the socket on which the packet was received
450 * @return completion code
451 */
452 int MQTTProtocol_handleUnsubacks(void* pack, SOCKET sock)
453 {
454 Unsuback* unsuback = (Unsuback*)pack;
455 Clients* client = NULL;
456 int rc = TCPSOCKET_COMPLETE;
457
458 FUNC_ENTRY;
459 client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
460 Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
461 MQTTPacket_freeUnsuback(unsuback);
462 FUNC_EXIT_RC(rc);
463 return rc;
464 }
465
466
467 /**
468 * Process an incoming disconnect packet for a socket
469 * @param pack pointer to the disconnect packet
470 * @param sock the socket on which the packet was received
471 * @return completion code
472 */
473 int MQTTProtocol_handleDisconnects(void* pack, SOCKET sock)
474 {
475 Ack* disconnect = (Ack*)pack;
476 Clients* client = NULL;
477 int rc = TCPSOCKET_COMPLETE;
478
479 FUNC_ENTRY;
480 client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
481 Log(LOG_PROTOCOL, 30, NULL, sock, client->clientID, disconnect->rc);
482 MQTTPacket_freeAck(disconnect);
483 FUNC_EXIT_RC(rc);
484 return rc;
485 }
486
487