1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 2020 - 2021, Daniel Stenberg, <daniel@haxx.se>, et al.
9 * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10 *
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.se/docs/copyright.html.
14 *
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
18 *
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
21 *
22 ***************************************************************************/
23
24 #include "curl_setup.h"
25
26 #ifndef CURL_DISABLE_MQTT
27
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43
44 /* The last #include file should be: */
45 #include "memdebug.h"
46
/* Values used by this file for the first byte of the MQTT fixed header */
#define MQTT_MSG_CONNECT    0x10
#define MQTT_MSG_CONNACK    0x20
#define MQTT_MSG_PUBLISH    0x30
#define MQTT_MSG_SUBSCRIBE  0x82
#define MQTT_MSG_SUBACK     0x90
#define MQTT_MSG_DISCONNECT 0xe0

/* Number of bytes read when verifying a CONNACK and a SUBACK respectively */
#define MQTT_CONNACK_LEN  2
#define MQTT_SUBACK_LEN   3
/* Length of the generated client id, example: "curl0123abcd" */
#define MQTT_CLIENTID_LEN 12
57
58 /*
59 * Forward declarations.
60 */
61
62 static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
63 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
64 static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
65 curl_socket_t *sock);
66 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
67 struct connectdata *conn);
68
69 /*
70 * MQTT protocol handler.
71 */
72
73 const struct Curl_handler Curl_handler_mqtt = {
74 "MQTT", /* scheme */
75 mqtt_setup_conn, /* setup_connection */
76 mqtt_do, /* do_it */
77 ZERO_NULL, /* done */
78 ZERO_NULL, /* do_more */
79 ZERO_NULL, /* connect_it */
80 ZERO_NULL, /* connecting */
81 mqtt_doing, /* doing */
82 ZERO_NULL, /* proto_getsock */
83 mqtt_getsock, /* doing_getsock */
84 ZERO_NULL, /* domore_getsock */
85 ZERO_NULL, /* perform_getsock */
86 ZERO_NULL, /* disconnect */
87 ZERO_NULL, /* readwrite */
88 ZERO_NULL, /* connection_check */
89 ZERO_NULL, /* attach connection */
90 PORT_MQTT, /* defport */
91 CURLPROTO_MQTT, /* protocol */
92 CURLPROTO_MQTT, /* family */
93 PROTOPT_NONE /* flags */
94 };
95
mqtt_setup_conn(struct Curl_easy * data,struct connectdata * conn)96 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
97 struct connectdata *conn)
98 {
99 /* allocate the HTTP-specific struct for the Curl_easy, only to survive
100 during this request */
101 struct MQTT *mq;
102 (void)conn;
103 DEBUGASSERT(data->req.p.mqtt == NULL);
104
105 mq = calloc(1, sizeof(struct MQTT));
106 if(!mq)
107 return CURLE_OUT_OF_MEMORY;
108 data->req.p.mqtt = mq;
109 return CURLE_OK;
110 }
111
mqtt_send(struct Curl_easy * data,char * buf,size_t len)112 static CURLcode mqtt_send(struct Curl_easy *data,
113 char *buf, size_t len)
114 {
115 CURLcode result = CURLE_OK;
116 struct connectdata *conn = data->conn;
117 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
118 struct MQTT *mq = data->req.p.mqtt;
119 ssize_t n;
120 result = Curl_write(data, sockfd, buf, len, &n);
121 if(!result)
122 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
123 if(len != (size_t)n) {
124 size_t nsend = len - n;
125 char *sendleftovers = Curl_memdup(&buf[n], nsend);
126 if(!sendleftovers)
127 return CURLE_OUT_OF_MEMORY;
128 mq->sendleftovers = sendleftovers;
129 mq->nsend = nsend;
130 }
131 else {
132 mq->sendleftovers = NULL;
133 mq->nsend = 0;
134 }
135 return result;
136 }
137
138 /* Generic function called by the multi interface to figure out what socket(s)
139 to wait for and for what actions during the DOING and PROTOCONNECT
140 states */
mqtt_getsock(struct Curl_easy * data,struct connectdata * conn,curl_socket_t * sock)141 static int mqtt_getsock(struct Curl_easy *data,
142 struct connectdata *conn,
143 curl_socket_t *sock)
144 {
145 (void)data;
146 sock[0] = conn->sock[FIRSTSOCKET];
147 return GETSOCK_READSOCK(FIRSTSOCKET);
148 }
149
/*
 * mqtt_encode_len() stores 'len' in 'buf' using the MQTT variable-length
 * encoding: seven bits of the value per byte, least significant group first,
 * with the high bit set on every byte that is followed by another one.
 *
 * At least one and at most four bytes are written, so 'buf' must have room
 * for four. A zero 'len' is encoded as the single byte 0x00. Values above
 * 268435455 do not fit in four bytes; the output is then not a valid
 * encoding and callers need to reject such lengths themselves.
 *
 * Returns the number of bytes written to 'buf'.
 */
static int mqtt_encode_len(char *buf, size_t len)
{
  int i = 0;

  /* do-while so that a zero length still produces its one 0x00 byte */
  do {
    unsigned char encoded = (unsigned char)(len % 0x80);
    len /= 0x80;
    if(len)
      encoded |= 0x80; /* more bytes follow */
    buf[i++] = (char)encoded;
  } while((len > 0) && (i < 4));

  return i;
}
165
/*
 * add_passwd() writes the password field of the CONNECT packet 'pkt' at
 * offset 'start': a two byte big-endian length followed by the 'plen' bytes
 * of 'passwd'. It also sets the password bit (0x40) in the connect flags
 * byte, which sits eight bytes after 'remain_pos'.
 *
 * Returns 1 without touching 'pkt' when 'plen' does not fit in 16 bits,
 * otherwise 0.
 */
static int add_passwd(const char *passwd, const size_t plen,
                      char *pkt, const size_t start, int remain_pos)
{
  /* where the connect flags live in the packet */
  const size_t flags_at = remain_pos + 8;
  char *field;

  if(plen > 0xffff)
    return 1;

  pkt[flags_at] |= 0x40;

  field = &pkt[start];
  *field++ = (char)((plen >> 8) & 0xFF); /* length, high byte */
  *field++ = (char)(plen & 0xFF);        /* length, low byte */
  memcpy(field, passwd, plen);
  return 0;
}
184
/*
 * add_user() writes the user name field of the CONNECT packet 'pkt' at offset
 * 'start': a two byte big-endian length followed by the 'ulen' bytes of
 * 'username'. It also sets the user name bit (0x80) in the connect flags
 * byte, which sits eight bytes after 'remain_pos'.
 *
 * Returns 1 without touching 'pkt' when 'ulen' does not fit in 16 bits,
 * otherwise 0.
 */
static int add_user(const char *username, const size_t ulen,
                    unsigned char *pkt, const size_t start, int remain_pos)
{
  /* where the connect flags live in the packet */
  const size_t flags_at = remain_pos + 8;
  unsigned char *field;

  if(ulen > 0xffff)
    return 1;

  pkt[flags_at] |= 0x80;

  field = &pkt[start];
  *field++ = (unsigned char)((ulen >> 8) & 0xFF); /* length, high byte */
  *field++ = (unsigned char)(ulen & 0xFF);        /* length, low byte */
  memcpy(field, username, ulen);
  return 0;
}
202
203 /* add client ID to the CONN packet */
add_client_id(const char * client_id,const size_t client_id_len,char * pkt,const size_t start)204 static int add_client_id(const char *client_id, const size_t client_id_len,
205 char *pkt, const size_t start)
206 {
207 if(client_id_len != MQTT_CLIENTID_LEN)
208 return 1;
209 pkt[start] = 0x00;
210 pkt[start + 1] = MQTT_CLIENTID_LEN;
211 memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
212 return 0;
213 }
214
215 /* Set initial values of CONN packet */
init_connpack(char * packet,char * remain,int remain_pos)216 static int init_connpack(char *packet, char *remain, int remain_pos)
217 {
218 /* Fixed header starts */
219 /* packet type */
220 packet[0] = MQTT_MSG_CONNECT;
221 /* remaining length field */
222 memcpy(&packet[1], remain, remain_pos);
223 /* Fixed header ends */
224
225 /* Variable header starts */
226 /* protocol length */
227 packet[remain_pos + 1] = 0x00;
228 packet[remain_pos + 2] = 0x04;
229 /* protocol name */
230 packet[remain_pos + 3] = 'M';
231 packet[remain_pos + 4] = 'Q';
232 packet[remain_pos + 5] = 'T';
233 packet[remain_pos + 6] = 'T';
234 /* protocol level */
235 packet[remain_pos + 7] = 0x04;
236 /* CONNECT flag: CleanSession */
237 packet[remain_pos + 8] = 0x02;
238 /* keep-alive 0 = disabled */
239 packet[remain_pos + 9] = 0x00;
240 packet[remain_pos + 10] = 0x3c;
241 /*end of variable header*/
242 return remain_pos + 10;
243 }
244
mqtt_connect(struct Curl_easy * data)245 static CURLcode mqtt_connect(struct Curl_easy *data)
246 {
247 CURLcode result = CURLE_OK;
248 int pos = 0;
249 int rc = 0;
250 /*remain length*/
251 int remain_pos = 0;
252 char remain[4] = {0};
253 size_t packetlen = 0;
254 size_t payloadlen = 0;
255 size_t start_user = 0;
256 size_t start_pwd = 0;
257 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
258 const size_t clen = strlen("curl");
259 char *packet = NULL;
260
261 /* extracting username from request */
262 const char *username = data->state.aptr.user ?
263 data->state.aptr.user : "";
264 const size_t ulen = strlen(username);
265 /* extracting password from request */
266 const char *passwd = data->state.aptr.passwd ?
267 data->state.aptr.passwd : "";
268 const size_t plen = strlen(passwd);
269
270 payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2;
271 /* The plus 2 are for the MSB and LSB describing the length of the string to
272 * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
273 if(ulen)
274 payloadlen += 2;
275 if(plen)
276 payloadlen += 2;
277
278 /* getting how much occupy the remain length */
279 remain_pos = mqtt_encode_len(remain, payloadlen + 10);
280
281 /* 10 length of variable header and 1 the first byte of the fixed header */
282 packetlen = payloadlen + 10 + remain_pos + 1;
283
284 /* allocating packet */
285 if(packetlen > 268435455)
286 return CURLE_WEIRD_SERVER_REPLY;
287 packet = malloc(packetlen);
288 if(!packet)
289 return CURLE_OUT_OF_MEMORY;
290 memset(packet, 0, packetlen);
291
292 /* set initial values for CONN pack */
293 pos = init_connpack(packet, remain, remain_pos);
294
295 result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
296 MQTT_CLIENTID_LEN - clen + 1);
297 /* add client id */
298 rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
299 if(rc) {
300 failf(data, "Client ID length mismatched: [%lu]", strlen(client_id));
301 result = CURLE_WEIRD_SERVER_REPLY;
302 goto end;
303 }
304 infof(data, "Using client id '%s'", client_id);
305
306 /* position where starts the user payload */
307 start_user = pos + 3 + MQTT_CLIENTID_LEN;
308 /* position where starts the password payload */
309 start_pwd = start_user + ulen;
310 /* if user name was provided, add it to the packet */
311 if(ulen) {
312 start_pwd += 2;
313
314 rc = add_user(username, ulen,
315 (unsigned char *)packet, start_user, remain_pos);
316 if(rc) {
317 failf(data, "Username is too large: [%lu]", ulen);
318 result = CURLE_WEIRD_SERVER_REPLY;
319 goto end;
320 }
321 }
322
323 /* if passwd was provided, add it to the packet */
324 if(plen) {
325 rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
326 if(rc) {
327 failf(data, "Password is too large: [%lu]", plen);
328 result = CURLE_WEIRD_SERVER_REPLY;
329 goto end;
330 }
331 }
332
333 if(!result)
334 result = mqtt_send(data, packet, packetlen);
335
336 end:
337 if(packet)
338 free(packet);
339 Curl_safefree(data->state.aptr.user);
340 Curl_safefree(data->state.aptr.passwd);
341 return result;
342 }
343
mqtt_disconnect(struct Curl_easy * data)344 static CURLcode mqtt_disconnect(struct Curl_easy *data)
345 {
346 CURLcode result = CURLE_OK;
347 result = mqtt_send(data, (char *)"\xe0\x00", 2);
348 return result;
349 }
350
mqtt_verify_connack(struct Curl_easy * data)351 static CURLcode mqtt_verify_connack(struct Curl_easy *data)
352 {
353 CURLcode result;
354 struct connectdata *conn = data->conn;
355 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
356 unsigned char readbuf[MQTT_CONNACK_LEN];
357 ssize_t nread;
358
359 result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
360 if(result)
361 goto fail;
362
363 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
364
365 /* fixme */
366 if(nread < MQTT_CONNACK_LEN) {
367 result = CURLE_WEIRD_SERVER_REPLY;
368 goto fail;
369 }
370
371 /* verify CONNACK */
372 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
373 failf(data, "Expected %02x%02x but got %02x%02x",
374 0x00, 0x00, readbuf[0], readbuf[1]);
375 result = CURLE_WEIRD_SERVER_REPLY;
376 }
377
378 fail:
379 return result;
380 }
381
mqtt_get_topic(struct Curl_easy * data,char ** topic,size_t * topiclen)382 static CURLcode mqtt_get_topic(struct Curl_easy *data,
383 char **topic, size_t *topiclen)
384 {
385 char *path = data->state.up.path;
386 if(strlen(path) > 1)
387 return Curl_urldecode(data, path + 1, 0, topic, topiclen,
388 REJECT_NADA);
389 failf(data, "No MQTT topic found. Forgot to URL encode it?");
390 return CURLE_URL_MALFORMAT;
391 }
392
mqtt_subscribe(struct Curl_easy * data)393 static CURLcode mqtt_subscribe(struct Curl_easy *data)
394 {
395 CURLcode result = CURLE_OK;
396 char *topic = NULL;
397 size_t topiclen;
398 unsigned char *packet = NULL;
399 size_t packetlen;
400 char encodedsize[4];
401 size_t n;
402 struct connectdata *conn = data->conn;
403
404 result = mqtt_get_topic(data, &topic, &topiclen);
405 if(result)
406 goto fail;
407
408 conn->proto.mqtt.packetid++;
409
410 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
411 + 2 bytes topic length + QoS byte */
412 n = mqtt_encode_len((char *)encodedsize, packetlen);
413 packetlen += n + 1; /* add one for the control packet type byte */
414
415 packet = malloc(packetlen);
416 if(!packet) {
417 result = CURLE_OUT_OF_MEMORY;
418 goto fail;
419 }
420
421 packet[0] = MQTT_MSG_SUBSCRIBE;
422 memcpy(&packet[1], encodedsize, n);
423 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
424 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
425 packet[3 + n] = (topiclen >> 8) & 0xff;
426 packet[4 + n ] = topiclen & 0xff;
427 memcpy(&packet[5 + n], topic, topiclen);
428 packet[5 + n + topiclen] = 0; /* QoS zero */
429
430 result = mqtt_send(data, (char *)packet, packetlen);
431
432 fail:
433 free(topic);
434 free(packet);
435 return result;
436 }
437
438 /*
439 * Called when the first byte was already read.
440 */
mqtt_verify_suback(struct Curl_easy * data)441 static CURLcode mqtt_verify_suback(struct Curl_easy *data)
442 {
443 CURLcode result;
444 struct connectdata *conn = data->conn;
445 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
446 unsigned char readbuf[MQTT_SUBACK_LEN];
447 ssize_t nread;
448 struct mqtt_conn *mqtt = &conn->proto.mqtt;
449
450 result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
451 if(result)
452 goto fail;
453
454 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
455
456 /* fixme */
457 if(nread < MQTT_SUBACK_LEN) {
458 result = CURLE_WEIRD_SERVER_REPLY;
459 goto fail;
460 }
461
462 /* verify SUBACK */
463 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
464 readbuf[1] != (mqtt->packetid & 0xff) ||
465 readbuf[2] != 0x00)
466 result = CURLE_WEIRD_SERVER_REPLY;
467
468 fail:
469 return result;
470 }
471
mqtt_publish(struct Curl_easy * data)472 static CURLcode mqtt_publish(struct Curl_easy *data)
473 {
474 CURLcode result;
475 char *payload = data->set.postfields;
476 size_t payloadlen;
477 char *topic = NULL;
478 size_t topiclen;
479 unsigned char *pkt = NULL;
480 size_t i = 0;
481 size_t remaininglength;
482 size_t encodelen;
483 char encodedbytes[4];
484 curl_off_t postfieldsize = data->set.postfieldsize;
485
486 if(!payload)
487 return CURLE_BAD_FUNCTION_ARGUMENT;
488 if(postfieldsize < 0)
489 payloadlen = strlen(payload);
490 else
491 payloadlen = (size_t)postfieldsize;
492
493 result = mqtt_get_topic(data, &topic, &topiclen);
494 if(result)
495 goto fail;
496
497 remaininglength = payloadlen + 2 + topiclen;
498 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
499
500 /* add the control byte and the encoded remaining length */
501 pkt = malloc(remaininglength + 1 + encodelen);
502 if(!pkt) {
503 result = CURLE_OUT_OF_MEMORY;
504 goto fail;
505 }
506
507 /* assemble packet */
508 pkt[i++] = MQTT_MSG_PUBLISH;
509 memcpy(&pkt[i], encodedbytes, encodelen);
510 i += encodelen;
511 pkt[i++] = (topiclen >> 8) & 0xff;
512 pkt[i++] = (topiclen & 0xff);
513 memcpy(&pkt[i], topic, topiclen);
514 i += topiclen;
515 memcpy(&pkt[i], payload, payloadlen);
516 i += payloadlen;
517 result = mqtt_send(data, (char *)pkt, i);
518
519 fail:
520 free(pkt);
521 free(topic);
522 return result;
523 }
524
/*
 * mqtt_decode_len() decodes a variable-length number from 'buf': each byte
 * contributes its low seven bits, least significant group first, and a set
 * high bit says that another byte follows.
 *
 * Decoding stops after the first byte without the high bit or after 'buflen'
 * bytes, whichever comes first. If 'lenbytes' is not NULL it receives the
 * number of bytes consumed.
 *
 * Returns the decoded value, which is 0 when 'buflen' is 0.
 */
static size_t mqtt_decode_len(unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t scale = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];
    value += (octet & 127) * scale;
    scale *= 128;
    if(!(octet & 128))
      /* this was the last byte of the number */
      break;
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
544
#ifdef CURLDEBUG
/* Printable state names for the trace output in mqstate(), which indexes
   this array with 'enum mqttstate' values. The order presumably has to match
   the enum declaration - verify against mqtt.h when adding states. */
static const char *statenames[]={
  "MQTT_FIRST",
  "MQTT_REMAINING_LENGTH",
  "MQTT_CONNACK",
  "MQTT_SUBACK",
  "MQTT_SUBACK_COMING",
  "MQTT_PUBWAIT",
  "MQTT_PUB_REMAIN",

  "NOT A STATE"
};
#endif
558
559 /* The only way to change state */
mqstate(struct Curl_easy * data,enum mqttstate state,enum mqttstate nextstate)560 static void mqstate(struct Curl_easy *data,
561 enum mqttstate state,
562 enum mqttstate nextstate) /* used if state == FIRST */
563 {
564 struct connectdata *conn = data->conn;
565 struct mqtt_conn *mqtt = &conn->proto.mqtt;
566 #ifdef CURLDEBUG
567 infof(data, "%s (from %s) (next is %s)",
568 statenames[state],
569 statenames[mqtt->state],
570 (state == MQTT_FIRST)? statenames[nextstate] : "");
571 #endif
572 mqtt->state = state;
573 if(state == MQTT_FIRST)
574 mqtt->nextstate = nextstate;
575 }
576
577
578 /* for the publish packet */
579 #define MQTT_HEADER_LEN 5 /* max 5 bytes */
580
mqtt_read_publish(struct Curl_easy * data,bool * done)581 static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
582 {
583 CURLcode result = CURLE_OK;
584 struct connectdata *conn = data->conn;
585 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
586 ssize_t nread;
587 unsigned char *pkt = (unsigned char *)data->state.buffer;
588 size_t remlen;
589 struct mqtt_conn *mqtt = &conn->proto.mqtt;
590 struct MQTT *mq = data->req.p.mqtt;
591 unsigned char packet;
592
593 switch(mqtt->state) {
594 MQTT_SUBACK_COMING:
595 case MQTT_SUBACK_COMING:
596 result = mqtt_verify_suback(data);
597 if(result)
598 break;
599
600 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
601 break;
602
603 case MQTT_SUBACK:
604 case MQTT_PUBWAIT:
605 /* we are expecting PUBLISH or SUBACK */
606 packet = mq->firstbyte & 0xf0;
607 if(packet == MQTT_MSG_PUBLISH)
608 mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
609 else if(packet == MQTT_MSG_SUBACK) {
610 mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
611 goto MQTT_SUBACK_COMING;
612 }
613 else if(packet == MQTT_MSG_DISCONNECT) {
614 infof(data, "Got DISCONNECT");
615 *done = TRUE;
616 goto end;
617 }
618 else {
619 result = CURLE_WEIRD_SERVER_REPLY;
620 goto end;
621 }
622
623 /* -- switched state -- */
624 remlen = mq->remaining_length;
625 infof(data, "Remaining length: %zd bytes", remlen);
626 if(data->set.max_filesize &&
627 (curl_off_t)remlen > data->set.max_filesize) {
628 failf(data, "Maximum file size exceeded");
629 result = CURLE_FILESIZE_EXCEEDED;
630 goto end;
631 }
632 Curl_pgrsSetDownloadSize(data, remlen);
633 data->req.bytecount = 0;
634 data->req.size = remlen;
635 mq->npacket = remlen; /* get this many bytes */
636 /* FALLTHROUGH */
637 case MQTT_PUB_REMAIN: {
638 /* read rest of packet, but no more. Cap to buffer size */
639 struct SingleRequest *k = &data->req;
640 size_t rest = mq->npacket;
641 if(rest > (size_t)data->set.buffer_size)
642 rest = (size_t)data->set.buffer_size;
643 result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
644 if(result) {
645 if(CURLE_AGAIN == result) {
646 infof(data, "EEEE AAAAGAIN");
647 }
648 goto end;
649 }
650 if(!nread) {
651 infof(data, "server disconnected");
652 result = CURLE_PARTIAL_FILE;
653 goto end;
654 }
655 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
656
657 mq->npacket -= nread;
658 k->bytecount += nread;
659 Curl_pgrsSetDownloadCounter(data, k->bytecount);
660
661 /* if QoS is set, message contains packet id */
662
663 result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
664 if(result)
665 goto end;
666
667 if(!mq->npacket)
668 /* no more PUBLISH payload, back to subscribe wait state */
669 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
670 break;
671 }
672 default:
673 DEBUGASSERT(NULL); /* illegal state */
674 result = CURLE_WEIRD_SERVER_REPLY;
675 goto end;
676 }
677 end:
678 return result;
679 }
680
mqtt_do(struct Curl_easy * data,bool * done)681 static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
682 {
683 CURLcode result = CURLE_OK;
684 *done = FALSE; /* unconditionally */
685
686 result = mqtt_connect(data);
687 if(result) {
688 failf(data, "Error %d sending MQTT CONN request", result);
689 return result;
690 }
691 mqstate(data, MQTT_FIRST, MQTT_CONNACK);
692 return CURLE_OK;
693 }
694
mqtt_doing(struct Curl_easy * data,bool * done)695 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
696 {
697 CURLcode result = CURLE_OK;
698 struct connectdata *conn = data->conn;
699 struct mqtt_conn *mqtt = &conn->proto.mqtt;
700 struct MQTT *mq = data->req.p.mqtt;
701 ssize_t nread;
702 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
703 unsigned char *pkt = (unsigned char *)data->state.buffer;
704 unsigned char byte;
705
706 *done = FALSE;
707
708 if(mq->nsend) {
709 /* send the remainder of an outgoing packet */
710 char *ptr = mq->sendleftovers;
711 result = mqtt_send(data, mq->sendleftovers, mq->nsend);
712 free(ptr);
713 if(result)
714 return result;
715 }
716
717 infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
718 switch(mqtt->state) {
719 case MQTT_FIRST:
720 /* Read the initial byte only */
721 result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread);
722 if(!nread)
723 break;
724 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
725 /* remember the first byte */
726 mq->npacket = 0;
727 mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
728 /* FALLTHROUGH */
729 case MQTT_REMAINING_LENGTH:
730 do {
731 result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
732 if(!nread)
733 break;
734 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
735 pkt[mq->npacket++] = byte;
736 } while((byte & 0x80) && (mq->npacket < 4));
737 if(nread && (byte & 0x80))
738 /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
739 127 * 128^3 bytes. server tried to send more */
740 result = CURLE_WEIRD_SERVER_REPLY;
741 if(result)
742 break;
743 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
744 mq->npacket = 0;
745 if(mq->remaining_length) {
746 mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
747 break;
748 }
749 mqstate(data, MQTT_FIRST, MQTT_FIRST);
750
751 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
752 infof(data, "Got DISCONNECT");
753 *done = TRUE;
754 }
755 break;
756 case MQTT_CONNACK:
757 result = mqtt_verify_connack(data);
758 if(result)
759 break;
760
761 if(data->state.httpreq == HTTPREQ_POST) {
762 result = mqtt_publish(data);
763 if(!result) {
764 result = mqtt_disconnect(data);
765 *done = TRUE;
766 }
767 mqtt->nextstate = MQTT_FIRST;
768 }
769 else {
770 result = mqtt_subscribe(data);
771 if(!result) {
772 mqstate(data, MQTT_FIRST, MQTT_SUBACK);
773 }
774 }
775 break;
776
777 case MQTT_SUBACK:
778 case MQTT_PUBWAIT:
779 case MQTT_PUB_REMAIN:
780 result = mqtt_read_publish(data, done);
781 break;
782
783 default:
784 failf(data, "State not handled yet");
785 *done = TRUE;
786 break;
787 }
788
789 if(result == CURLE_AGAIN)
790 result = CURLE_OK;
791 return result;
792 }
793
794 #endif /* CURL_DISABLE_MQTT */
795