• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) 2020 - 2021, Daniel Stenberg, <daniel@haxx.se>, et al.
9  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10  *
11  * This software is licensed as described in the file COPYING, which
12  * you should have received as part of this distribution. The terms
13  * are also available at https://curl.se/docs/copyright.html.
14  *
15  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16  * copies of the Software, and permit persons to whom the Software is
17  * furnished to do so, under the terms of the COPYING file.
18  *
19  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20  * KIND, either express or implied.
21  *
22  ***************************************************************************/
23 
24 #include "curl_setup.h"
25 
26 #ifndef CURL_DISABLE_MQTT
27 
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43 
44 /* The last #include file should be: */
45 #include "memdebug.h"
46 
47 #define MQTT_MSG_CONNECT   0x10
48 #define MQTT_MSG_CONNACK   0x20
49 #define MQTT_MSG_PUBLISH   0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK    0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
53 
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57 
58 /*
59  * Forward declarations.
60  */
61 
62 static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
63 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
64 static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
65                         curl_socket_t *sock);
66 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
67                                 struct connectdata *conn);
68 
69 /*
70  * MQTT protocol handler.
71  */
72 
73 const struct Curl_handler Curl_handler_mqtt = {
74   "MQTT",                             /* scheme */
75   mqtt_setup_conn,                    /* setup_connection */
76   mqtt_do,                            /* do_it */
77   ZERO_NULL,                          /* done */
78   ZERO_NULL,                          /* do_more */
79   ZERO_NULL,                          /* connect_it */
80   ZERO_NULL,                          /* connecting */
81   mqtt_doing,                         /* doing */
82   ZERO_NULL,                          /* proto_getsock */
83   mqtt_getsock,                       /* doing_getsock */
84   ZERO_NULL,                          /* domore_getsock */
85   ZERO_NULL,                          /* perform_getsock */
86   ZERO_NULL,                          /* disconnect */
87   ZERO_NULL,                          /* readwrite */
88   ZERO_NULL,                          /* connection_check */
89   ZERO_NULL,                          /* attach connection */
90   PORT_MQTT,                          /* defport */
91   CURLPROTO_MQTT,                     /* protocol */
92   CURLPROTO_MQTT,                     /* family */
93   PROTOPT_NONE                        /* flags */
94 };
95 
mqtt_setup_conn(struct Curl_easy * data,struct connectdata * conn)96 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
97                                 struct connectdata *conn)
98 {
99   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
100      during this request */
101   struct MQTT *mq;
102   (void)conn;
103   DEBUGASSERT(data->req.p.mqtt == NULL);
104 
105   mq = calloc(1, sizeof(struct MQTT));
106   if(!mq)
107     return CURLE_OUT_OF_MEMORY;
108   data->req.p.mqtt = mq;
109   return CURLE_OK;
110 }
111 
mqtt_send(struct Curl_easy * data,char * buf,size_t len)112 static CURLcode mqtt_send(struct Curl_easy *data,
113                           char *buf, size_t len)
114 {
115   CURLcode result = CURLE_OK;
116   struct connectdata *conn = data->conn;
117   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
118   struct MQTT *mq = data->req.p.mqtt;
119   ssize_t n;
120   result = Curl_write(data, sockfd, buf, len, &n);
121   if(!result)
122     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
123   if(len != (size_t)n) {
124     size_t nsend = len - n;
125     char *sendleftovers = Curl_memdup(&buf[n], nsend);
126     if(!sendleftovers)
127       return CURLE_OUT_OF_MEMORY;
128     mq->sendleftovers = sendleftovers;
129     mq->nsend = nsend;
130   }
131   else {
132     mq->sendleftovers = NULL;
133     mq->nsend = 0;
134   }
135   return result;
136 }
137 
138 /* Generic function called by the multi interface to figure out what socket(s)
139    to wait for and for what actions during the DOING and PROTOCONNECT
140    states */
mqtt_getsock(struct Curl_easy * data,struct connectdata * conn,curl_socket_t * sock)141 static int mqtt_getsock(struct Curl_easy *data,
142                         struct connectdata *conn,
143                         curl_socket_t *sock)
144 {
145   (void)data;
146   sock[0] = conn->sock[FIRSTSOCKET];
147   return GETSOCK_READSOCK(FIRSTSOCKET);
148 }
149 
/*
 * mqtt_encode_len() stores 'len' in 'buf' using the MQTT variable-length
 * "remaining length" encoding: seven value bits per byte, least significant
 * group first, with bit 0x80 set on every byte that is followed by another.
 *
 * At least one byte is always written, so a length of zero encodes as the
 * single byte 0x00. At most four bytes are written; 'buf' must have room
 * for four. Values above 268435455 do not fit and leave the continuation
 * bit set on the fourth byte, so callers must reject them.
 *
 * Returns the number of bytes stored in 'buf' (1 to 4).
 */
static int mqtt_encode_len(char *buf, size_t len)
{
  int i = 0;

  do {
    unsigned char encoded = (unsigned char)(len % 0x80);
    len /= 0x80;
    if(len)
      /* more groups follow */
      encoded |= 0x80;
    buf[i++] = (char)encoded;
  } while((len > 0) && (i < 4));

  return i;
}
165 
/*
 * add_passwd() puts the password field into a CONNECT packet.
 *
 * The field is written at pkt[start]: a two-byte big-endian length followed
 * by 'plen' bytes of 'passwd'. The password bit (0x40) is also set in the
 * connect flags byte, located at pkt[remain_pos + 8].
 *
 * Returns 0 on success, or 1 without touching 'pkt' when 'plen' does not
 * fit in the 16-bit length prefix.
 */
static int add_passwd(const char *passwd, const size_t plen,
                       char *pkt, const size_t start, int remain_pos)
{
  /* offset of the connect flags byte within the packet */
  const size_t flags_at = remain_pos + 8;
  char *field = &pkt[start];

  if(plen > 0xffff)
    return 1;

  pkt[flags_at] |= 0x40;

  field[0] = (char)((plen >> 8) & 0xFF);
  field[1] = (char)(plen & 0xFF);
  memcpy(field + 2, passwd, plen);
  return 0;
}
184 
/*
 * add_user() puts the user name field into a CONNECT packet.
 *
 * The field is written at pkt[start]: a two-byte big-endian length followed
 * by 'ulen' bytes of 'username'. The user name bit (0x80) is also set in the
 * connect flags byte, located at pkt[remain_pos + 8].
 *
 * Returns 0 on success, or 1 without touching 'pkt' when 'ulen' does not
 * fit in the 16-bit length prefix.
 */
static int add_user(const char *username, const size_t ulen,
                    unsigned char *pkt, const size_t start, int remain_pos)
{
  /* offset of the connect flags byte within the packet */
  const size_t flags_at = remain_pos + 8;
  unsigned char *field = &pkt[start];

  if(ulen > 0xffff)
    return 1;

  pkt[flags_at] |= 0x80;

  field[0] = (unsigned char)((ulen >> 8) & 0xFF);
  field[1] = (unsigned char)(ulen & 0xFF);
  memcpy(field + 2, username, ulen);
  return 0;
}
202 
203 /* add client ID to the CONN packet */
add_client_id(const char * client_id,const size_t client_id_len,char * pkt,const size_t start)204 static int add_client_id(const char *client_id, const size_t client_id_len,
205                          char *pkt, const size_t start)
206 {
207   if(client_id_len != MQTT_CLIENTID_LEN)
208     return 1;
209   pkt[start] = 0x00;
210   pkt[start + 1] = MQTT_CLIENTID_LEN;
211   memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
212   return 0;
213 }
214 
215 /* Set initial values of CONN packet */
init_connpack(char * packet,char * remain,int remain_pos)216 static int init_connpack(char *packet, char *remain, int remain_pos)
217 {
218   /* Fixed header starts */
219   /* packet type */
220   packet[0] = MQTT_MSG_CONNECT;
221   /* remaining length field */
222   memcpy(&packet[1], remain, remain_pos);
223   /* Fixed header ends */
224 
225   /* Variable header starts */
226   /* protocol length */
227   packet[remain_pos + 1] = 0x00;
228   packet[remain_pos + 2] = 0x04;
229   /* protocol name */
230   packet[remain_pos + 3] = 'M';
231   packet[remain_pos + 4] = 'Q';
232   packet[remain_pos + 5] = 'T';
233   packet[remain_pos + 6] = 'T';
234   /* protocol level */
235   packet[remain_pos + 7] = 0x04;
236   /* CONNECT flag: CleanSession */
237   packet[remain_pos + 8] = 0x02;
238   /* keep-alive 0 = disabled */
239   packet[remain_pos + 9] = 0x00;
240   packet[remain_pos + 10] = 0x3c;
241   /*end of variable header*/
242   return remain_pos + 10;
243 }
244 
mqtt_connect(struct Curl_easy * data)245 static CURLcode mqtt_connect(struct Curl_easy *data)
246 {
247   CURLcode result = CURLE_OK;
248   int pos = 0;
249   int rc = 0;
250   /*remain length*/
251   int remain_pos = 0;
252   char remain[4] = {0};
253   size_t packetlen = 0;
254   size_t payloadlen = 0;
255   size_t start_user = 0;
256   size_t start_pwd = 0;
257   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
258   const size_t clen = strlen("curl");
259   char *packet = NULL;
260 
261   /* extracting username from request */
262   const char *username = data->state.aptr.user ?
263     data->state.aptr.user : "";
264   const size_t ulen = strlen(username);
265   /* extracting password from request */
266   const char *passwd = data->state.aptr.passwd ?
267     data->state.aptr.passwd : "";
268   const size_t plen = strlen(passwd);
269 
270   payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2;
271   /* The plus 2 are for the MSB and LSB describing the length of the string to
272    * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
273   if(ulen)
274     payloadlen += 2;
275   if(plen)
276     payloadlen += 2;
277 
278   /* getting how much occupy the remain length */
279   remain_pos = mqtt_encode_len(remain, payloadlen + 10);
280 
281   /* 10 length of variable header and 1 the first byte of the fixed header */
282   packetlen = payloadlen + 10 + remain_pos + 1;
283 
284   /* allocating packet */
285   if(packetlen > 268435455)
286     return CURLE_WEIRD_SERVER_REPLY;
287   packet = malloc(packetlen);
288   if(!packet)
289     return CURLE_OUT_OF_MEMORY;
290   memset(packet, 0, packetlen);
291 
292   /* set initial values for CONN pack */
293   pos = init_connpack(packet, remain, remain_pos);
294 
295   result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
296                          MQTT_CLIENTID_LEN - clen + 1);
297   /* add client id */
298   rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
299   if(rc) {
300     failf(data, "Client ID length mismatched: [%lu]", strlen(client_id));
301     result = CURLE_WEIRD_SERVER_REPLY;
302     goto end;
303   }
304   infof(data, "Using client id '%s'", client_id);
305 
306   /* position where starts the user payload */
307   start_user = pos + 3 + MQTT_CLIENTID_LEN;
308   /* position where starts the password payload */
309   start_pwd = start_user + ulen;
310   /* if user name was provided, add it to the packet */
311   if(ulen) {
312     start_pwd += 2;
313 
314     rc = add_user(username, ulen,
315                   (unsigned char *)packet, start_user, remain_pos);
316     if(rc) {
317       failf(data, "Username is too large: [%lu]", ulen);
318       result = CURLE_WEIRD_SERVER_REPLY;
319       goto end;
320     }
321   }
322 
323   /* if passwd was provided, add it to the packet */
324   if(plen) {
325     rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
326     if(rc) {
327       failf(data, "Password is too large: [%lu]", plen);
328       result = CURLE_WEIRD_SERVER_REPLY;
329       goto end;
330     }
331   }
332 
333   if(!result)
334     result = mqtt_send(data, packet, packetlen);
335 
336 end:
337   if(packet)
338     free(packet);
339   Curl_safefree(data->state.aptr.user);
340   Curl_safefree(data->state.aptr.passwd);
341   return result;
342 }
343 
mqtt_disconnect(struct Curl_easy * data)344 static CURLcode mqtt_disconnect(struct Curl_easy *data)
345 {
346   CURLcode result = CURLE_OK;
347   result = mqtt_send(data, (char *)"\xe0\x00", 2);
348   return result;
349 }
350 
mqtt_verify_connack(struct Curl_easy * data)351 static CURLcode mqtt_verify_connack(struct Curl_easy *data)
352 {
353   CURLcode result;
354   struct connectdata *conn = data->conn;
355   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
356   unsigned char readbuf[MQTT_CONNACK_LEN];
357   ssize_t nread;
358 
359   result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
360   if(result)
361     goto fail;
362 
363   Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
364 
365   /* fixme */
366   if(nread < MQTT_CONNACK_LEN) {
367     result = CURLE_WEIRD_SERVER_REPLY;
368     goto fail;
369   }
370 
371   /* verify CONNACK */
372   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
373     failf(data, "Expected %02x%02x but got %02x%02x",
374           0x00, 0x00, readbuf[0], readbuf[1]);
375     result = CURLE_WEIRD_SERVER_REPLY;
376   }
377 
378 fail:
379   return result;
380 }
381 
mqtt_get_topic(struct Curl_easy * data,char ** topic,size_t * topiclen)382 static CURLcode mqtt_get_topic(struct Curl_easy *data,
383                                char **topic, size_t *topiclen)
384 {
385   char *path = data->state.up.path;
386   if(strlen(path) > 1)
387     return Curl_urldecode(data, path + 1, 0, topic, topiclen,
388                           REJECT_NADA);
389   failf(data, "No MQTT topic found. Forgot to URL encode it?");
390   return CURLE_URL_MALFORMAT;
391 }
392 
mqtt_subscribe(struct Curl_easy * data)393 static CURLcode mqtt_subscribe(struct Curl_easy *data)
394 {
395   CURLcode result = CURLE_OK;
396   char *topic = NULL;
397   size_t topiclen;
398   unsigned char *packet = NULL;
399   size_t packetlen;
400   char encodedsize[4];
401   size_t n;
402   struct connectdata *conn = data->conn;
403 
404   result = mqtt_get_topic(data, &topic, &topiclen);
405   if(result)
406     goto fail;
407 
408   conn->proto.mqtt.packetid++;
409 
410   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
411                                + 2 bytes topic length + QoS byte */
412   n = mqtt_encode_len((char *)encodedsize, packetlen);
413   packetlen += n + 1; /* add one for the control packet type byte */
414 
415   packet = malloc(packetlen);
416   if(!packet) {
417     result = CURLE_OUT_OF_MEMORY;
418     goto fail;
419   }
420 
421   packet[0] = MQTT_MSG_SUBSCRIBE;
422   memcpy(&packet[1], encodedsize, n);
423   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
424   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
425   packet[3 + n] = (topiclen >> 8) & 0xff;
426   packet[4 + n ] = topiclen & 0xff;
427   memcpy(&packet[5 + n], topic, topiclen);
428   packet[5 + n + topiclen] = 0; /* QoS zero */
429 
430   result = mqtt_send(data, (char *)packet, packetlen);
431 
432 fail:
433   free(topic);
434   free(packet);
435   return result;
436 }
437 
438 /*
439  * Called when the first byte was already read.
440  */
mqtt_verify_suback(struct Curl_easy * data)441 static CURLcode mqtt_verify_suback(struct Curl_easy *data)
442 {
443   CURLcode result;
444   struct connectdata *conn = data->conn;
445   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
446   unsigned char readbuf[MQTT_SUBACK_LEN];
447   ssize_t nread;
448   struct mqtt_conn *mqtt = &conn->proto.mqtt;
449 
450   result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
451   if(result)
452     goto fail;
453 
454   Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
455 
456   /* fixme */
457   if(nread < MQTT_SUBACK_LEN) {
458     result = CURLE_WEIRD_SERVER_REPLY;
459     goto fail;
460   }
461 
462   /* verify SUBACK */
463   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
464      readbuf[1] != (mqtt->packetid & 0xff) ||
465      readbuf[2] != 0x00)
466     result = CURLE_WEIRD_SERVER_REPLY;
467 
468 fail:
469   return result;
470 }
471 
mqtt_publish(struct Curl_easy * data)472 static CURLcode mqtt_publish(struct Curl_easy *data)
473 {
474   CURLcode result;
475   char *payload = data->set.postfields;
476   size_t payloadlen;
477   char *topic = NULL;
478   size_t topiclen;
479   unsigned char *pkt = NULL;
480   size_t i = 0;
481   size_t remaininglength;
482   size_t encodelen;
483   char encodedbytes[4];
484   curl_off_t postfieldsize = data->set.postfieldsize;
485 
486   if(!payload)
487     return CURLE_BAD_FUNCTION_ARGUMENT;
488   if(postfieldsize < 0)
489     payloadlen = strlen(payload);
490   else
491     payloadlen = (size_t)postfieldsize;
492 
493   result = mqtt_get_topic(data, &topic, &topiclen);
494   if(result)
495     goto fail;
496 
497   remaininglength = payloadlen + 2 + topiclen;
498   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
499 
500   /* add the control byte and the encoded remaining length */
501   pkt = malloc(remaininglength + 1 + encodelen);
502   if(!pkt) {
503     result = CURLE_OUT_OF_MEMORY;
504     goto fail;
505   }
506 
507   /* assemble packet */
508   pkt[i++] = MQTT_MSG_PUBLISH;
509   memcpy(&pkt[i], encodedbytes, encodelen);
510   i += encodelen;
511   pkt[i++] = (topiclen >> 8) & 0xff;
512   pkt[i++] = (topiclen & 0xff);
513   memcpy(&pkt[i], topic, topiclen);
514   i += topiclen;
515   memcpy(&pkt[i], payload, payloadlen);
516   i += payloadlen;
517   result = mqtt_send(data, (char *)pkt, i);
518 
519 fail:
520   free(pkt);
521   free(topic);
522   return result;
523 }
524 
/*
 * mqtt_decode_len() decodes an MQTT variable-length integer from 'buf'.
 *
 * Bytes are consumed until one without the continuation bit (0x80) has been
 * read, or until 'buflen' bytes have been used. Each byte contributes its
 * low seven bits, least significant group first.
 *
 * When 'lenbytes' is non-NULL it receives the number of bytes consumed.
 * Returns the decoded value, which is zero when 'buflen' is zero.
 */
static size_t mqtt_decode_len(unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t factor = 1;
  size_t used = 0;

  while(used < buflen) {
    const unsigned char octet = buf[used++];
    value += (octet & 127) * factor;
    factor *= 128;
    if(!(octet & 128))
      /* last byte of the encoded length */
      break;
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
544 
545 #ifdef CURLDEBUG
546 static const char *statenames[]={
547   "MQTT_FIRST",
548   "MQTT_REMAINING_LENGTH",
549   "MQTT_CONNACK",
550   "MQTT_SUBACK",
551   "MQTT_SUBACK_COMING",
552   "MQTT_PUBWAIT",
553   "MQTT_PUB_REMAIN",
554 
555   "NOT A STATE"
556 };
557 #endif
558 
559 /* The only way to change state */
mqstate(struct Curl_easy * data,enum mqttstate state,enum mqttstate nextstate)560 static void mqstate(struct Curl_easy *data,
561                     enum mqttstate state,
562                     enum mqttstate nextstate) /* used if state == FIRST */
563 {
564   struct connectdata *conn = data->conn;
565   struct mqtt_conn *mqtt = &conn->proto.mqtt;
566 #ifdef CURLDEBUG
567   infof(data, "%s (from %s) (next is %s)",
568         statenames[state],
569         statenames[mqtt->state],
570         (state == MQTT_FIRST)? statenames[nextstate] : "");
571 #endif
572   mqtt->state = state;
573   if(state == MQTT_FIRST)
574     mqtt->nextstate = nextstate;
575 }
576 
577 
578 /* for the publish packet */
579 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
580 
mqtt_read_publish(struct Curl_easy * data,bool * done)581 static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
582 {
583   CURLcode result = CURLE_OK;
584   struct connectdata *conn = data->conn;
585   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
586   ssize_t nread;
587   unsigned char *pkt = (unsigned char *)data->state.buffer;
588   size_t remlen;
589   struct mqtt_conn *mqtt = &conn->proto.mqtt;
590   struct MQTT *mq = data->req.p.mqtt;
591   unsigned char packet;
592 
593   switch(mqtt->state) {
594   MQTT_SUBACK_COMING:
595   case MQTT_SUBACK_COMING:
596     result = mqtt_verify_suback(data);
597     if(result)
598       break;
599 
600     mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
601     break;
602 
603   case MQTT_SUBACK:
604   case MQTT_PUBWAIT:
605     /* we are expecting PUBLISH or SUBACK */
606     packet = mq->firstbyte & 0xf0;
607     if(packet == MQTT_MSG_PUBLISH)
608       mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
609     else if(packet == MQTT_MSG_SUBACK) {
610       mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
611       goto MQTT_SUBACK_COMING;
612     }
613     else if(packet == MQTT_MSG_DISCONNECT) {
614       infof(data, "Got DISCONNECT");
615       *done = TRUE;
616       goto end;
617     }
618     else {
619       result = CURLE_WEIRD_SERVER_REPLY;
620       goto end;
621     }
622 
623     /* -- switched state -- */
624     remlen = mq->remaining_length;
625     infof(data, "Remaining length: %zd bytes", remlen);
626     if(data->set.max_filesize &&
627        (curl_off_t)remlen > data->set.max_filesize) {
628       failf(data, "Maximum file size exceeded");
629       result = CURLE_FILESIZE_EXCEEDED;
630       goto end;
631     }
632     Curl_pgrsSetDownloadSize(data, remlen);
633     data->req.bytecount = 0;
634     data->req.size = remlen;
635     mq->npacket = remlen; /* get this many bytes */
636     /* FALLTHROUGH */
637   case MQTT_PUB_REMAIN: {
638     /* read rest of packet, but no more. Cap to buffer size */
639     struct SingleRequest *k = &data->req;
640     size_t rest = mq->npacket;
641     if(rest > (size_t)data->set.buffer_size)
642       rest = (size_t)data->set.buffer_size;
643     result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
644     if(result) {
645       if(CURLE_AGAIN == result) {
646         infof(data, "EEEE AAAAGAIN");
647       }
648       goto end;
649     }
650     if(!nread) {
651       infof(data, "server disconnected");
652       result = CURLE_PARTIAL_FILE;
653       goto end;
654     }
655     Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
656 
657     mq->npacket -= nread;
658     k->bytecount += nread;
659     Curl_pgrsSetDownloadCounter(data, k->bytecount);
660 
661     /* if QoS is set, message contains packet id */
662 
663     result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
664     if(result)
665       goto end;
666 
667     if(!mq->npacket)
668       /* no more PUBLISH payload, back to subscribe wait state */
669       mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
670     break;
671   }
672   default:
673     DEBUGASSERT(NULL); /* illegal state */
674     result = CURLE_WEIRD_SERVER_REPLY;
675     goto end;
676   }
677   end:
678   return result;
679 }
680 
mqtt_do(struct Curl_easy * data,bool * done)681 static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
682 {
683   CURLcode result = CURLE_OK;
684   *done = FALSE; /* unconditionally */
685 
686   result = mqtt_connect(data);
687   if(result) {
688     failf(data, "Error %d sending MQTT CONN request", result);
689     return result;
690   }
691   mqstate(data, MQTT_FIRST, MQTT_CONNACK);
692   return CURLE_OK;
693 }
694 
mqtt_doing(struct Curl_easy * data,bool * done)695 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
696 {
697   CURLcode result = CURLE_OK;
698   struct connectdata *conn = data->conn;
699   struct mqtt_conn *mqtt = &conn->proto.mqtt;
700   struct MQTT *mq = data->req.p.mqtt;
701   ssize_t nread;
702   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
703   unsigned char *pkt = (unsigned char *)data->state.buffer;
704   unsigned char byte;
705 
706   *done = FALSE;
707 
708   if(mq->nsend) {
709     /* send the remainder of an outgoing packet */
710     char *ptr = mq->sendleftovers;
711     result = mqtt_send(data, mq->sendleftovers, mq->nsend);
712     free(ptr);
713     if(result)
714       return result;
715   }
716 
717   infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
718   switch(mqtt->state) {
719   case MQTT_FIRST:
720     /* Read the initial byte only */
721     result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread);
722     if(!nread)
723       break;
724     Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
725     /* remember the first byte */
726     mq->npacket = 0;
727     mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
728     /* FALLTHROUGH */
729   case MQTT_REMAINING_LENGTH:
730     do {
731       result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
732       if(!nread)
733         break;
734       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
735       pkt[mq->npacket++] = byte;
736     } while((byte & 0x80) && (mq->npacket < 4));
737     if(nread && (byte & 0x80))
738       /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
739          127 * 128^3 bytes. server tried to send more */
740       result = CURLE_WEIRD_SERVER_REPLY;
741     if(result)
742       break;
743     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
744     mq->npacket = 0;
745     if(mq->remaining_length) {
746       mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
747       break;
748     }
749     mqstate(data, MQTT_FIRST, MQTT_FIRST);
750 
751     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
752       infof(data, "Got DISCONNECT");
753       *done = TRUE;
754     }
755     break;
756   case MQTT_CONNACK:
757     result = mqtt_verify_connack(data);
758     if(result)
759       break;
760 
761     if(data->state.httpreq == HTTPREQ_POST) {
762       result = mqtt_publish(data);
763       if(!result) {
764         result = mqtt_disconnect(data);
765         *done = TRUE;
766       }
767       mqtt->nextstate = MQTT_FIRST;
768     }
769     else {
770       result = mqtt_subscribe(data);
771       if(!result) {
772         mqstate(data, MQTT_FIRST, MQTT_SUBACK);
773       }
774     }
775     break;
776 
777   case MQTT_SUBACK:
778   case MQTT_PUBWAIT:
779   case MQTT_PUB_REMAIN:
780     result = mqtt_read_publish(data, done);
781     break;
782 
783   default:
784     failf(data, "State not handled yet");
785     *done = TRUE;
786     break;
787   }
788 
789   if(result == CURLE_AGAIN)
790     result = CURLE_OK;
791   return result;
792 }
793 
794 #endif /* CURL_DISABLE_MQTT */
795