• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9  *
10  * This software is licensed as described in the file COPYING, which
11  * you should have received as part of this distribution. The terms
12  * are also available at https://curl.se/docs/copyright.html.
13  *
14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15  * copies of the Software, and permit persons to whom the Software is
16  * furnished to do so, under the terms of the COPYING file.
17  *
18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19  * KIND, either express or implied.
20  *
21  * SPDX-License-Identifier: curl
22  *
23  ***************************************************************************/
24 #include "server_setup.h"
25 #include <stdlib.h>
26 #include <string.h>
27 #include "util.h"
28 
29 /* Function
30  *
31  * Accepts a TCP connection on a custom port (IPv4 or IPv6).  Speaks MQTT.
32  *
33  * Read commands from FILE (set with --config). The commands control how to
34  * act and are reset to defaults on each client TCP connect.
35  *
36  * Config file keywords:
37  *
38  * TODO
39  */
40 
41 /* based on sockfilt.c */
42 
43 #include <signal.h>
44 #ifdef HAVE_NETINET_IN_H
45 #include <netinet/in.h>
46 #endif
47 #ifdef HAVE_NETINET_IN6_H
48 #include <netinet/in6.h>
49 #endif
50 #ifdef HAVE_ARPA_INET_H
51 #include <arpa/inet.h>
52 #endif
53 #ifdef HAVE_NETDB_H
54 #include <netdb.h>
55 #endif
56 
57 #define ENABLE_CURLX_PRINTF
58 /* make the curlx header define all printf() functions to use the curlx_*
59    versions instead */
60 #include "curlx.h" /* from the private lib dir */
61 #include "getpart.h"
62 #include "inet_pton.h"
63 #include "server_sockaddr.h"
64 #include "warnless.h"
65 
66 /* include memdebug.h last */
67 #include "memdebug.h"
68 
69 #ifdef USE_WINSOCK
70 #undef  EINTR
71 #define EINTR    4 /* errno.h value */
72 #undef  EAGAIN
73 #define EAGAIN  11 /* errno.h value */
74 #undef  ENOMEM
75 #define ENOMEM  12 /* errno.h value */
76 #undef  EINVAL
77 #define EINVAL  22 /* errno.h value */
78 #endif
79 
80 #define DEFAULT_PORT 1883 /* MQTT default port */
81 
82 #ifndef DEFAULT_LOGFILE
83 #define DEFAULT_LOGFILE "log/mqttd.log"
84 #endif
85 
86 #ifndef DEFAULT_CONFIG
87 #define DEFAULT_CONFIG "mqttd.config"
88 #endif
89 
90 #define MQTT_MSG_CONNECT    0x10
91 #define MQTT_MSG_CONNACK    0x20
92 #define MQTT_MSG_PUBLISH    0x30
93 #define MQTT_MSG_PUBACK     0x40
94 #define MQTT_MSG_SUBSCRIBE  0x82
95 #define MQTT_MSG_SUBACK     0x90
96 #define MQTT_MSG_DISCONNECT 0xe0
97 
98 #define MQTT_CONNACK_LEN 4
99 #define MQTT_SUBACK_LEN 5
100 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
101 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
102 
/*
 * Behavior settings for the test server. resetdefaults() sets them to their
 * defaults and getconfig() overrides them from the config file.
 */
struct configurable {
  unsigned char version;       /* initial version byte in the request must
                                  match this */
  bool publish_before_suback;  /* send the PUBLISH ahead of the SUBACK */
  bool short_publish;          /* send two bytes less than the full PUBLISH */
  bool excessive_remaining;    /* put an illegal Remaining Length in the
                                  PUBLISH */
  unsigned char error_connack; /* last byte of the CONNACK packet */
  int testnum;                 /* test number passed to test2fopen() */
};
112 
113 #define REQUEST_DUMP  "server.input"
114 #define CONFIG_VERSION 5
115 
116 static struct configurable config;
117 
118 const char *serverlogfile = DEFAULT_LOGFILE;
119 static const char *configfile = DEFAULT_CONFIG;
120 static const char *logdir = "log";
121 static char loglockfile[256];
122 
123 #ifdef ENABLE_IPV6
124 static bool use_ipv6 = FALSE;
125 #endif
126 static const char *ipv_inuse = "IPv4";
127 static unsigned short port = DEFAULT_PORT;
128 
resetdefaults(void)129 static void resetdefaults(void)
130 {
131   logmsg("Reset to defaults");
132   config.version = CONFIG_VERSION;
133   config.publish_before_suback = FALSE;
134   config.short_publish = FALSE;
135   config.excessive_remaining = FALSE;
136   config.error_connack = 0;
137   config.testnum = 0;
138 }
139 
/*
 * Parse 'value' as a decimal number and return its lowest eight bits.
 */
static unsigned char byteval(char *value)
{
  return (unsigned char)(strtoul(value, NULL, 10) & 0xff);
}
145 
getconfig(void)146 static void getconfig(void)
147 {
148   FILE *fp = fopen(configfile, FOPEN_READTEXT);
149   resetdefaults();
150   if(fp) {
151     char buffer[512];
152     logmsg("parse config file");
153     while(fgets(buffer, sizeof(buffer), fp)) {
154       char key[32];
155       char value[32];
156       if(2 == sscanf(buffer, "%31s %31s", key, value)) {
157         if(!strcmp(key, "version")) {
158           config.version = byteval(value);
159           logmsg("version [%d] set", config.version);
160         }
161         else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
162           logmsg("PUBLISH-before-SUBACK set");
163           config.publish_before_suback = TRUE;
164         }
165         else if(!strcmp(key, "short-PUBLISH")) {
166           logmsg("short-PUBLISH set");
167           config.short_publish = TRUE;
168         }
169         else if(!strcmp(key, "error-CONNACK")) {
170           config.error_connack = byteval(value);
171           logmsg("error-CONNACK = %d", config.error_connack);
172         }
173         else if(!strcmp(key, "Testnum")) {
174           config.testnum = atoi(value);
175           logmsg("testnum = %d", config.testnum);
176         }
177         else if(!strcmp(key, "excessive-remaining")) {
178           logmsg("excessive-remaining set");
179           config.excessive_remaining = TRUE;
180         }
181       }
182     }
183     fclose(fp);
184   }
185   else {
186     logmsg("No config file '%s' to read", configfile);
187   }
188 }
189 
/*
 * Log 'len' bytes from 'buffer' as a single line of lowercase hex digits
 * within single quotes. Nothing is logged when 'len' is zero or negative.
 *
 * Data that does not fit the internal buffer is cut off after a whole byte,
 * so the logged string always holds complete two-digit pairs.
 */
static void loghex(unsigned char *buffer, ssize_t len)
{
  static const char hexdigits[] = "0123456789abcdef";
  char data[12000];
  size_t used = 0;
  ssize_t i;

  /* a byte needs two characters, and one more is kept for the terminating
     null */
  for(i = 0; (i < len) && (used + 2 < sizeof(data)); i++) {
    data[used++] = hexdigits[buffer[i] >> 4];
    data[used++] = hexdigits[buffer[i] & 0x0f];
  }
  data[used] = 0;

  if(used)
    logmsg("'%s'", data);
}
208 
/* the side of the connection that sent a logged packet */
typedef enum {
  FROM_CLIENT,
  FROM_SERVER
} mqttdir;

/*
 * Write one line describing a packet to 'output', in the format
 *
 *   <client|server> <prefix> <remlen in hex> <packet bytes in hex>
 *
 * dir:    who sent the packet
 * prefix: name of the packet type
 * remlen: the Remaining Length of the packet
 * buffer: the packet bytes
 * len:    number of bytes in 'buffer'; zero or negative gives an empty hex
 *         field
 *
 * Data that does not fit the internal buffer is cut off after a whole byte,
 * so the hex field always holds complete two-digit pairs.
 */
static void logprotocol(mqttdir dir,
                        const char *prefix, size_t remlen,
                        FILE *output,
                        unsigned char *buffer, ssize_t len)
{
  static const char hexdigits[] = "0123456789abcdef";
  char data[12000];
  size_t used = 0;
  ssize_t i;

  /* a byte needs two characters, and one more is kept for the terminating
     null */
  for(i = 0; (i < len) && (used + 2 < sizeof(data)); i++) {
    data[used++] = hexdigits[buffer[i] >> 4];
    data[used++] = hexdigits[buffer[i] & 0x0f];
  }
  data[used] = 0;

  fprintf(output, "%s %s %zx %s\n",
          dir == FROM_CLIENT? "client": "server",
          prefix, remlen,
          data);
}
235 
236 
237 /* return 0 on success */
connack(FILE * dump,curl_socket_t fd)238 static int connack(FILE *dump, curl_socket_t fd)
239 {
240   unsigned char packet[]={
241     MQTT_MSG_CONNACK, 0x02,
242     0x00, 0x00
243   };
244   ssize_t rc;
245 
246   packet[3] = config.error_connack;
247 
248   rc = swrite(fd, (char *)packet, sizeof(packet));
249   if(rc > 0) {
250     logmsg("WROTE %d bytes [CONNACK]", rc);
251     loghex(packet, rc);
252     logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
253   }
254   if(rc == sizeof(packet)) {
255     return 0;
256   }
257   return 1;
258 }
259 
260 /* return 0 on success */
suback(FILE * dump,curl_socket_t fd,unsigned short packetid)261 static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
262 {
263   unsigned char packet[]={
264     MQTT_MSG_SUBACK, 0x03,
265     0, 0, /* filled in below */
266     0x00
267   };
268   ssize_t rc;
269   packet[2] = (unsigned char)(packetid >> 8);
270   packet[3] = (unsigned char)(packetid & 0xff);
271 
272   rc = swrite(fd, (char *)packet, sizeof(packet));
273   if(rc == sizeof(packet)) {
274     logmsg("WROTE %d bytes [SUBACK]", rc);
275     loghex(packet, rc);
276     logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
277     return 0;
278   }
279   return 1;
280 }
281 
#ifdef QOS
/*
 * Send a PUBACK for the PUBLISH with id 'packetid' to the client on 'fd',
 * and write it to the log and to 'dump'. Only built when QOS is defined.
 *
 * Returns 0 on success, 1 if the complete packet could not be sent.
 */
static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
{
  unsigned char packet[]={
    MQTT_MSG_PUBACK, 0x02, /* Remaining Length: the two packet id bytes */
    0, 0 /* filled in below */
  };
  ssize_t rc;

  /* the packet id is sent in network byte order */
  packet[2] = (unsigned char)(packetid >> 8);
  packet[3] = (unsigned char)(packetid & 0xff);

  rc = swrite(fd, (char *)packet, sizeof(packet));
  if(rc == (ssize_t)sizeof(packet)) {
    logmsg("WROTE %zd bytes [PUBACK]", rc);
    loghex(packet, rc);
    logprotocol(FROM_SERVER, "PUBACK", 2, dump, packet, rc);
    return 0;
  }
  logmsg("Failed sending [PUBACK]");
  return 1;
}
#endif
305 
306 /* return 0 on success */
disconnect(FILE * dump,curl_socket_t fd)307 static int disconnect(FILE *dump, curl_socket_t fd)
308 {
309   unsigned char packet[]={
310     MQTT_MSG_DISCONNECT, 0x00,
311   };
312   ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
313   if(rc == sizeof(packet)) {
314     logmsg("WROTE %d bytes [DISCONNECT]", rc);
315     loghex(packet, rc);
316     logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
317     return 0;
318   }
319   logmsg("Failed sending [DISCONNECT]");
320   return 1;
321 }
322 
323 
324 
325 /*
326   do
327 
328      encodedByte = X MOD 128
329 
330      X = X DIV 128
331 
332      // if there are more data to encode, set the top bit of this byte
333 
334      if ( X > 0 )
335 
336         encodedByte = encodedByte OR 128
337 
338       endif
339 
340     'output' encodedByte
341 
342   while ( X > 0 )
343 
344 */
345 
/*
 * Encode 'packetlen' as an MQTT Remaining Length field: seven bits of the
 * value per byte, least significant group first, with the top bit set in
 * every byte but the last.
 *
 * 'remlength' must have room for four bytes, which hold 4 * 7 = 28 bits of
 * value.
 *
 * Returns the number of bytes stored (1 to 4), or 0 without storing anything
 * if 'packetlen' does not fit in four bytes.
 */
static int encode_length(size_t packetlen,
                         unsigned char *remlength) /* 4 bytes */
{
  int bytes = 0;

  if(packetlen > 0x0fffffff) {
    logmsg("too large packet!");
    return 0;
  }

  do {
    unsigned char encode = (unsigned char)(packetlen % 0x80);
    packetlen /= 0x80;
    /* if there is more data to encode, set the top bit of this byte */
    if(packetlen)
      encode |= 0x80;

    remlength[bytes++] = encode;
  } while(packetlen);

  return bytes;
}
369 
370 
/*
 * Decode the MQTT Remaining Length field starting at 'buf'. At most 'buflen'
 * bytes are read, and reading stops after the first byte that does not have
 * the continuation bit (0x80) set.
 *
 * Returns the decoded value. If 'lenbytes' is not NULL, the number of bytes
 * that were read is stored in it.
 */
static size_t decode_length(unsigned char *buf,
                            size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t scale = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];

    /* the low seven bits are data, in increasing order of significance */
    value += (octet & 0x7f) * scale;
    scale *= 0x80;

    if(!(octet & 0x80))
      break; /* this was the last byte of the field */
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
390 
391 
392 /* return 0 on success */
publish(FILE * dump,curl_socket_t fd,unsigned short packetid,char * topic,char * payload,size_t payloadlen)393 static int publish(FILE *dump,
394                    curl_socket_t fd, unsigned short packetid,
395                    char *topic, char *payload, size_t payloadlen)
396 {
397   size_t topiclen = strlen(topic);
398   unsigned char *packet;
399   size_t payloadindex;
400   ssize_t remaininglength = topiclen + 2 + payloadlen;
401   ssize_t packetlen;
402   ssize_t sendamount;
403   ssize_t rc;
404   unsigned char rembuffer[4];
405   int encodedlen;
406 
407   if(config.excessive_remaining) {
408     /* manually set illegal remaining length */
409     rembuffer[0] = 0xff;
410     rembuffer[1] = 0xff;
411     rembuffer[2] = 0xff;
412     rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
413     encodedlen = 4;
414   }
415   else
416     encodedlen = encode_length(remaininglength, rembuffer);
417 
418   /* one packet type byte (possibly two more for packetid) */
419   packetlen = remaininglength + encodedlen + 1;
420   packet = malloc(packetlen);
421   if(!packet)
422     return 1;
423 
424   packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
425   memcpy(&packet[1], rembuffer, encodedlen);
426 
427   (void)packetid;
428   /* packet_id if QoS is set */
429 
430   packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
431   packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
432   memcpy(&packet[3 + encodedlen], topic, topiclen);
433 
434   payloadindex = 3 + topiclen + encodedlen;
435   memcpy(&packet[payloadindex], payload, payloadlen);
436 
437   sendamount = packetlen;
438   if(config.short_publish)
439     sendamount -= 2;
440 
441   rc = swrite(fd, (char *)packet, sendamount);
442   if(rc > 0) {
443     logmsg("WROTE %d bytes [PUBLISH]", rc);
444     loghex(packet, rc);
445     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
446   }
447   if(rc == packetlen)
448     return 0;
449   return 1;
450 }
451 
452 #define MAX_TOPIC_LENGTH 65535
453 #define MAX_CLIENT_ID_LENGTH 32
454 
455 static char topic[MAX_TOPIC_LENGTH + 1];
456 
/*
 * Read the fixed header of the next MQTT packet from 'fd'.
 *
 * bytep:                   receives the first byte of the packet (type and
 *                          flags)
 * remaining_lengthp:       receives the decoded Remaining Length
 * remaining_length_bytesp: receives the number of bytes the Remaining Length
 *                          field used
 *
 * Returns 0 on success. Returns 1 if the header could not be read in full,
 * or if the Remaining Length field continues beyond four bytes.
 */
static int fixedheader(curl_socket_t fd,
                       unsigned char *bytep,
                       size_t *remaining_lengthp,
                       size_t *remaining_length_bytesp)
{
  /* get the fixed header */
  unsigned char buffer[10];

  /* get the first two bytes */
  ssize_t rc = sread(fd, (char *)buffer, 2);
  int i;
  if(rc < 2) {
    logmsg("READ %zd bytes [SHORT!]", rc);
    return 1; /* fail */
  }
  logmsg("READ %zd bytes", rc);
  loghex(buffer, rc);
  *bytep = buffer[0];

  /* if the length byte has the top bit set, get the next one too */
  i = 1;
  while(buffer[i] & 0x80) {
    if(i >= 4) {
      /* the field is four bytes at most, stored in buffer[1] to buffer[4].
         Following the continuation bits of a misbehaving client any
         further would end up writing beyond the end of the buffer. */
      logmsg("Remaining Length broken");
      return 1;
    }
    i++;
    rc = sread(fd, (char *)&buffer[i], 1);
    if(rc != 1) {
      logmsg("Remaining Length broken");
      return 1;
    }
  }
  /* 'i' is the index of the last length byte, and since the field starts at
     index 1 that is also the number of length bytes */
  *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
  logmsg("Remaining Length: %zu [%zu bytes]", *remaining_lengthp,
         *remaining_length_bytesp);
  return 0;
}
491 
/*
 * Serve the MQTT client connected on 'fd' until the conversation is over.
 *
 * The config file is read first. Then packets are handled one at a time:
 *
 * CONNECT:   the protocol preamble and the payload length are verified, and
 *            a CONNACK is sent back.
 * SUBSCRIBE: the <reply><data> part of the test case selected with Testnum
 *            is sent in a PUBLISH for the subscribed topic, together with a
 *            SUBACK (before or after the PUBLISH depending on the config).
 *            If that part cannot be loaded, a fixed default payload is
 *            published instead. A DISCONNECT follows.
 * PUBLISH:   the packet is logged, then two more bytes that are expected to
 *            be a DISCONNECT are read and the function is done.
 *
 * Any other packet type, a read failure or a malformed packet also ends the
 * function. All traffic is appended to the REQUEST_DUMP file in the log
 * directory.
 *
 * Always returns CURL_SOCKET_BAD. 'fd' is not closed here.
 */
static curl_socket_t mqttit(curl_socket_t fd)
{
  size_t buff_size = 10*1024;
  unsigned char *buffer = NULL;
  ssize_t rc;
  unsigned char byte;
  unsigned short packet_id;
  size_t payload_len;
  size_t client_id_length;
  unsigned int topic_len;
  size_t remaining_length = 0;
  size_t bytes = 0; /* remaining length field size in bytes */
  char client_id[MAX_CLIENT_ID_LENGTH];
  long testno;
  FILE *stream = NULL;
  FILE *dump;
  char dumpfile[256];

  static const char protocol[7] = {
    0x00, 0x04,       /* protocol length */
    'M','Q','T','T',  /* protocol name */
    0x04              /* protocol level */
  };
  msnprintf(dumpfile, sizeof(dumpfile), "%s/%s", logdir, REQUEST_DUMP);
  dump = fopen(dumpfile, "ab");
  if(!dump)
    goto end;

  getconfig();

  testno = config.testnum;

  if(testno)
    logmsg("Found test number %ld", testno);

  buffer = malloc(buff_size);
  if(!buffer) {
    logmsg("Out of memory, unable to allocate buffer");
    goto end;
  }

  do {
    unsigned char usr_flag = 0x80;
    unsigned char passwd_flag = 0x40;
    unsigned char conn_flags;
    const size_t client_id_offset = 12;
    size_t start_usr;
    size_t start_passwd;

    /* get the fixed header */
    rc = fixedheader(fd, &byte, &remaining_length, &bytes);
    if(rc)
      break;

    if(remaining_length >= buff_size) {
      /* keep the old pointer until the new one is known to be good, so
         that the buffer can still be freed if this fails */
      unsigned char *newbuffer = realloc(buffer, remaining_length);
      if(!newbuffer) {
        logmsg("Failed realloc of size %zu", remaining_length);
        goto end;
      }
      buffer = newbuffer;
      buff_size = remaining_length;
    }

    /* when there is nothing more to read, 'rc' is still the zero that
       fixedheader() returned */
    if(remaining_length) {
      /* reading variable header and payload into buffer */
      rc = sread(fd, (char *)buffer, remaining_length);
      if(rc > 0) {
        logmsg("READ %zd bytes", rc);
        loghex(buffer, rc);
      }
    }

    if(byte == MQTT_MSG_CONNECT) {
      logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
                  dump, buffer, rc);

      /* ten bytes of variable header and two bytes of client id length
         must have arrived before any of it is looked at */
      if(rc < 12) {
        logmsg("Too small CONNECT");
        goto end;
      }

      if(memcmp(protocol, buffer, sizeof(protocol))) {
        logmsg("Protocol preamble mismatch");
        goto end;
      }
      /* ignore the connect flag byte and two keepalive bytes */
      payload_len = (size_t)(buffer[10] << 8) | buffer[11];
      /* first part of the payload is the client ID */
      client_id_length = payload_len;

      /* checking if user and password flags were set */
      conn_flags = buffer[7];

      start_usr = client_id_offset + payload_len;
      if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
        logmsg("User flag is present in CONN flag");
        if((start_usr + 2) > (size_t)rc) {
          logmsg("Too small CONNECT, no user name length");
          goto end;
        }
        payload_len += (size_t)(buffer[start_usr] << 8) |
          buffer[start_usr + 1];
        payload_len += 2; /* MSB and LSB for user length */
      }

      start_passwd = client_id_offset + payload_len;
      if(passwd_flag == (unsigned char)(conn_flags & passwd_flag)) {
        logmsg("Password flag is present in CONN flags");
        if((start_passwd + 2) > (size_t)rc) {
          logmsg("Too small CONNECT, no password length");
          goto end;
        }
        payload_len += (size_t)(buffer[start_passwd] << 8) |
          buffer[start_passwd + 1];
        payload_len += 2; /* MSB and LSB for password length */
      }

      /* check the length of the payload */
      if((ssize_t)payload_len != (rc - 12)) {
        logmsg("Payload length mismatch, expected %zx got %zx",
               (size_t)(rc - 12), payload_len);
        goto end;
      }
      /* check the length of the client ID */
      else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
        logmsg("Too large client id");
        goto end;
      }
      memcpy(client_id, &buffer[12], client_id_length);
      client_id[client_id_length] = 0;

      logmsg("MQTT client connect accepted: %s", client_id);

      /* The first packet sent from the Server to the Client MUST be a
         CONNACK Packet */

      if(connack(dump, fd)) {
        logmsg("failed sending CONNACK");
        goto end;
      }
    }
    else if(byte == MQTT_MSG_SUBSCRIBE) {
      int error;
      char *data;
      size_t datalen;
      logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
                  dump, buffer, rc);
      logmsg("Incoming SUBSCRIBE");

      if(rc < 6) {
        logmsg("Too small SUBSCRIBE");
        goto end;
      }

      /* two bytes packet id */
      packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);

      /* two bytes topic length */
      topic_len = (buffer[2] << 8) | buffer[3];
      if(topic_len != (remaining_length - 5)) {
        logmsg("Wrong topic length, got %u expected %zu",
               topic_len, remaining_length - 5);
        goto end;
      }
      memcpy(topic, &buffer[4], topic_len);
      topic[topic_len] = 0;

      /* there's a QoS byte (two bits) after the topic */

      logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
      if(stream)
        /* still open since an earlier SUBSCRIBE on this connection */
        fclose(stream);
      stream = test2fopen(testno, logdir);
      if(!stream) {
        logmsg("Couldn't open test file %ld", testno);
        goto end;
      }
      /* NOTE(review): 'data' looks like it is allocated by getpart() and is
         never freed here - confirm who owns it */
      error = getpart(&data, &datalen, "reply", "data", stream);
      if(!error) {
        if(!config.publish_before_suback) {
          if(suback(dump, fd, packet_id)) {
            logmsg("failed sending SUBACK");
            goto end;
          }
        }
        if(publish(dump, fd, packet_id, topic, data, datalen)) {
          logmsg("PUBLISH failed");
          goto end;
        }
        if(config.publish_before_suback) {
          if(suback(dump, fd, packet_id)) {
            logmsg("failed sending SUBACK");
            goto end;
          }
        }
      }
      else {
        char *def = (char *)"this is random payload yes yes it is";
        publish(dump, fd, packet_id, topic, def, strlen(def));
      }
      disconnect(dump, fd);
    }
    else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
      size_t topiclen;

      logmsg("Incoming PUBLISH");
      logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
                  dump, buffer, rc);

      if(rc < 2) {
        logmsg("Too small PUBLISH");
        goto end;
      }
      /* the fixed header is not stored in 'buffer', so the two byte topic
         length is the first thing in it */
      topiclen = (size_t)(buffer[0] << 8) | buffer[1];
      logmsg("Got %zu bytes topic", topiclen);
      /* TODO: verify topiclen */

#ifdef QOS
      /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
      puback(dump, fd, 0);
#endif
      /* expect a disconnect here */
      /* get the request */
      rc = sread(fd, (char *)&buffer[0], 2);

      logmsg("READ %zd bytes [DISCONNECT]", rc);
      loghex(buffer, rc);
      logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
      goto end;
    }
    else {
      /* not supported (yet) */
      goto end;
    }
  } while(1);

end:
  free(buffer);
  if(dump)
    fclose(dump);
  if(stream)
    fclose(stream);
  return CURL_SOCKET_BAD;
}
712 
713 /*
714   sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
715 
716   if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
717   accept()
718 */
incoming(curl_socket_t listenfd)719 static bool incoming(curl_socket_t listenfd)
720 {
721   fd_set fds_read;
722   fd_set fds_write;
723   fd_set fds_err;
724   int clients = 0; /* connected clients */
725 
726   if(got_exit_signal) {
727     logmsg("signalled to die, exiting...");
728     return FALSE;
729   }
730 
731 #ifdef HAVE_GETPPID
732   /* As a last resort, quit if socks5 process becomes orphan. */
733   if(getppid() <= 1) {
734     logmsg("process becomes orphan, exiting");
735     return FALSE;
736   }
737 #endif
738 
739   do {
740     ssize_t rc;
741     int error = 0;
742     curl_socket_t sockfd = listenfd;
743     int maxfd = (int)sockfd;
744 
745     FD_ZERO(&fds_read);
746     FD_ZERO(&fds_write);
747     FD_ZERO(&fds_err);
748 
749     /* there's always a socket to wait for */
750     FD_SET(sockfd, &fds_read);
751 
752     do {
753       /* select() blocking behavior call on blocking descriptors please */
754       rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
755       if(got_exit_signal) {
756         logmsg("signalled to die, exiting...");
757         return FALSE;
758       }
759     } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
760 
761     if(rc < 0) {
762       logmsg("select() failed with error: (%d) %s",
763              error, strerror(error));
764       return FALSE;
765     }
766 
767     if(FD_ISSET(sockfd, &fds_read)) {
768       curl_socket_t newfd = accept(sockfd, NULL, NULL);
769       if(CURL_SOCKET_BAD == newfd) {
770         error = SOCKERRNO;
771         logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
772                sockfd, error, sstrerror(error));
773       }
774       else {
775         logmsg("====> Client connect, fd %d. Read config from %s",
776                newfd, configfile);
777         set_advisor_read_lock(loglockfile);
778         (void)mqttit(newfd); /* until done */
779         clear_advisor_read_lock(loglockfile);
780 
781         logmsg("====> Client disconnect");
782         sclose(newfd);
783       }
784     }
785   } while(clients);
786 
787   return TRUE;
788 }
789 
/*
 * Turn 'sock' into a listening socket.
 *
 * SO_REUSEADDR is enabled first. If that fails it is tried again a number of
 * times with a doubled delay before each new attempt, and if it never works
 * the function logs that and carries on regardless. The socket is then bound
 * to any local address (IPv4 or IPv6 depending on 'use_ipv6') on port
 * *listenport and put in listening mode.
 *
 * If *listenport is zero the system selects the port, and the selected port
 * number is stored in *listenport.
 *
 * Returns 'sock' on success. On failure 'sock' is closed and CURL_SOCKET_BAD
 * is returned.
 */
static curl_socket_t sockdaemon(curl_socket_t sock,
                                unsigned short *listenport)
{
  /* passive daemon style */
  srvr_sockaddr_union_t listener;
  int flag;
  int rc;
  int totdelay = 0;
  int maxretr = 10;
  int delay = 20;
  int attempt = 0;
  int error = 0;

  do {
    attempt++;
    flag = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
         (void *)&flag, sizeof(flag));
    if(rc) {
      error = SOCKERRNO;
      logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
             error, sstrerror(error));
      if(maxretr) {
        rc = wait_ms(delay);
        if(rc) {
          /* should not happen */
          logmsg("wait_ms() failed with error: %d", rc);
          sclose(sock);
          return CURL_SOCKET_BAD;
        }
        if(got_exit_signal) {
          logmsg("signalled to die, exiting...");
          sclose(sock);
          return CURL_SOCKET_BAD;
        }
        totdelay += delay;
        delay *= 2; /* double the sleep for next attempt */
      }
    }
  } while(rc && maxretr--);

  if(rc) {
    /* 'error' comes from SOCKERRNO, so use the same error string function
       as for all the other socket errors */
    logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
           attempt, totdelay, error, sstrerror(error));
    logmsg("Continuing anyway...");
  }

  /* When the specified listener port is zero, it is actually a
     request to let the system choose a non-zero available port. */

#ifdef ENABLE_IPV6
  if(!use_ipv6) {
#endif
    memset(&listener.sa4, 0, sizeof(listener.sa4));
    listener.sa4.sin_family = AF_INET;
    listener.sa4.sin_addr.s_addr = INADDR_ANY;
    listener.sa4.sin_port = htons(*listenport);
    rc = bind(sock, &listener.sa, sizeof(listener.sa4));
#ifdef ENABLE_IPV6
  }
  else {
    memset(&listener.sa6, 0, sizeof(listener.sa6));
    listener.sa6.sin6_family = AF_INET6;
    listener.sa6.sin6_addr = in6addr_any;
    listener.sa6.sin6_port = htons(*listenport);
    rc = bind(sock, &listener.sa, sizeof(listener.sa6));
  }
#endif /* ENABLE_IPV6 */
  if(rc) {
    error = SOCKERRNO;
    logmsg("Error binding socket on port %hu: (%d) %s",
           *listenport, error, sstrerror(error));
    sclose(sock);
    return CURL_SOCKET_BAD;
  }

  if(!*listenport) {
    /* The system was supposed to choose a port number, figure out which
       port we actually got and update the listener port value with it. */
    curl_socklen_t la_size;
    srvr_sockaddr_union_t localaddr;
#ifdef ENABLE_IPV6
    if(!use_ipv6)
#endif
      la_size = sizeof(localaddr.sa4);
#ifdef ENABLE_IPV6
    else
      la_size = sizeof(localaddr.sa6);
#endif
    memset(&localaddr.sa, 0, (size_t)la_size);
    if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
      error = SOCKERRNO;
      logmsg("getsockname() failed with error: (%d) %s",
             error, sstrerror(error));
      sclose(sock);
      return CURL_SOCKET_BAD;
    }
    switch(localaddr.sa.sa_family) {
    case AF_INET:
      *listenport = ntohs(localaddr.sa4.sin_port);
      break;
#ifdef ENABLE_IPV6
    case AF_INET6:
      *listenport = ntohs(localaddr.sa6.sin6_port);
      break;
#endif
    default:
      break;
    }
    if(!*listenport) {
      /* Real failure, listener port shall not be zero beyond this point. */
      logmsg("Apparently getsockname() succeeded, with listener port zero.");
      logmsg("A valid reason for this failure is a binary built without");
      logmsg("proper network library linkage. This might not be the only");
      logmsg("reason, but double check it before anything else.");
      sclose(sock);
      return CURL_SOCKET_BAD;
    }
  }

  /* start accepting connections */
  rc = listen(sock, 5);
  if(0 != rc) {
    error = SOCKERRNO;
    logmsg("listen(%d, 5) failed with error: (%d) %s",
           sock, error, sstrerror(error));
    sclose(sock);
    return CURL_SOCKET_BAD;
  }

  return sock;
}
922 
923 
main(int argc,char * argv[])924 int main(int argc, char *argv[])
925 {
926   curl_socket_t sock = CURL_SOCKET_BAD;
927   curl_socket_t msgsock = CURL_SOCKET_BAD;
928   int wrotepidfile = 0;
929   int wroteportfile = 0;
930   const char *pidname = ".mqttd.pid";
931   const char *portname = ".mqttd.port";
932   bool juggle_again;
933   int error;
934   int arg = 1;
935 
936   while(argc>arg) {
937     if(!strcmp("--version", argv[arg])) {
938       printf("mqttd IPv4%s\n",
939 #ifdef ENABLE_IPV6
940              "/IPv6"
941 #else
942              ""
943 #endif
944              );
945       return 0;
946     }
947     else if(!strcmp("--pidfile", argv[arg])) {
948       arg++;
949       if(argc>arg)
950         pidname = argv[arg++];
951     }
952     else if(!strcmp("--portfile", argv[arg])) {
953       arg++;
954       if(argc>arg)
955         portname = argv[arg++];
956     }
957     else if(!strcmp("--config", argv[arg])) {
958       arg++;
959       if(argc>arg)
960         configfile = argv[arg++];
961     }
962     else if(!strcmp("--logfile", argv[arg])) {
963       arg++;
964       if(argc>arg)
965         serverlogfile = argv[arg++];
966     }
967     else if(!strcmp("--logdir", argv[arg])) {
968       arg++;
969       if(argc>arg)
970         logdir = argv[arg++];
971     }
972     else if(!strcmp("--ipv6", argv[arg])) {
973 #ifdef ENABLE_IPV6
974       ipv_inuse = "IPv6";
975       use_ipv6 = TRUE;
976 #endif
977       arg++;
978     }
979     else if(!strcmp("--ipv4", argv[arg])) {
980       /* for completeness, we support this option as well */
981 #ifdef ENABLE_IPV6
982       ipv_inuse = "IPv4";
983       use_ipv6 = FALSE;
984 #endif
985       arg++;
986     }
987     else if(!strcmp("--port", argv[arg])) {
988       arg++;
989       if(argc>arg) {
990         char *endptr;
991         unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
992         if((endptr != argv[arg] + strlen(argv[arg])) ||
993            ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
994           fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
995                   argv[arg]);
996           return 0;
997         }
998         port = curlx_ultous(ulnum);
999         arg++;
1000       }
1001     }
1002     else {
1003       puts("Usage: mqttd [option]\n"
1004            " --config [file]\n"
1005            " --version\n"
1006            " --logfile [file]\n"
1007            " --logdir [directory]\n"
1008            " --pidfile [file]\n"
1009            " --portfile [file]\n"
1010            " --ipv4\n"
1011            " --ipv6\n"
1012            " --port [port]\n");
1013       return 0;
1014     }
1015   }
1016 
1017   msnprintf(loglockfile, sizeof(loglockfile), "%s/%s/mqtt-%s.lock",
1018             logdir, SERVERLOGS_LOCKDIR, ipv_inuse);
1019 
1020 #ifdef WIN32
1021   win32_init();
1022   atexit(win32_cleanup);
1023 
1024   setmode(fileno(stdin), O_BINARY);
1025   setmode(fileno(stdout), O_BINARY);
1026   setmode(fileno(stderr), O_BINARY);
1027 #endif
1028 
1029   install_signal_handlers(FALSE);
1030 
1031 #ifdef ENABLE_IPV6
1032   if(!use_ipv6)
1033 #endif
1034     sock = socket(AF_INET, SOCK_STREAM, 0);
1035 #ifdef ENABLE_IPV6
1036   else
1037     sock = socket(AF_INET6, SOCK_STREAM, 0);
1038 #endif
1039 
1040   if(CURL_SOCKET_BAD == sock) {
1041     error = SOCKERRNO;
1042     logmsg("Error creating socket: (%d) %s", error, sstrerror(error));
1043     goto mqttd_cleanup;
1044   }
1045 
1046   {
1047     /* passive daemon style */
1048     sock = sockdaemon(sock, &port);
1049     if(CURL_SOCKET_BAD == sock) {
1050       goto mqttd_cleanup;
1051     }
1052     msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
1053   }
1054 
1055   logmsg("Running %s version", ipv_inuse);
1056   logmsg("Listening on port %hu", port);
1057 
1058   wrotepidfile = write_pidfile(pidname);
1059   if(!wrotepidfile) {
1060     goto mqttd_cleanup;
1061   }
1062 
1063   wroteportfile = write_portfile(portname, port);
1064   if(!wroteportfile) {
1065     goto mqttd_cleanup;
1066   }
1067 
1068   do {
1069     juggle_again = incoming(sock);
1070   } while(juggle_again);
1071 
1072 mqttd_cleanup:
1073 
1074   if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
1075     sclose(msgsock);
1076 
1077   if(sock != CURL_SOCKET_BAD)
1078     sclose(sock);
1079 
1080   if(wrotepidfile)
1081     unlink(pidname);
1082   if(wroteportfile)
1083     unlink(portname);
1084 
1085   restore_signal_handlers(FALSE);
1086 
1087   if(got_exit_signal) {
1088     logmsg("============> mqttd exits with signal (%d)", exit_signal);
1089     /*
1090      * To properly set the return status of the process we
1091      * must raise the same signal SIGINT or SIGTERM that we
1092      * caught and let the old handler take care of it.
1093      */
1094     raise(exit_signal);
1095   }
1096 
1097   logmsg("============> mqttd quits");
1098   return 0;
1099 }
1100