1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9 * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10 *
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.haxx.se/docs/copyright.html.
14 *
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
18 *
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
21 *
22 ***************************************************************************/
23
24 #include "curl_setup.h"
25
26 #ifndef CURL_DISABLE_MQTT
27
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43
44 /* The last #include file should be: */
45 #include "memdebug.h"
46
47 #define MQTT_MSG_CONNECT 0x10
48 #define MQTT_MSG_CONNACK 0x20
49 #define MQTT_MSG_PUBLISH 0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK 0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
53
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57
58 /*
59 * Forward declarations.
60 */
61
62 static CURLcode mqtt_do(struct connectdata *conn, bool *done);
63 static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
64 static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
65 static CURLcode mqtt_setup_conn(struct connectdata *conn);
66
67 /*
68 * MQTT protocol handler.
69 */
70
71 const struct Curl_handler Curl_handler_mqtt = {
72 "MQTT", /* scheme */
73 mqtt_setup_conn, /* setup_connection */
74 mqtt_do, /* do_it */
75 ZERO_NULL, /* done */
76 ZERO_NULL, /* do_more */
77 ZERO_NULL, /* connect_it */
78 ZERO_NULL, /* connecting */
79 mqtt_doing, /* doing */
80 ZERO_NULL, /* proto_getsock */
81 mqtt_getsock, /* doing_getsock */
82 ZERO_NULL, /* domore_getsock */
83 ZERO_NULL, /* perform_getsock */
84 ZERO_NULL, /* disconnect */
85 ZERO_NULL, /* readwrite */
86 ZERO_NULL, /* connection_check */
87 PORT_MQTT, /* defport */
88 CURLPROTO_MQTT, /* protocol */
89 CURLPROTO_MQTT, /* family */
90 PROTOPT_NONE /* flags */
91 };
92
mqtt_setup_conn(struct connectdata * conn)93 static CURLcode mqtt_setup_conn(struct connectdata *conn)
94 {
95 /* allocate the HTTP-specific struct for the Curl_easy, only to survive
96 during this request */
97 struct MQTT *mq;
98 struct Curl_easy *data = conn->data;
99 DEBUGASSERT(data->req.protop == NULL);
100
101 mq = calloc(1, sizeof(struct MQTT));
102 if(!mq)
103 return CURLE_OUT_OF_MEMORY;
104 data->req.protop = mq;
105 return CURLE_OK;
106 }
107
mqtt_send(struct connectdata * conn,char * buf,size_t len)108 static CURLcode mqtt_send(struct connectdata *conn,
109 char *buf, size_t len)
110 {
111 CURLcode result = CURLE_OK;
112 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
113 struct Curl_easy *data = conn->data;
114 struct MQTT *mq = data->req.protop;
115 ssize_t n;
116 result = Curl_write(conn, sockfd, buf, len, &n);
117 if(!result && data->set.verbose)
118 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
119 if(len != (size_t)n) {
120 size_t nsend = len - n;
121 char *sendleftovers = Curl_memdup(&buf[n], nsend);
122 if(!sendleftovers)
123 return CURLE_OUT_OF_MEMORY;
124 mq->sendleftovers = sendleftovers;
125 mq->nsend = nsend;
126 }
127 return result;
128 }
129
130 /* Generic function called by the multi interface to figure out what socket(s)
131 to wait for and for what actions during the DOING and PROTOCONNECT
132 states */
mqtt_getsock(struct connectdata * conn,curl_socket_t * sock)133 static int mqtt_getsock(struct connectdata *conn,
134 curl_socket_t *sock)
135 {
136 sock[0] = conn->sock[FIRSTSOCKET];
137 return GETSOCK_READSOCK(FIRSTSOCKET);
138 }
139
mqtt_connect(struct connectdata * conn)140 static CURLcode mqtt_connect(struct connectdata *conn)
141 {
142 CURLcode result = CURLE_OK;
143 const size_t client_id_offset = 14;
144 const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
145 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
146 const size_t clen = strlen("curl");
147 char packet[32] = {
148 MQTT_MSG_CONNECT, /* packet type */
149 0x00, /* remaining length */
150 0x00, 0x04, /* protocol length */
151 'M','Q','T','T', /* protocol name */
152 0x04, /* protocol level */
153 0x02, /* CONNECT flag: CleanSession */
154 0x00, 0x3c, /* keep-alive 0 = disabled */
155 0x00, 0x00 /* payload1 length */
156 };
157 packet[1] = (packetlen - 2) & 0x7f;
158 packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
159
160 result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen],
161 MQTT_CLIENTID_LEN - clen + 1);
162 memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
163 infof(conn->data, "Using client id '%s'\n", client_id);
164 if(!result)
165 result = mqtt_send(conn, packet, packetlen);
166 return result;
167 }
168
mqtt_disconnect(struct connectdata * conn)169 static CURLcode mqtt_disconnect(struct connectdata *conn)
170 {
171 CURLcode result = CURLE_OK;
172 result = mqtt_send(conn, (char *)"\xe0\x00", 2);
173 return result;
174 }
175
mqtt_verify_connack(struct connectdata * conn)176 static CURLcode mqtt_verify_connack(struct connectdata *conn)
177 {
178 CURLcode result;
179 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
180 unsigned char readbuf[MQTT_CONNACK_LEN];
181 ssize_t nread;
182 struct Curl_easy *data = conn->data;
183
184 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
185 if(result)
186 goto fail;
187
188 if(data->set.verbose)
189 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
190
191 /* fixme */
192 if(nread < MQTT_CONNACK_LEN) {
193 result = CURLE_WEIRD_SERVER_REPLY;
194 goto fail;
195 }
196
197 /* verify CONNACK */
198 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
199 failf(data, "Expected %02x%02x but got %02x%02x",
200 0x00, 0x00, readbuf[0], readbuf[1]);
201 result = CURLE_WEIRD_SERVER_REPLY;
202 }
203
204 fail:
205 return result;
206 }
207
mqtt_get_topic(struct connectdata * conn,char ** topic,size_t * topiclen)208 static CURLcode mqtt_get_topic(struct connectdata *conn,
209 char **topic, size_t *topiclen)
210 {
211 CURLcode result = CURLE_OK;
212 char *path = conn->data->state.up.path;
213
214 if(strlen(path) > 1) {
215 result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
216 REJECT_NADA);
217 }
218 else {
219 failf(conn->data, "Error: No topic specified.");
220 result = CURLE_URL_MALFORMAT;
221 }
222 return result;
223 }
224
225
mqtt_encode_len(char * buf,size_t len)226 static int mqtt_encode_len(char *buf, size_t len)
227 {
228 unsigned char encoded;
229 int i;
230
231 for(i = 0; (len > 0) && (i<4); i++) {
232 encoded = len % 0x80;
233 len /= 0x80;
234 if(len)
235 encoded |= 0x80;
236 buf[i] = encoded;
237 }
238
239 return i;
240 }
241
mqtt_subscribe(struct connectdata * conn)242 static CURLcode mqtt_subscribe(struct connectdata *conn)
243 {
244 CURLcode result = CURLE_OK;
245 char *topic = NULL;
246 size_t topiclen;
247 unsigned char *packet = NULL;
248 size_t packetlen;
249 char encodedsize[4];
250 size_t n;
251
252 result = mqtt_get_topic(conn, &topic, &topiclen);
253 if(result)
254 goto fail;
255
256 conn->proto.mqtt.packetid++;
257
258 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
259 + 2 bytes topic length + QoS byte */
260 n = mqtt_encode_len((char *)encodedsize, packetlen);
261 packetlen += n + 1; /* add one for the control packet type byte */
262
263 packet = malloc(packetlen);
264 if(!packet) {
265 result = CURLE_OUT_OF_MEMORY;
266 goto fail;
267 }
268
269 packet[0] = MQTT_MSG_SUBSCRIBE;
270 memcpy(&packet[1], encodedsize, n);
271 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
272 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
273 packet[3 + n] = (topiclen >> 8) & 0xff;
274 packet[4 + n ] = topiclen & 0xff;
275 memcpy(&packet[5 + n], topic, topiclen);
276 packet[5 + n + topiclen] = 0; /* QoS zero */
277
278 result = mqtt_send(conn, (char *)packet, packetlen);
279
280 fail:
281 free(topic);
282 free(packet);
283 return result;
284 }
285
286 /*
287 * Called when the first byte was already read.
288 */
mqtt_verify_suback(struct connectdata * conn)289 static CURLcode mqtt_verify_suback(struct connectdata *conn)
290 {
291 CURLcode result;
292 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
293 unsigned char readbuf[MQTT_SUBACK_LEN];
294 ssize_t nread;
295 struct mqtt_conn *mqtt = &conn->proto.mqtt;
296
297 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
298 if(result)
299 goto fail;
300
301 if(conn->data->set.verbose)
302 Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
303
304 /* fixme */
305 if(nread < MQTT_SUBACK_LEN) {
306 result = CURLE_WEIRD_SERVER_REPLY;
307 goto fail;
308 }
309
310 /* verify SUBACK */
311 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
312 readbuf[1] != (mqtt->packetid & 0xff) ||
313 readbuf[2] != 0x00)
314 result = CURLE_WEIRD_SERVER_REPLY;
315
316 fail:
317 return result;
318 }
319
mqtt_publish(struct connectdata * conn)320 static CURLcode mqtt_publish(struct connectdata *conn)
321 {
322 CURLcode result;
323 char *payload = conn->data->set.postfields;
324 size_t payloadlen = (size_t)conn->data->set.postfieldsize;
325 char *topic = NULL;
326 size_t topiclen;
327 unsigned char *pkt = NULL;
328 size_t i = 0;
329 size_t remaininglength;
330 size_t encodelen;
331 char encodedbytes[4];
332
333 result = mqtt_get_topic(conn, &topic, &topiclen);
334 if(result)
335 goto fail;
336
337 remaininglength = payloadlen + 2 + topiclen;
338 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
339
340 /* add the control byte and the encoded remaining length */
341 pkt = malloc(remaininglength + 1 + encodelen);
342 if(!pkt) {
343 result = CURLE_OUT_OF_MEMORY;
344 goto fail;
345 }
346
347 /* assemble packet */
348 pkt[i++] = MQTT_MSG_PUBLISH;
349 memcpy(&pkt[i], encodedbytes, encodelen);
350 i += encodelen;
351 pkt[i++] = (topiclen >> 8) & 0xff;
352 pkt[i++] = (topiclen & 0xff);
353 memcpy(&pkt[i], topic, topiclen);
354 i += topiclen;
355 memcpy(&pkt[i], payload, payloadlen);
356 i += payloadlen;
357 result = mqtt_send(conn, (char *)pkt, i);
358
359 fail:
360 free(pkt);
361 free(topic);
362 return result;
363 }
364
mqtt_decode_len(unsigned char * buf,size_t buflen,size_t * lenbytes)365 static size_t mqtt_decode_len(unsigned char *buf,
366 size_t buflen, size_t *lenbytes)
367 {
368 size_t len = 0;
369 size_t mult = 1;
370 size_t i;
371 unsigned char encoded = 128;
372
373 for(i = 0; (i < buflen) && (encoded & 128); i++) {
374 encoded = buf[i];
375 len += (encoded & 127) * mult;
376 mult *= 128;
377 }
378
379 if(lenbytes)
380 *lenbytes = i;
381
382 return len;
383 }
384
385 #ifdef CURLDEBUG
386 static const char *statenames[]={
387 "MQTT_FIRST",
388 "MQTT_REMAINING_LENGTH",
389 "MQTT_CONNACK",
390 "MQTT_SUBACK",
391 "MQTT_SUBACK_COMING",
392 "MQTT_PUBWAIT",
393 "MQTT_PUB_REMAIN",
394
395 "NOT A STATE"
396 };
397 #endif
398
399 /* The only way to change state */
mqstate(struct connectdata * conn,enum mqttstate state,enum mqttstate nextstate)400 static void mqstate(struct connectdata *conn,
401 enum mqttstate state,
402 enum mqttstate nextstate) /* used if state == FIRST */
403 {
404 struct mqtt_conn *mqtt = &conn->proto.mqtt;
405 #ifdef CURLDEBUG
406 infof(conn->data, "%s (from %s) (next is %s)\n",
407 statenames[state],
408 statenames[mqtt->state],
409 (state == MQTT_FIRST)? statenames[nextstate] : "");
410 #endif
411 mqtt->state = state;
412 if(state == MQTT_FIRST)
413 mqtt->nextstate = nextstate;
414 }
415
416
417 /* for the publish packet */
418 #define MQTT_HEADER_LEN 5 /* max 5 bytes */
419
mqtt_read_publish(struct connectdata * conn,bool * done)420 static CURLcode mqtt_read_publish(struct connectdata *conn,
421 bool *done)
422 {
423 CURLcode result = CURLE_OK;
424 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
425 ssize_t nread;
426 struct Curl_easy *data = conn->data;
427 unsigned char *pkt = (unsigned char *)data->state.buffer;
428 size_t remlen;
429 struct mqtt_conn *mqtt = &conn->proto.mqtt;
430 struct MQTT *mq = data->req.protop;
431 unsigned char packet;
432
433 switch(mqtt->state) {
434 MQTT_SUBACK_COMING:
435 case MQTT_SUBACK_COMING:
436 result = mqtt_verify_suback(conn);
437 if(result)
438 break;
439
440 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
441 break;
442
443 case MQTT_SUBACK:
444 case MQTT_PUBWAIT:
445 /* we are expecting PUBLISH or SUBACK */
446 packet = mq->firstbyte & 0xf0;
447 if(packet == MQTT_MSG_PUBLISH)
448 mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
449 else if(packet == MQTT_MSG_SUBACK) {
450 mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
451 goto MQTT_SUBACK_COMING;
452 }
453 else if(packet == MQTT_MSG_DISCONNECT) {
454 infof(data, "Got DISCONNECT\n");
455 *done = TRUE;
456 goto end;
457 }
458 else {
459 result = CURLE_WEIRD_SERVER_REPLY;
460 goto end;
461 }
462
463 /* -- switched state -- */
464 remlen = mq->remaining_length;
465 infof(data, "Remaining length: %zd bytes\n", remlen);
466 Curl_pgrsSetDownloadSize(data, remlen);
467 data->req.bytecount = 0;
468 data->req.size = remlen;
469 mq->npacket = remlen; /* get this many bytes */
470 /* FALLTHROUGH */
471 case MQTT_PUB_REMAIN: {
472 /* read rest of packet, but no more. Cap to buffer size */
473 struct SingleRequest *k = &data->req;
474 size_t rest = mq->npacket;
475 if(rest > (size_t)data->set.buffer_size)
476 rest = (size_t)data->set.buffer_size;
477 result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
478 if(result) {
479 if(CURLE_AGAIN == result) {
480 infof(data, "EEEE AAAAGAIN\n");
481 }
482 goto end;
483 }
484 if(!nread) {
485 infof(data, "server disconnected\n");
486 result = CURLE_PARTIAL_FILE;
487 goto end;
488 }
489 if(data->set.verbose)
490 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
491
492 mq->npacket -= nread;
493 k->bytecount += nread;
494 Curl_pgrsSetDownloadCounter(data, k->bytecount);
495
496 /* if QoS is set, message contains packet id */
497
498 result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
499 if(result)
500 goto end;
501
502 if(!mq->npacket)
503 /* no more PUBLISH payload, back to subscribe wait state */
504 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
505 break;
506 }
507 default:
508 DEBUGASSERT(NULL); /* illegal state */
509 result = CURLE_WEIRD_SERVER_REPLY;
510 goto end;
511 }
512 end:
513 return result;
514 }
515
mqtt_do(struct connectdata * conn,bool * done)516 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
517 {
518 CURLcode result = CURLE_OK;
519 struct Curl_easy *data = conn->data;
520
521 *done = FALSE; /* unconditionally */
522
523 result = mqtt_connect(conn);
524 if(result) {
525 failf(data, "Error %d sending MQTT CONN request", result);
526 return result;
527 }
528 mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
529 return CURLE_OK;
530 }
531
mqtt_doing(struct connectdata * conn,bool * done)532 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
533 {
534 CURLcode result = CURLE_OK;
535 struct mqtt_conn *mqtt = &conn->proto.mqtt;
536 struct Curl_easy *data = conn->data;
537 struct MQTT *mq = data->req.protop;
538 ssize_t nread;
539 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
540 unsigned char *pkt = (unsigned char *)data->state.buffer;
541 unsigned char byte;
542
543 *done = FALSE;
544
545 if(mq->nsend) {
546 /* send the remainder of an outgoing packet */
547 char *ptr = mq->sendleftovers;
548 result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
549 free(ptr);
550 if(result)
551 return result;
552 }
553
554 infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
555 switch(mqtt->state) {
556 case MQTT_FIRST:
557 /* Read the initial byte only */
558 result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
559 if(result)
560 break;
561 if(data->set.verbose)
562 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
563 /* remember the first byte */
564 mq->npacket = 0;
565 mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
566 /* FALLTHROUGH */
567 case MQTT_REMAINING_LENGTH:
568 do {
569 result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
570 if(result)
571 break;
572 if(data->set.verbose)
573 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
574 pkt[mq->npacket++] = byte;
575 } while((byte & 0x80) && (mq->npacket < 4));
576 if(result)
577 break;
578 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
579 mq->npacket = 0;
580 if(mq->remaining_length) {
581 mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
582 break;
583 }
584 mqstate(conn, MQTT_FIRST, MQTT_FIRST);
585
586 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
587 infof(data, "Got DISCONNECT\n");
588 *done = TRUE;
589 }
590 break;
591 case MQTT_CONNACK:
592 result = mqtt_verify_connack(conn);
593 if(result)
594 break;
595
596 if(conn->data->state.httpreq == HTTPREQ_POST) {
597 result = mqtt_publish(conn);
598 if(!result) {
599 result = mqtt_disconnect(conn);
600 *done = TRUE;
601 }
602 mqtt->nextstate = MQTT_FIRST;
603 }
604 else {
605 result = mqtt_subscribe(conn);
606 if(!result) {
607 mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
608 }
609 }
610 break;
611
612 case MQTT_SUBACK:
613 case MQTT_PUBWAIT:
614 case MQTT_PUB_REMAIN:
615 result = mqtt_read_publish(conn, done);
616 break;
617
618 default:
619 failf(conn->data, "State not handled yet");
620 *done = TRUE;
621 break;
622 }
623
624 if(result == CURLE_AGAIN)
625 result = CURLE_OK;
626 return result;
627 }
628
629 #endif /* CURL_DISABLE_MQTT */
630