1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9  *
10  * This software is licensed as described in the file COPYING, which
11  * you should have received as part of this distribution. The terms
12  * are also available at https://curl.se/docs/copyright.html.
13  *
14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15  * copies of the Software, and permit persons to whom the Software is
16  * furnished to do so, under the terms of the COPYING file.
17  *
18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19  * KIND, either express or implied.
20  *
21  * SPDX-License-Identifier: curl
22  *
23  ***************************************************************************/
24 #include "server_setup.h"
25 #include <stdlib.h>
26 #include <string.h>
27 #include "util.h"
28 
29 /* Function
30  *
31  * Accepts a TCP connection on a custom port (IPv4 or IPv6).  Speaks MQTT.
32  *
33  * Read commands from FILE (set with --config). The commands control how to
34  * act and is reset to defaults each client TCP connect.
35  *
36  * Config file keywords:
37  *
38  * TODO
39  */
40 
41 /* based on sockfilt.c */
42 
43 #ifdef HAVE_SIGNAL_H
44 #include <signal.h>
45 #endif
46 #ifdef HAVE_NETINET_IN_H
47 #include <netinet/in.h>
48 #endif
49 #ifdef HAVE_NETINET_IN6_H
50 #include <netinet/in6.h>
51 #endif
52 #ifdef HAVE_ARPA_INET_H
53 #include <arpa/inet.h>
54 #endif
55 #ifdef HAVE_NETDB_H
56 #include <netdb.h>
57 #endif
58 
59 #define ENABLE_CURLX_PRINTF
60 /* make the curlx header define all printf() functions to use the curlx_*
61    versions instead */
62 #include "curlx.h" /* from the private lib dir */
63 #include "getpart.h"
64 #include "inet_pton.h"
65 #include "server_sockaddr.h"
66 #include "warnless.h"
67 
68 /* include memdebug.h last */
69 #include "memdebug.h"
70 
71 #ifdef USE_WINSOCK
72 #undef  EINTR
73 #define EINTR    4 /* errno.h value */
74 #undef  EAGAIN
75 #define EAGAIN  11 /* errno.h value */
76 #undef  ENOMEM
77 #define ENOMEM  12 /* errno.h value */
78 #undef  EINVAL
79 #define EINVAL  22 /* errno.h value */
80 #endif
81 
82 #define DEFAULT_PORT 1883 /* MQTT default port */
83 
84 #ifndef DEFAULT_LOGFILE
85 #define DEFAULT_LOGFILE "log/mqttd.log"
86 #endif
87 
88 #ifndef DEFAULT_CONFIG
89 #define DEFAULT_CONFIG "mqttd.config"
90 #endif
91 
92 #define MQTT_MSG_CONNECT    0x10
93 #define MQTT_MSG_CONNACK    0x20
94 #define MQTT_MSG_PUBLISH    0x30
95 #define MQTT_MSG_PUBACK     0x40
96 #define MQTT_MSG_SUBSCRIBE  0x82
97 #define MQTT_MSG_SUBACK     0x90
98 #define MQTT_MSG_DISCONNECT 0xe0
99 
100 #define MQTT_CONNACK_LEN 4
101 #define MQTT_SUBACK_LEN 5
102 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
103 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
104 
105 struct configurable {
106   unsigned char version; /* initial version byte in the request must match
107                             this */
108   bool publish_before_suback;
109   bool short_publish;
110   bool excessive_remaining;
111   unsigned char error_connack;
112   int testnum;
113 };
114 
115 #define REQUEST_DUMP  "log/server.input"
116 #define CONFIG_VERSION 5
117 
118 static struct configurable config;
119 
120 const char *serverlogfile = DEFAULT_LOGFILE;
121 static const char *configfile = DEFAULT_CONFIG;
122 
123 #ifdef ENABLE_IPV6
124 static bool use_ipv6 = FALSE;
125 #endif
126 static const char *ipv_inuse = "IPv4";
127 static unsigned short port = DEFAULT_PORT;
128 
resetdefaults(void)129 static void resetdefaults(void)
130 {
131   logmsg("Reset to defaults");
132   config.version = CONFIG_VERSION;
133   config.publish_before_suback = FALSE;
134   config.short_publish = FALSE;
135   config.excessive_remaining = FALSE;
136   config.error_connack = 0;
137   config.testnum = 0;
138 }
139 
byteval(char * value)140 static unsigned char byteval(char *value)
141 {
142   unsigned long num = strtoul(value, NULL, 10);
143   return num & 0xff;
144 }
145 
getconfig(void)146 static void getconfig(void)
147 {
148   FILE *fp = fopen(configfile, FOPEN_READTEXT);
149   resetdefaults();
150   if(fp) {
151     char buffer[512];
152     logmsg("parse config file");
153     while(fgets(buffer, sizeof(buffer), fp)) {
154       char key[32];
155       char value[32];
156       if(2 == sscanf(buffer, "%31s %31s", key, value)) {
157         if(!strcmp(key, "version")) {
158           config.version = byteval(value);
159           logmsg("version [%d] set", config.version);
160         }
161         else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
162           logmsg("PUBLISH-before-SUBACK set");
163           config.publish_before_suback = TRUE;
164         }
165         else if(!strcmp(key, "short-PUBLISH")) {
166           logmsg("short-PUBLISH set");
167           config.short_publish = TRUE;
168         }
169         else if(!strcmp(key, "error-CONNACK")) {
170           config.error_connack = byteval(value);
171           logmsg("error-CONNACK = %d", config.error_connack);
172         }
173         else if(!strcmp(key, "Testnum")) {
174           config.testnum = atoi(value);
175           logmsg("testnum = %d", config.testnum);
176         }
177         else if(!strcmp(key, "excessive-remaining")) {
178           logmsg("excessive-remaining set");
179           config.excessive_remaining = TRUE;
180         }
181       }
182     }
183     fclose(fp);
184   }
185   else {
186     logmsg("No config file '%s' to read", configfile);
187   }
188 }
189 
loghex(unsigned char * buffer,ssize_t len)190 static void loghex(unsigned char *buffer, ssize_t len)
191 {
192   char data[12000];
193   ssize_t i;
194   unsigned char *ptr = buffer;
195   char *optr = data;
196   ssize_t width = 0;
197   int left = sizeof(data);
198 
199   for(i = 0; i<len && (left >= 0); i++) {
200     msnprintf(optr, left, "%02x", ptr[i]);
201     width += 2;
202     optr += 2;
203     left -= 2;
204   }
205   if(width)
206     logmsg("'%s'", data);
207 }
208 
209 typedef enum {
210   FROM_CLIENT,
211   FROM_SERVER
212 } mqttdir;
213 
logprotocol(mqttdir dir,const char * prefix,size_t remlen,FILE * output,unsigned char * buffer,ssize_t len)214 static void logprotocol(mqttdir dir,
215                         const char *prefix, size_t remlen,
216                         FILE *output,
217                         unsigned char *buffer, ssize_t len)
218 {
219   char data[12000] = "";
220   ssize_t i;
221   unsigned char *ptr = buffer;
222   char *optr = data;
223   int left = sizeof(data);
224 
225   for(i = 0; i<len && (left >= 0); i++) {
226     msnprintf(optr, left, "%02x", ptr[i]);
227     optr += 2;
228     left -= 2;
229   }
230   fprintf(output, "%s %s %zx %s\n",
231           dir == FROM_CLIENT? "client": "server",
232           prefix, remlen,
233           data);
234 }
235 
236 
237 /* return 0 on success */
connack(FILE * dump,curl_socket_t fd)238 static int connack(FILE *dump, curl_socket_t fd)
239 {
240   unsigned char packet[]={
241     MQTT_MSG_CONNACK, 0x02,
242     0x00, 0x00
243   };
244   ssize_t rc;
245 
246   packet[3] = config.error_connack;
247 
248   rc = swrite(fd, (char *)packet, sizeof(packet));
249   if(rc > 0) {
250     logmsg("WROTE %d bytes [CONNACK]", rc);
251     loghex(packet, rc);
252     logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
253   }
254   if(rc == sizeof(packet)) {
255     return 0;
256   }
257   return 1;
258 }
259 
260 /* return 0 on success */
suback(FILE * dump,curl_socket_t fd,unsigned short packetid)261 static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
262 {
263   unsigned char packet[]={
264     MQTT_MSG_SUBACK, 0x03,
265     0, 0, /* filled in below */
266     0x00
267   };
268   ssize_t rc;
269   packet[2] = (unsigned char)(packetid >> 8);
270   packet[3] = (unsigned char)(packetid & 0xff);
271 
272   rc = swrite(fd, (char *)packet, sizeof(packet));
273   if(rc == sizeof(packet)) {
274     logmsg("WROTE %d bytes [SUBACK]", rc);
275     loghex(packet, rc);
276     logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
277     return 0;
278   }
279   return 1;
280 }
281 
282 #ifdef QOS
283 /* return 0 on success */
puback(FILE * dump,curl_socket_t fd,unsigned short packetid)284 static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
285 {
286   unsigned char packet[]={
287     MQTT_MSG_PUBACK, 0x00,
288     0, 0 /* filled in below */
289   };
290   ssize_t rc;
291   packet[2] = (unsigned char)(packetid >> 8);
292   packet[3] = (unsigned char)(packetid & 0xff);
293 
294   rc = swrite(fd, (char *)packet, sizeof(packet));
295   if(rc == sizeof(packet)) {
296     logmsg("WROTE %d bytes [PUBACK]", rc);
297     loghex(packet, rc);
298     logprotocol(FROM_SERVER, dump, packet, rc);
299     return 0;
300   }
301   logmsg("Failed sending [PUBACK]");
302   return 1;
303 }
304 #endif
305 
306 /* return 0 on success */
disconnect(FILE * dump,curl_socket_t fd)307 static int disconnect(FILE *dump, curl_socket_t fd)
308 {
309   unsigned char packet[]={
310     MQTT_MSG_DISCONNECT, 0x00,
311   };
312   ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
313   if(rc == sizeof(packet)) {
314     logmsg("WROTE %d bytes [DISCONNECT]", rc);
315     loghex(packet, rc);
316     logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
317     return 0;
318   }
319   logmsg("Failed sending [DISCONNECT]");
320   return 1;
321 }
322 
323 
324 
325 /*
326   do
327 
328      encodedByte = X MOD 128
329 
330      X = X DIV 128
331 
332      // if there are more data to encode, set the top bit of this byte
333 
334      if ( X > 0 )
335 
336         encodedByte = encodedByte OR 128
337 
338       endif
339 
340     'output' encodedByte
341 
342   while ( X > 0 )
343 
344 */
345 
346 /* return number of bytes used */
encode_length(size_t packetlen,unsigned char * remlength)347 static int encode_length(size_t packetlen,
348                          unsigned char *remlength) /* 4 bytes */
349 {
350   int bytes = 0;
351   unsigned char encode;
352 
353   do {
354     encode = packetlen % 0x80;
355     packetlen /= 0x80;
356     if(packetlen)
357       encode |= 0x80;
358 
359     remlength[bytes++] = encode;
360 
361     if(bytes > 3) {
362       logmsg("too large packet!");
363       return 0;
364     }
365   } while(packetlen);
366 
367   return bytes;
368 }
369 
370 
decode_length(unsigned char * buf,size_t buflen,size_t * lenbytes)371 static size_t decode_length(unsigned char *buf,
372                             size_t buflen, size_t *lenbytes)
373 {
374   size_t len = 0;
375   size_t mult = 1;
376   size_t i;
377   unsigned char encoded = 0x80;
378 
379   for(i = 0; (i < buflen) && (encoded & 0x80); i++) {
380     encoded = buf[i];
381     len += (encoded & 0x7f) * mult;
382     mult *= 0x80;
383   }
384 
385   if(lenbytes)
386     *lenbytes = i;
387 
388   return len;
389 }
390 
391 
392 /* return 0 on success */
publish(FILE * dump,curl_socket_t fd,unsigned short packetid,char * topic,char * payload,size_t payloadlen)393 static int publish(FILE *dump,
394                    curl_socket_t fd, unsigned short packetid,
395                    char *topic, char *payload, size_t payloadlen)
396 {
397   size_t topiclen = strlen(topic);
398   unsigned char *packet;
399   size_t payloadindex;
400   ssize_t remaininglength = topiclen + 2 + payloadlen;
401   ssize_t packetlen;
402   ssize_t sendamount;
403   ssize_t rc;
404   unsigned char rembuffer[4];
405   int encodedlen;
406 
407   if(config.excessive_remaining) {
408     /* manually set illegal remaining length */
409     rembuffer[0] = 0xff;
410     rembuffer[1] = 0xff;
411     rembuffer[2] = 0xff;
412     rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
413     encodedlen = 4;
414   }
415   else
416     encodedlen = encode_length(remaininglength, rembuffer);
417 
418   /* one packet type byte (possibly two more for packetid) */
419   packetlen = remaininglength + encodedlen + 1;
420   packet = malloc(packetlen);
421   if(!packet)
422     return 1;
423 
424   packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
425   memcpy(&packet[1], rembuffer, encodedlen);
426 
427   (void)packetid;
428   /* packet_id if QoS is set */
429 
430   packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
431   packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
432   memcpy(&packet[3 + encodedlen], topic, topiclen);
433 
434   payloadindex = 3 + topiclen + encodedlen;
435   memcpy(&packet[payloadindex], payload, payloadlen);
436 
437   sendamount = packetlen;
438   if(config.short_publish)
439     sendamount -= 2;
440 
441   rc = swrite(fd, (char *)packet, sendamount);
442   if(rc > 0) {
443     logmsg("WROTE %d bytes [PUBLISH]", rc);
444     loghex(packet, rc);
445     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
446   }
447   if(rc == packetlen)
448     return 0;
449   return 1;
450 }
451 
452 #define MAX_TOPIC_LENGTH 65535
453 #define MAX_CLIENT_ID_LENGTH 32
454 
455 static char topic[MAX_TOPIC_LENGTH + 1];
456 
fixedheader(curl_socket_t fd,unsigned char * bytep,size_t * remaining_lengthp,size_t * remaining_length_bytesp)457 static int fixedheader(curl_socket_t fd,
458                        unsigned char *bytep,
459                        size_t *remaining_lengthp,
460                        size_t *remaining_length_bytesp)
461 {
462   /* get the fixed header */
463   unsigned char buffer[10];
464 
465   /* get the first two bytes */
466   ssize_t rc = sread(fd, (char *)buffer, 2);
467   int i;
468   if(rc < 2) {
469     logmsg("READ %d bytes [SHORT!]", rc);
470     return 1; /* fail */
471   }
472   logmsg("READ %d bytes", rc);
473   loghex(buffer, rc);
474   *bytep = buffer[0];
475 
476   /* if the length byte has the top bit set, get the next one too */
477   i = 1;
478   while(buffer[i] & 0x80) {
479     i++;
480     rc = sread(fd, (char *)&buffer[i], 1);
481     if(rc != 1) {
482       logmsg("Remaining Length broken");
483       return 1;
484     }
485   }
486   *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
487   logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
488          *remaining_length_bytesp);
489   return 0;
490 }
491 
mqttit(curl_socket_t fd)492 static curl_socket_t mqttit(curl_socket_t fd)
493 {
494   size_t buff_size = 10*1024;
495   unsigned char *buffer = NULL;
496   ssize_t rc;
497   unsigned char byte;
498   unsigned short packet_id;
499   size_t payload_len;
500   size_t client_id_length;
501   unsigned int topic_len;
502   size_t remaining_length = 0;
503   size_t bytes = 0; /* remaining length field size in bytes */
504   char client_id[MAX_CLIENT_ID_LENGTH];
505   long testno;
506   FILE *stream = NULL;
507 
508 
509   static const char protocol[7] = {
510     0x00, 0x04,       /* protocol length */
511     'M','Q','T','T',  /* protocol name */
512     0x04              /* protocol level */
513   };
514   FILE *dump = fopen(REQUEST_DUMP, "ab");
515   if(!dump)
516     goto end;
517 
518   getconfig();
519 
520   testno = config.testnum;
521 
522   if(testno)
523     logmsg("Found test number %ld", testno);
524 
525   buffer = malloc(buff_size);
526   if(!buffer) {
527     logmsg("Out of memory, unable to allocate buffer");
528     goto end;
529   }
530 
531   do {
532     unsigned char usr_flag = 0x80;
533     unsigned char passwd_flag = 0x40;
534     unsigned char conn_flags;
535     const size_t client_id_offset = 12;
536     size_t start_usr;
537     size_t start_passwd;
538 
539     /* get the fixed header */
540     rc = fixedheader(fd, &byte, &remaining_length, &bytes);
541     if(rc)
542       break;
543 
544     if(remaining_length >= buff_size) {
545       buff_size = remaining_length;
546       buffer = realloc(buffer, buff_size);
547       if(!buffer) {
548         logmsg("Failed realloc of size %lu", buff_size);
549         goto end;
550       }
551     }
552 
553     if(remaining_length) {
554       /* reading variable header and payload into buffer */
555       rc = sread(fd, (char *)buffer, remaining_length);
556       if(rc > 0) {
557         logmsg("READ %d bytes", rc);
558         loghex(buffer, rc);
559       }
560     }
561 
562     if(byte == MQTT_MSG_CONNECT) {
563       logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
564                   dump, buffer, rc);
565 
566       if(memcmp(protocol, buffer, sizeof(protocol))) {
567         logmsg("Protocol preamble mismatch");
568         goto end;
569       }
570       /* ignore the connect flag byte and two keepalive bytes */
571       payload_len = (buffer[10] << 8) | buffer[11];
572       /* first part of the payload is the client ID */
573       client_id_length = payload_len;
574 
575       /* checking if user and password flags were set */
576       conn_flags = buffer[7];
577 
578       start_usr = client_id_offset + payload_len;
579       if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
580         logmsg("User flag is present in CONN flag");
581         payload_len += (buffer[start_usr] << 8) | buffer[start_usr + 1];
582         payload_len += 2; /* MSB and LSB for user length */
583       }
584 
585       start_passwd = client_id_offset + payload_len;
586       if(passwd_flag == (char)(conn_flags & passwd_flag)) {
587         logmsg("Password flag is present in CONN flags");
588         payload_len += (buffer[start_passwd] << 8) | buffer[start_passwd + 1];
589         payload_len += 2; /* MSB and LSB for password length */
590       }
591 
592       /* check the length of the payload */
593       if((ssize_t)payload_len != (rc - 12)) {
594         logmsg("Payload length mismatch, expected %x got %x",
595                rc - 12, payload_len);
596         goto end;
597       }
598       /* check the length of the client ID */
599       else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
600         logmsg("Too large client id");
601         goto end;
602       }
603       memcpy(client_id, &buffer[12], client_id_length);
604       client_id[client_id_length] = 0;
605 
606       logmsg("MQTT client connect accepted: %s", client_id);
607 
608       /* The first packet sent from the Server to the Client MUST be a
609          CONNACK Packet */
610 
611       if(connack(dump, fd)) {
612         logmsg("failed sending CONNACK");
613         goto end;
614       }
615     }
616     else if(byte == MQTT_MSG_SUBSCRIBE) {
617       int error;
618       char *data;
619       size_t datalen;
620       logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
621                   dump, buffer, rc);
622       logmsg("Incoming SUBSCRIBE");
623 
624       if(rc < 6) {
625         logmsg("Too small SUBSCRIBE");
626         goto end;
627       }
628 
629       /* two bytes packet id */
630       packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
631 
632       /* two bytes topic length */
633       topic_len = (buffer[2] << 8) | buffer[3];
634       if(topic_len != (remaining_length - 5)) {
635         logmsg("Wrong topic length, got %d expected %d",
636                topic_len, remaining_length - 5);
637         goto end;
638       }
639       memcpy(topic, &buffer[4], topic_len);
640       topic[topic_len] = 0;
641 
642       /* there's a QoS byte (two bits) after the topic */
643 
644       logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
645       stream = test2fopen(testno);
646       error = getpart(&data, &datalen, "reply", "data", stream);
647       if(!error) {
648         if(!config.publish_before_suback) {
649           if(suback(dump, fd, packet_id)) {
650             logmsg("failed sending SUBACK");
651             goto end;
652           }
653         }
654         if(publish(dump, fd, packet_id, topic, data, datalen)) {
655           logmsg("PUBLISH failed");
656           goto end;
657         }
658         if(config.publish_before_suback) {
659           if(suback(dump, fd, packet_id)) {
660             logmsg("failed sending SUBACK");
661             goto end;
662           }
663         }
664       }
665       else {
666         char *def = (char *)"this is random payload yes yes it is";
667         publish(dump, fd, packet_id, topic, def, strlen(def));
668       }
669       disconnect(dump, fd);
670     }
671     else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
672       size_t topiclen;
673 
674       logmsg("Incoming PUBLISH");
675       logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
676                   dump, buffer, rc);
677 
678       topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
679       logmsg("Got %d bytes topic", topiclen);
680       /* TODO: verify topiclen */
681 
682 #ifdef QOS
683       /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
684       puback(dump, fd, 0);
685 #endif
686       /* expect a disconnect here */
687       /* get the request */
688       rc = sread(fd, (char *)&buffer[0], 2);
689 
690       logmsg("READ %d bytes [DISCONNECT]", rc);
691       loghex(buffer, rc);
692       logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
693       goto end;
694     }
695     else {
696       /* not supported (yet) */
697       goto end;
698     }
699   } while(1);
700 
701   end:
702   if(buffer)
703     free(buffer);
704   if(dump)
705     fclose(dump);
706   if(stream)
707     fclose(stream);
708   return CURL_SOCKET_BAD;
709 }
710 
711 /*
712   sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
713 
714   if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
715   accept()
716 */
incoming(curl_socket_t listenfd)717 static bool incoming(curl_socket_t listenfd)
718 {
719   fd_set fds_read;
720   fd_set fds_write;
721   fd_set fds_err;
722   int clients = 0; /* connected clients */
723 
724   if(got_exit_signal) {
725     logmsg("signalled to die, exiting...");
726     return FALSE;
727   }
728 
729 #ifdef HAVE_GETPPID
730   /* As a last resort, quit if socks5 process becomes orphan. */
731   if(getppid() <= 1) {
732     logmsg("process becomes orphan, exiting");
733     return FALSE;
734   }
735 #endif
736 
737   do {
738     ssize_t rc;
739     int error = 0;
740     curl_socket_t sockfd = listenfd;
741     int maxfd = (int)sockfd;
742 
743     FD_ZERO(&fds_read);
744     FD_ZERO(&fds_write);
745     FD_ZERO(&fds_err);
746 
747     /* there's always a socket to wait for */
748     FD_SET(sockfd, &fds_read);
749 
750     do {
751       /* select() blocking behavior call on blocking descriptors please */
752       rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
753       if(got_exit_signal) {
754         logmsg("signalled to die, exiting...");
755         return FALSE;
756       }
757     } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
758 
759     if(rc < 0) {
760       logmsg("select() failed with error: (%d) %s",
761              error, strerror(error));
762       return FALSE;
763     }
764 
765     if(FD_ISSET(sockfd, &fds_read)) {
766       curl_socket_t newfd = accept(sockfd, NULL, NULL);
767       if(CURL_SOCKET_BAD == newfd) {
768         error = SOCKERRNO;
769         logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
770                sockfd, error, strerror(error));
771       }
772       else {
773         logmsg("====> Client connect, fd %d. Read config from %s",
774                newfd, configfile);
775         set_advisor_read_lock(SERVERLOGS_LOCK);
776         (void)mqttit(newfd); /* until done */
777         clear_advisor_read_lock(SERVERLOGS_LOCK);
778 
779         logmsg("====> Client disconnect");
780         sclose(newfd);
781       }
782     }
783   } while(clients);
784 
785   return TRUE;
786 }
787 
sockdaemon(curl_socket_t sock,unsigned short * listenport)788 static curl_socket_t sockdaemon(curl_socket_t sock,
789                                 unsigned short *listenport)
790 {
791   /* passive daemon style */
792   srvr_sockaddr_union_t listener;
793   int flag;
794   int rc;
795   int totdelay = 0;
796   int maxretr = 10;
797   int delay = 20;
798   int attempt = 0;
799   int error = 0;
800 
801   do {
802     attempt++;
803     flag = 1;
804     rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
805          (void *)&flag, sizeof(flag));
806     if(rc) {
807       error = SOCKERRNO;
808       logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
809              error, strerror(error));
810       if(maxretr) {
811         rc = wait_ms(delay);
812         if(rc) {
813           /* should not happen */
814           logmsg("wait_ms() failed with error: %d", rc);
815           sclose(sock);
816           return CURL_SOCKET_BAD;
817         }
818         if(got_exit_signal) {
819           logmsg("signalled to die, exiting...");
820           sclose(sock);
821           return CURL_SOCKET_BAD;
822         }
823         totdelay += delay;
824         delay *= 2; /* double the sleep for next attempt */
825       }
826     }
827   } while(rc && maxretr--);
828 
829   if(rc) {
830     logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
831            attempt, totdelay, error, strerror(error));
832     logmsg("Continuing anyway...");
833   }
834 
835   /* When the specified listener port is zero, it is actually a
836      request to let the system choose a non-zero available port. */
837 
838 #ifdef ENABLE_IPV6
839   if(!use_ipv6) {
840 #endif
841     memset(&listener.sa4, 0, sizeof(listener.sa4));
842     listener.sa4.sin_family = AF_INET;
843     listener.sa4.sin_addr.s_addr = INADDR_ANY;
844     listener.sa4.sin_port = htons(*listenport);
845     rc = bind(sock, &listener.sa, sizeof(listener.sa4));
846 #ifdef ENABLE_IPV6
847   }
848   else {
849     memset(&listener.sa6, 0, sizeof(listener.sa6));
850     listener.sa6.sin6_family = AF_INET6;
851     listener.sa6.sin6_addr = in6addr_any;
852     listener.sa6.sin6_port = htons(*listenport);
853     rc = bind(sock, &listener.sa, sizeof(listener.sa6));
854   }
855 #endif /* ENABLE_IPV6 */
856   if(rc) {
857     error = SOCKERRNO;
858     logmsg("Error binding socket on port %hu: (%d) %s",
859            *listenport, error, strerror(error));
860     sclose(sock);
861     return CURL_SOCKET_BAD;
862   }
863 
864   if(!*listenport) {
865     /* The system was supposed to choose a port number, figure out which
866        port we actually got and update the listener port value with it. */
867     curl_socklen_t la_size;
868     srvr_sockaddr_union_t localaddr;
869 #ifdef ENABLE_IPV6
870     if(!use_ipv6)
871 #endif
872       la_size = sizeof(localaddr.sa4);
873 #ifdef ENABLE_IPV6
874     else
875       la_size = sizeof(localaddr.sa6);
876 #endif
877     memset(&localaddr.sa, 0, (size_t)la_size);
878     if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
879       error = SOCKERRNO;
880       logmsg("getsockname() failed with error: (%d) %s",
881              error, strerror(error));
882       sclose(sock);
883       return CURL_SOCKET_BAD;
884     }
885     switch(localaddr.sa.sa_family) {
886     case AF_INET:
887       *listenport = ntohs(localaddr.sa4.sin_port);
888       break;
889 #ifdef ENABLE_IPV6
890     case AF_INET6:
891       *listenport = ntohs(localaddr.sa6.sin6_port);
892       break;
893 #endif
894     default:
895       break;
896     }
897     if(!*listenport) {
898       /* Real failure, listener port shall not be zero beyond this point. */
899       logmsg("Apparently getsockname() succeeded, with listener port zero.");
900       logmsg("A valid reason for this failure is a binary built without");
901       logmsg("proper network library linkage. This might not be the only");
902       logmsg("reason, but double check it before anything else.");
903       sclose(sock);
904       return CURL_SOCKET_BAD;
905     }
906   }
907 
908   /* start accepting connections */
909   rc = listen(sock, 5);
910   if(0 != rc) {
911     error = SOCKERRNO;
912     logmsg("listen(%d, 5) failed with error: (%d) %s",
913            sock, error, strerror(error));
914     sclose(sock);
915     return CURL_SOCKET_BAD;
916   }
917 
918   return sock;
919 }
920 
921 
main(int argc,char * argv[])922 int main(int argc, char *argv[])
923 {
924   curl_socket_t sock = CURL_SOCKET_BAD;
925   curl_socket_t msgsock = CURL_SOCKET_BAD;
926   int wrotepidfile = 0;
927   int wroteportfile = 0;
928   const char *pidname = ".mqttd.pid";
929   const char *portname = ".mqttd.port";
930   bool juggle_again;
931   int error;
932   int arg = 1;
933 
934   while(argc>arg) {
935     if(!strcmp("--version", argv[arg])) {
936       printf("mqttd IPv4%s\n",
937 #ifdef ENABLE_IPV6
938              "/IPv6"
939 #else
940              ""
941 #endif
942              );
943       return 0;
944     }
945     else if(!strcmp("--pidfile", argv[arg])) {
946       arg++;
947       if(argc>arg)
948         pidname = argv[arg++];
949     }
950     else if(!strcmp("--portfile", argv[arg])) {
951       arg++;
952       if(argc>arg)
953         portname = argv[arg++];
954     }
955     else if(!strcmp("--config", argv[arg])) {
956       arg++;
957       if(argc>arg)
958         configfile = argv[arg++];
959     }
960     else if(!strcmp("--logfile", argv[arg])) {
961       arg++;
962       if(argc>arg)
963         serverlogfile = argv[arg++];
964     }
965     else if(!strcmp("--ipv6", argv[arg])) {
966 #ifdef ENABLE_IPV6
967       ipv_inuse = "IPv6";
968       use_ipv6 = TRUE;
969 #endif
970       arg++;
971     }
972     else if(!strcmp("--ipv4", argv[arg])) {
973       /* for completeness, we support this option as well */
974 #ifdef ENABLE_IPV6
975       ipv_inuse = "IPv4";
976       use_ipv6 = FALSE;
977 #endif
978       arg++;
979     }
980     else if(!strcmp("--port", argv[arg])) {
981       arg++;
982       if(argc>arg) {
983         char *endptr;
984         unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
985         if((endptr != argv[arg] + strlen(argv[arg])) ||
986            ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
987           fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
988                   argv[arg]);
989           return 0;
990         }
991         port = curlx_ultous(ulnum);
992         arg++;
993       }
994     }
995     else {
996       puts("Usage: mqttd [option]\n"
997            " --config [file]\n"
998            " --version\n"
999            " --logfile [file]\n"
1000            " --pidfile [file]\n"
1001            " --portfile [file]\n"
1002            " --ipv4\n"
1003            " --ipv6\n"
1004            " --port [port]\n");
1005       return 0;
1006     }
1007   }
1008 
1009 #ifdef WIN32
1010   win32_init();
1011   atexit(win32_cleanup);
1012 
1013   setmode(fileno(stdin), O_BINARY);
1014   setmode(fileno(stdout), O_BINARY);
1015   setmode(fileno(stderr), O_BINARY);
1016 #endif
1017 
1018   install_signal_handlers(FALSE);
1019 
1020 #ifdef ENABLE_IPV6
1021   if(!use_ipv6)
1022 #endif
1023     sock = socket(AF_INET, SOCK_STREAM, 0);
1024 #ifdef ENABLE_IPV6
1025   else
1026     sock = socket(AF_INET6, SOCK_STREAM, 0);
1027 #endif
1028 
1029   if(CURL_SOCKET_BAD == sock) {
1030     error = SOCKERRNO;
1031     logmsg("Error creating socket: (%d) %s",
1032            error, strerror(error));
1033     goto mqttd_cleanup;
1034   }
1035 
1036   {
1037     /* passive daemon style */
1038     sock = sockdaemon(sock, &port);
1039     if(CURL_SOCKET_BAD == sock) {
1040       goto mqttd_cleanup;
1041     }
1042     msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
1043   }
1044 
1045   logmsg("Running %s version", ipv_inuse);
1046   logmsg("Listening on port %hu", port);
1047 
1048   wrotepidfile = write_pidfile(pidname);
1049   if(!wrotepidfile) {
1050     goto mqttd_cleanup;
1051   }
1052 
1053   wroteportfile = write_portfile(portname, port);
1054   if(!wroteportfile) {
1055     goto mqttd_cleanup;
1056   }
1057 
1058   do {
1059     juggle_again = incoming(sock);
1060   } while(juggle_again);
1061 
1062 mqttd_cleanup:
1063 
1064   if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
1065     sclose(msgsock);
1066 
1067   if(sock != CURL_SOCKET_BAD)
1068     sclose(sock);
1069 
1070   if(wrotepidfile)
1071     unlink(pidname);
1072   if(wroteportfile)
1073     unlink(portname);
1074 
1075   restore_signal_handlers(FALSE);
1076 
1077   if(got_exit_signal) {
1078     logmsg("============> mqttd exits with signal (%d)", exit_signal);
1079     /*
1080      * To properly set the return status of the process we
1081      * must raise the same signal SIGINT or SIGTERM that we
1082      * caught and let the old handler take care of it.
1083      */
1084     raise(exit_signal);
1085   }
1086 
1087   logmsg("============> mqttd quits");
1088   return 0;
1089 }
1090