1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 * SPDX-License-Identifier: curl
22 *
23 ***************************************************************************/
24 #include "server_setup.h"
25 #include <stdlib.h>
26 #include <string.h>
27 #include "util.h"
28
29 /* Function
30 *
31 * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT.
32 *
 * Reads commands from FILE (set with --config). The commands control how
 * the server acts and are reset to defaults on each client TCP connect.
35 *
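 * Config file keywords (one "keyword value" pair per line; both words are
 * required, also for the switches, whose value is ignored):
 *
 * version [number]           - sets the stored version byte
 * PUBLISH-before-SUBACK [x]  - send the PUBLISH before the SUBACK
 * short-PUBLISH [x]          - send a PUBLISH that is two bytes short
 * error-CONNACK [number]     - last byte of the CONNACK packet
 * Testnum [number]           - test number to get the reply data from
 * excessive-remaining [x]    - send an illegal Remaining Length in PUBLISH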
39 */
40
41 /* based on sockfilt.c */
42
43 #include <signal.h>
44 #ifdef HAVE_NETINET_IN_H
45 #include <netinet/in.h>
46 #endif
47 #ifdef HAVE_NETINET_IN6_H
48 #include <netinet/in6.h>
49 #endif
50 #ifdef HAVE_ARPA_INET_H
51 #include <arpa/inet.h>
52 #endif
53 #ifdef HAVE_NETDB_H
54 #include <netdb.h>
55 #endif
56
57 #define ENABLE_CURLX_PRINTF
58 /* make the curlx header define all printf() functions to use the curlx_*
59 versions instead */
60 #include "curlx.h" /* from the private lib dir */
61 #include "getpart.h"
62 #include "inet_pton.h"
63 #include "server_sockaddr.h"
64 #include "warnless.h"
65
66 /* include memdebug.h last */
67 #include "memdebug.h"
68
69 #ifdef USE_WINSOCK
70 #undef EINTR
71 #define EINTR 4 /* errno.h value */
72 #undef EAGAIN
73 #define EAGAIN 11 /* errno.h value */
74 #undef ENOMEM
75 #define ENOMEM 12 /* errno.h value */
76 #undef EINVAL
77 #define EINVAL 22 /* errno.h value */
78 #endif
79
80 #define DEFAULT_PORT 1883 /* MQTT default port */
81
82 #ifndef DEFAULT_LOGFILE
83 #define DEFAULT_LOGFILE "log/mqttd.log"
84 #endif
85
86 #ifndef DEFAULT_CONFIG
87 #define DEFAULT_CONFIG "mqttd.config"
88 #endif
89
90 #define MQTT_MSG_CONNECT 0x10
91 #define MQTT_MSG_CONNACK 0x20
92 #define MQTT_MSG_PUBLISH 0x30
93 #define MQTT_MSG_PUBACK 0x40
94 #define MQTT_MSG_SUBSCRIBE 0x82
95 #define MQTT_MSG_SUBACK 0x90
96 #define MQTT_MSG_DISCONNECT 0xe0
97
98 #define MQTT_CONNACK_LEN 4
99 #define MQTT_SUBACK_LEN 5
100 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
101 #define MQTT_HEADER_LEN 5 /* max 5 bytes */
102
/* Behavior settings for one client connection. They are reset to defaults
   and then re-read from the config file by getconfig(). */
struct configurable {
  unsigned char version; /* initial version byte in the request must match
                            this
                            NOTE(review): only stored and logged in this
                            file, no comparison is visible here */
  bool publish_before_suback; /* send the PUBLISH before the SUBACK */
  bool short_publish;         /* write two bytes less than the full PUBLISH */
  bool excessive_remaining;   /* put an illegal Remaining Length in PUBLISH */
  unsigned char error_connack; /* stored in the last byte of the CONNACK */
  int testnum;                /* test number passed to test2fopen() */
};
112
113 #define REQUEST_DUMP "server.input"
114 #define CONFIG_VERSION 5
115
/* the active settings, rewritten by getconfig() */
static struct configurable config;

/* log file name, changed with --logfile; not static, so visible to other
   translation units */
const char *serverlogfile = DEFAULT_LOGFILE;
/* config file name, changed with --config */
static const char *configfile = DEFAULT_CONFIG;
/* directory for the request dump and the lock file, changed with --logdir */
static const char *logdir = "log";
/* lock file path, built in main() from logdir and the IP version */
static char loglockfile[256];

#ifdef ENABLE_IPV6
/* TRUE when --ipv6 was given */
static bool use_ipv6 = FALSE;
#endif
/* name of the IP version in use, for log output and the lock file name */
static const char *ipv_inuse = "IPv4";
/* listening port; zero asks the system to pick one (see sockdaemon()) */
static unsigned short port = DEFAULT_PORT;
128
resetdefaults(void)129 static void resetdefaults(void)
130 {
131 logmsg("Reset to defaults");
132 config.version = CONFIG_VERSION;
133 config.publish_before_suback = FALSE;
134 config.short_publish = FALSE;
135 config.excessive_remaining = FALSE;
136 config.error_connack = 0;
137 config.testnum = 0;
138 }
139
/* Parse 'value' as an unsigned base-10 number and return its lowest eight
   bits. Text that does not start with a number gives 0. */
static unsigned char byteval(char *value)
{
  return (unsigned char)(strtoul(value, NULL, 10) & 0xffUL);
}
145
getconfig(void)146 static void getconfig(void)
147 {
148 FILE *fp = fopen(configfile, FOPEN_READTEXT);
149 resetdefaults();
150 if(fp) {
151 char buffer[512];
152 logmsg("parse config file");
153 while(fgets(buffer, sizeof(buffer), fp)) {
154 char key[32];
155 char value[32];
156 if(2 == sscanf(buffer, "%31s %31s", key, value)) {
157 if(!strcmp(key, "version")) {
158 config.version = byteval(value);
159 logmsg("version [%d] set", config.version);
160 }
161 else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
162 logmsg("PUBLISH-before-SUBACK set");
163 config.publish_before_suback = TRUE;
164 }
165 else if(!strcmp(key, "short-PUBLISH")) {
166 logmsg("short-PUBLISH set");
167 config.short_publish = TRUE;
168 }
169 else if(!strcmp(key, "error-CONNACK")) {
170 config.error_connack = byteval(value);
171 logmsg("error-CONNACK = %d", config.error_connack);
172 }
173 else if(!strcmp(key, "Testnum")) {
174 config.testnum = atoi(value);
175 logmsg("testnum = %d", config.testnum);
176 }
177 else if(!strcmp(key, "excessive-remaining")) {
178 logmsg("excessive-remaining set");
179 config.excessive_remaining = TRUE;
180 }
181 }
182 }
183 fclose(fp);
184 }
185 else {
186 logmsg("No config file '%s' to read", configfile);
187 }
188 }
189
/*
 * Log the first 'len' bytes of 'buffer' as one line of lowercase hex
 * digits within single quotes.
 *
 * Nothing is logged when 'len' is zero or negative. Input that does not
 * fit the local buffer is cut off after the last complete byte.
 */
static void loghex(unsigned char *buffer, ssize_t len)
{
  static const char hexdigits[] = "0123456789abcdef";
  char data[12000];
  size_t used = 0;
  ssize_t i;

  /* every byte takes two characters; always leave room for the NUL so the
     output can neither end in half a byte nor run past the array */
  for(i = 0; (i < len) && (used + 2 < sizeof(data)); i++) {
    data[used++] = hexdigits[buffer[i] >> 4];
    data[used++] = hexdigits[buffer[i] & 0x0f];
  }
  data[used] = 0;

  if(used)
    logmsg("'%s'", data);
}
208
/* which side a logged packet came from */
typedef enum {
  FROM_CLIENT,
  FROM_SERVER
} mqttdir;

/*
 * Write one line describing a packet to 'output':
 *
 *   "<client|server> <prefix> <remlen in hex> <packet bytes in hex>\n"
 *
 * dir    - selects the "client" or "server" word
 * prefix - packet name to print
 * remlen - Remaining Length value to print
 * buffer - packet bytes
 * len    - number of bytes in 'buffer'; zero or negative prints no bytes
 *
 * Packets that do not fit the local buffer are cut off after the last
 * complete byte.
 */
static void logprotocol(mqttdir dir,
                        const char *prefix, size_t remlen,
                        FILE *output,
                        unsigned char *buffer, ssize_t len)
{
  static const char hexdigits[] = "0123456789abcdef";
  char data[12000];
  size_t used = 0;
  ssize_t i;

  /* every byte takes two characters; always leave room for the NUL so the
     output can neither end in half a byte nor run past the array */
  for(i = 0; (i < len) && (used + 2 < sizeof(data)); i++) {
    data[used++] = hexdigits[buffer[i] >> 4];
    data[used++] = hexdigits[buffer[i] & 0x0f];
  }
  data[used] = 0;

  fprintf(output, "%s %s %zx %s\n",
          dir == FROM_CLIENT? "client": "server",
          prefix, remlen,
          data);
}
235
236
237 /* return 0 on success */
connack(FILE * dump,curl_socket_t fd)238 static int connack(FILE *dump, curl_socket_t fd)
239 {
240 unsigned char packet[]={
241 MQTT_MSG_CONNACK, 0x02,
242 0x00, 0x00
243 };
244 ssize_t rc;
245
246 packet[3] = config.error_connack;
247
248 rc = swrite(fd, (char *)packet, sizeof(packet));
249 if(rc > 0) {
250 logmsg("WROTE %d bytes [CONNACK]", rc);
251 loghex(packet, rc);
252 logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
253 }
254 if(rc == sizeof(packet)) {
255 return 0;
256 }
257 return 1;
258 }
259
260 /* return 0 on success */
suback(FILE * dump,curl_socket_t fd,unsigned short packetid)261 static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
262 {
263 unsigned char packet[]={
264 MQTT_MSG_SUBACK, 0x03,
265 0, 0, /* filled in below */
266 0x00
267 };
268 ssize_t rc;
269 packet[2] = (unsigned char)(packetid >> 8);
270 packet[3] = (unsigned char)(packetid & 0xff);
271
272 rc = swrite(fd, (char *)packet, sizeof(packet));
273 if(rc == sizeof(packet)) {
274 logmsg("WROTE %d bytes [SUBACK]", rc);
275 loghex(packet, rc);
276 logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
277 return 0;
278 }
279 return 1;
280 }
281
282 #ifdef QOS
283 /* return 0 on success */
puback(FILE * dump,curl_socket_t fd,unsigned short packetid)284 static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
285 {
286 unsigned char packet[]={
287 MQTT_MSG_PUBACK, 0x00,
288 0, 0 /* filled in below */
289 };
290 ssize_t rc;
291 packet[2] = (unsigned char)(packetid >> 8);
292 packet[3] = (unsigned char)(packetid & 0xff);
293
294 rc = swrite(fd, (char *)packet, sizeof(packet));
295 if(rc == sizeof(packet)) {
296 logmsg("WROTE %d bytes [PUBACK]", rc);
297 loghex(packet, rc);
298 logprotocol(FROM_SERVER, dump, packet, rc);
299 return 0;
300 }
301 logmsg("Failed sending [PUBACK]");
302 return 1;
303 }
304 #endif
305
306 /* return 0 on success */
disconnect(FILE * dump,curl_socket_t fd)307 static int disconnect(FILE *dump, curl_socket_t fd)
308 {
309 unsigned char packet[]={
310 MQTT_MSG_DISCONNECT, 0x00,
311 };
312 ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
313 if(rc == sizeof(packet)) {
314 logmsg("WROTE %d bytes [DISCONNECT]", rc);
315 loghex(packet, rc);
316 logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
317 return 0;
318 }
319 logmsg("Failed sending [DISCONNECT]");
320 return 1;
321 }
322
323
324
325 /*
326 do
327
328 encodedByte = X MOD 128
329
330 X = X DIV 128
331
332 // if there are more data to encode, set the top bit of this byte
333
334 if ( X > 0 )
335
336 encodedByte = encodedByte OR 128
337
338 endif
339
340 'output' encodedByte
341
342 while ( X > 0 )
343
344 */
345
/*
 * Encode 'packetlen' as an MQTT Remaining Length field: seven bits per
 * byte, least significant group first, with the top bit set on every byte
 * that is followed by another one.
 *
 * remlength - output buffer with room for 4 bytes
 *
 * Returns the number of bytes stored (1 to 4), or 0 if the value does not
 * fit in four bytes. When 0 is returned the buffer contents are not a valid
 * encoding.
 */
static int encode_length(size_t packetlen,
                         unsigned char *remlength) /* 4 bytes */
{
  int bytes = 0;

  do {
    unsigned char encode;

    /* checked before storing: all four bytes of the buffer may be used,
       but never a fifth */
    if(bytes >= 4) {
      logmsg("too large packet!");
      return 0;
    }

    encode = (unsigned char)(packetlen % 0x80);
    packetlen /= 0x80;
    if(packetlen)
      encode |= 0x80; /* more bytes follow */

    remlength[bytes++] = encode;
  } while(packetlen);

  return bytes;
}
369
370
/*
 * Decode an MQTT Remaining Length field.
 *
 * buf      - the encoded bytes
 * buflen   - how many bytes of 'buf' may be read
 * lenbytes - if not NULL, gets the number of bytes consumed
 *
 * Bytes are consumed until one without the top bit set has been read or
 * 'buflen' bytes have been used. Returns the decoded value.
 */
static size_t decode_length(unsigned char *buf,
                            size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t factor = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];

    value += (octet & 0x7f) * factor;
    factor *= 0x80;

    if(!(octet & 0x80))
      break; /* this was the last byte of the field */
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
390
391
392 /* return 0 on success */
publish(FILE * dump,curl_socket_t fd,unsigned short packetid,char * topic,char * payload,size_t payloadlen)393 static int publish(FILE *dump,
394 curl_socket_t fd, unsigned short packetid,
395 char *topic, char *payload, size_t payloadlen)
396 {
397 size_t topiclen = strlen(topic);
398 unsigned char *packet;
399 size_t payloadindex;
400 ssize_t remaininglength = topiclen + 2 + payloadlen;
401 ssize_t packetlen;
402 ssize_t sendamount;
403 ssize_t rc;
404 unsigned char rembuffer[4];
405 int encodedlen;
406
407 if(config.excessive_remaining) {
408 /* manually set illegal remaining length */
409 rembuffer[0] = 0xff;
410 rembuffer[1] = 0xff;
411 rembuffer[2] = 0xff;
412 rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
413 encodedlen = 4;
414 }
415 else
416 encodedlen = encode_length(remaininglength, rembuffer);
417
418 /* one packet type byte (possibly two more for packetid) */
419 packetlen = remaininglength + encodedlen + 1;
420 packet = malloc(packetlen);
421 if(!packet)
422 return 1;
423
424 packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
425 memcpy(&packet[1], rembuffer, encodedlen);
426
427 (void)packetid;
428 /* packet_id if QoS is set */
429
430 packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
431 packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
432 memcpy(&packet[3 + encodedlen], topic, topiclen);
433
434 payloadindex = 3 + topiclen + encodedlen;
435 memcpy(&packet[payloadindex], payload, payloadlen);
436
437 sendamount = packetlen;
438 if(config.short_publish)
439 sendamount -= 2;
440
441 rc = swrite(fd, (char *)packet, sendamount);
442 if(rc > 0) {
443 logmsg("WROTE %d bytes [PUBLISH]", rc);
444 loghex(packet, rc);
445 logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
446 }
447 if(rc == packetlen)
448 return 0;
449 return 1;
450 }
451
/* largest topic length that fits in the two-byte length prefix */
#define MAX_TOPIC_LENGTH 65535
/* size of the client id buffer in mqttit(), including the terminator */
#define MAX_CLIENT_ID_LENGTH 32

/* topic of the most recent SUBSCRIBE, null-terminated by mqttit() */
static char topic[MAX_TOPIC_LENGTH + 1];
456
/*
 * Read an MQTT fixed header from 'fd'.
 *
 * bytep                   - gets the first byte (packet type and flags)
 * remaining_lengthp       - gets the decoded Remaining Length
 * remaining_length_bytesp - gets the size of the Remaining Length field
 *
 * Returns 0 on success and 1 on a short read or when the Remaining Length
 * field uses more bytes than a fixed header can hold.
 */
static int fixedheader(curl_socket_t fd,
                       unsigned char *bytep,
                       size_t *remaining_lengthp,
                       size_t *remaining_length_bytesp)
{
  /* get the fixed header */
  unsigned char buffer[10];

  /* get the first two bytes */
  ssize_t rc = sread(fd, (char *)buffer, 2);
  int i;
  if(rc < 2) {
    logmsg("READ %zd bytes [SHORT!]", rc);
    return 1; /* fail */
  }
  logmsg("READ %zd bytes", rc);
  loghex(buffer, rc);
  *bytep = buffer[0];

  /* if the length byte has the top bit set, get the next one too */
  i = 1;
  while(buffer[i] & 0x80) {
    if(i >= (MQTT_HEADER_LEN - 1)) {
      /* a peer that keeps sending continuation bytes must not be able to
         make us write beyond the local buffer */
      logmsg("Remaining Length broken");
      return 1;
    }
    i++;
    rc = sread(fd, (char *)&buffer[i], 1);
    if(rc != 1) {
      logmsg("Remaining Length broken");
      return 1;
    }
  }
  /* 'i' is the index of the last length byte, which also equals the number
     of length bytes since they start at index 1 */
  *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
  logmsg("Remaining Length: %ld [%zu bytes]", (long) *remaining_lengthp,
         *remaining_length_bytesp);
  return 0;
}
491
/*
 * Serve one MQTT client on the connected socket 'fd' until the exchange is
 * done or something fails.
 *
 * The config file is re-read first. Then packets are handled in a loop:
 *
 *  CONNECT   - validated and answered with a CONNACK
 *  SUBSCRIBE - answered with SUBACK and PUBLISH (order set by
 *              config.publish_before_suback) followed by DISCONNECT. The
 *              payload comes from the <reply><data> part of the test file,
 *              or is a fixed default text if that cannot be obtained.
 *  PUBLISH   - logged, then two more bytes (the expected DISCONNECT) are
 *              read and the function is done
 *  others    - end the function
 *
 * Every packet is also recorded in the REQUEST_DUMP file in the log
 * directory.
 *
 * Always returns CURL_SOCKET_BAD. The socket itself is not closed here.
 */
static curl_socket_t mqttit(curl_socket_t fd)
{
  size_t buff_size = 10*1024;
  unsigned char *buffer = NULL;
  ssize_t rc;
  unsigned char byte;
  unsigned short packet_id;
  size_t payload_len;
  size_t client_id_length;
  unsigned int topic_len;
  size_t remaining_length = 0;
  size_t bytes = 0; /* remaining length field size in bytes */
  char client_id[MAX_CLIENT_ID_LENGTH];
  long testno;
  FILE *stream = NULL;
  FILE *dump;
  char dumpfile[256];

  static const char protocol[7] = {
    0x00, 0x04,       /* protocol length */
    'M','Q','T','T',  /* protocol name */
    0x04              /* protocol level */
  };
  msnprintf(dumpfile, sizeof(dumpfile), "%s/%s", logdir, REQUEST_DUMP);
  dump = fopen(dumpfile, "ab");
  if(!dump)
    goto end;

  getconfig();

  testno = config.testnum;

  if(testno)
    logmsg("Found test number %ld", testno);

  buffer = malloc(buff_size);
  if(!buffer) {
    logmsg("Out of memory, unable to allocate buffer");
    goto end;
  }

  do {
    unsigned char usr_flag = 0x80;
    unsigned char passwd_flag = 0x40;
    unsigned char conn_flags;
    const size_t client_id_offset = 12;
    size_t start_usr;
    size_t start_passwd;

    /* get the fixed header */
    rc = fixedheader(fd, &byte, &remaining_length, &bytes);
    if(rc)
      break;

    if(remaining_length >= buff_size) {
      /* keep the old pointer until the new one is known to be good, so
         that the buffer is still freed at 'end' if this fails */
      unsigned char *newbuf;
      buff_size = remaining_length;
      newbuf = realloc(buffer, buff_size);
      if(!newbuf) {
        logmsg("Failed realloc of size %zu", buff_size);
        goto end;
      }
      buffer = newbuf;
    }

    if(remaining_length) {
      /* reading variable header and payload into buffer */
      rc = sread(fd, (char *)buffer, remaining_length);
      if(rc > 0) {
        logmsg("READ %zd bytes", rc);
        loghex(buffer, rc);
      }
    }
    /* from here on 'rc' is the number of bytes now in 'buffer': zero when
       there was nothing to read, negative if the read failed */

    if(byte == MQTT_MSG_CONNECT) {
      logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
                  dump, buffer, rc);

      /* protocol preamble, flags, keepalive and client id length */
      if(rc < (ssize_t)client_id_offset) {
        logmsg("Too small CONNECT");
        goto end;
      }

      if(memcmp(protocol, buffer, sizeof(protocol))) {
        logmsg("Protocol preamble mismatch");
        goto end;
      }
      /* ignore the connect flag byte and two keepalive bytes */
      payload_len = (size_t)((buffer[10] << 8) | buffer[11]);
      /* first part of the payload is the client ID */
      client_id_length = payload_len;

      /* checking if user and password flags were set */
      conn_flags = buffer[7];

      start_usr = client_id_offset + payload_len;
      if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
        logmsg("User flag is present in CONN flag");
        /* the length bytes must be part of what was received; anything
           rejected here would fail the length check below as well */
        if((start_usr + 2) > (size_t)rc) {
          logmsg("Too small CONNECT, no room for user length");
          goto end;
        }
        payload_len += (size_t)((buffer[start_usr] << 8) |
                                buffer[start_usr + 1]);
        payload_len += 2; /* MSB and LSB for user length */
      }

      start_passwd = client_id_offset + payload_len;
      if(passwd_flag == (unsigned char)(conn_flags & passwd_flag)) {
        logmsg("Password flag is present in CONN flags");
        if((start_passwd + 2) > (size_t)rc) {
          logmsg("Too small CONNECT, no room for password length");
          goto end;
        }
        payload_len += (size_t)((buffer[start_passwd] << 8) |
                                buffer[start_passwd + 1]);
        payload_len += 2; /* MSB and LSB for password length */
      }

      /* check the length of the payload */
      if((ssize_t)payload_len != (rc - 12)) {
        logmsg("Payload length mismatch, expected %zx got %zx",
               (size_t)(rc - 12), payload_len);
        goto end;
      }
      /* check the length of the client ID */
      else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
        logmsg("Too large client id");
        goto end;
      }
      memcpy(client_id, &buffer[12], client_id_length);
      client_id[client_id_length] = 0;

      logmsg("MQTT client connect accepted: %s", client_id);

      /* The first packet sent from the Server to the Client MUST be a
         CONNACK Packet */

      if(connack(dump, fd)) {
        logmsg("failed sending CONNACK");
        goto end;
      }
    }
    else if(byte == MQTT_MSG_SUBSCRIBE) {
      int error;
      char *data;
      size_t datalen;
      logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
                  dump, buffer, rc);
      logmsg("Incoming SUBSCRIBE");

      if(rc < 6) {
        logmsg("Too small SUBSCRIBE");
        goto end;
      }

      /* two bytes packet id */
      packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);

      /* two bytes topic length */
      topic_len = (unsigned int)((buffer[2] << 8) | buffer[3]);
      if(topic_len != (remaining_length - 5)) {
        logmsg("Wrong topic length, got %u expected %zu",
               topic_len, remaining_length - 5);
        goto end;
      }
      memcpy(topic, &buffer[4], topic_len);
      topic[topic_len] = 0;

      /* there's a QoS byte (two bits) after the topic */

      logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);

      /* do not lose a test file that an earlier SUBSCRIBE left open */
      if(stream)
        fclose(stream);
      stream = test2fopen(testno, logdir);
      if(stream)
        error = getpart(&data, &datalen, "reply", "data", stream);
      else
        /* no test file: use the default payload, same as when the part
           cannot be extracted */
        error = 1;
      /* NOTE(review): 'data' is never released in this function; check
         getpart()'s ownership rules */
      if(!error) {
        if(!config.publish_before_suback) {
          if(suback(dump, fd, packet_id)) {
            logmsg("failed sending SUBACK");
            goto end;
          }
        }
        if(publish(dump, fd, packet_id, topic, data, datalen)) {
          logmsg("PUBLISH failed");
          goto end;
        }
        if(config.publish_before_suback) {
          if(suback(dump, fd, packet_id)) {
            logmsg("failed sending SUBACK");
            goto end;
          }
        }
      }
      else {
        char *def = (char *)"this is random payload yes yes it is";
        publish(dump, fd, packet_id, topic, def, strlen(def));
      }
      disconnect(dump, fd);
    }
    else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
      size_t topiclen = 0;

      logmsg("Incoming PUBLISH");
      logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
                  dump, buffer, rc);

      /* 'buffer' starts right after the fixed header, so the two topic
         length bytes are the first ones in it */
      if(rc >= 2)
        topiclen = (size_t)((buffer[0] << 8) | buffer[1]);
      logmsg("Got %zu bytes topic", topiclen);
      /* TODO: verify topiclen */

#ifdef QOS
      /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
      puback(dump, fd, 0);
#endif
      /* expect a disconnect here */
      /* get the request */
      rc = sread(fd, (char *)&buffer[0], 2);

      logmsg("READ %zd bytes [DISCONNECT]", rc);
      loghex(buffer, rc);
      logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
      goto end;
    }
    else {
      /* not supported (yet) */
      goto end;
    }
  } while(1);

end:
  if(buffer)
    free(buffer);
  if(dump)
    fclose(dump);
  if(stream)
    fclose(stream);
  return CURL_SOCKET_BAD;
}
712
713 /*
714 sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
715
716 if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
717 accept()
718 */
incoming(curl_socket_t listenfd)719 static bool incoming(curl_socket_t listenfd)
720 {
721 fd_set fds_read;
722 fd_set fds_write;
723 fd_set fds_err;
724 int clients = 0; /* connected clients */
725
726 if(got_exit_signal) {
727 logmsg("signalled to die, exiting...");
728 return FALSE;
729 }
730
731 #ifdef HAVE_GETPPID
732 /* As a last resort, quit if socks5 process becomes orphan. */
733 if(getppid() <= 1) {
734 logmsg("process becomes orphan, exiting");
735 return FALSE;
736 }
737 #endif
738
739 do {
740 ssize_t rc;
741 int error = 0;
742 curl_socket_t sockfd = listenfd;
743 int maxfd = (int)sockfd;
744
745 FD_ZERO(&fds_read);
746 FD_ZERO(&fds_write);
747 FD_ZERO(&fds_err);
748
749 /* there's always a socket to wait for */
750 FD_SET(sockfd, &fds_read);
751
752 do {
753 /* select() blocking behavior call on blocking descriptors please */
754 rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
755 if(got_exit_signal) {
756 logmsg("signalled to die, exiting...");
757 return FALSE;
758 }
759 } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
760
761 if(rc < 0) {
762 logmsg("select() failed with error: (%d) %s",
763 error, strerror(error));
764 return FALSE;
765 }
766
767 if(FD_ISSET(sockfd, &fds_read)) {
768 curl_socket_t newfd = accept(sockfd, NULL, NULL);
769 if(CURL_SOCKET_BAD == newfd) {
770 error = SOCKERRNO;
771 logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
772 sockfd, error, sstrerror(error));
773 }
774 else {
775 logmsg("====> Client connect, fd %d. Read config from %s",
776 newfd, configfile);
777 set_advisor_read_lock(loglockfile);
778 (void)mqttit(newfd); /* until done */
779 clear_advisor_read_lock(loglockfile);
780
781 logmsg("====> Client disconnect");
782 sclose(newfd);
783 }
784 }
785 } while(clients);
786
787 return TRUE;
788 }
789
/*
 * Turn 'sock' into a listening socket.
 *
 * SO_REUSEADDR is set first, with retries and a doubling delay between the
 * attempts; a lasting failure there is logged but not fatal. The socket is
 * then bound to the wildcard address (IPv4 or IPv6 depending on use_ipv6)
 * and put in listening state.
 *
 * listenport - in: the port to bind to, zero to let the system choose.
 *              out: if zero was passed, the port number actually in use.
 *
 * Returns 'sock' on success. On failure the socket is closed and
 * CURL_SOCKET_BAD is returned.
 */
static curl_socket_t sockdaemon(curl_socket_t sock,
                                unsigned short *listenport)
{
  /* passive daemon style */
  srvr_sockaddr_union_t listener;
  int flag;
  int rc;
  int totdelay = 0;
  int maxretr = 10;
  int delay = 20;
  int attempt = 0;
  int error = 0;

  do {
    attempt++;
    flag = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                    (void *)&flag, sizeof(flag));
    if(rc) {
      error = SOCKERRNO;
      logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
             error, sstrerror(error));
      if(maxretr) {
        /* keep the wait result apart from 'rc': the loop condition below
           needs the setsockopt() result to decide about another attempt */
        int waitrc = wait_ms(delay);
        if(waitrc) {
          /* should not happen */
          logmsg("wait_ms() failed with error: %d", waitrc);
          sclose(sock);
          return CURL_SOCKET_BAD;
        }
        if(got_exit_signal) {
          logmsg("signalled to die, exiting...");
          sclose(sock);
          return CURL_SOCKET_BAD;
        }
        totdelay += delay;
        delay *= 2; /* double the sleep for next attempt */
      }
    }
  } while(rc && maxretr--);

  if(rc) {
    logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
           attempt, totdelay, error, sstrerror(error));
    logmsg("Continuing anyway...");
  }

  /* When the specified listener port is zero, it is actually a
     request to let the system choose a non-zero available port. */

#ifdef ENABLE_IPV6
  if(!use_ipv6) {
#endif
    memset(&listener.sa4, 0, sizeof(listener.sa4));
    listener.sa4.sin_family = AF_INET;
    listener.sa4.sin_addr.s_addr = INADDR_ANY;
    listener.sa4.sin_port = htons(*listenport);
    rc = bind(sock, &listener.sa, sizeof(listener.sa4));
#ifdef ENABLE_IPV6
  }
  else {
    memset(&listener.sa6, 0, sizeof(listener.sa6));
    listener.sa6.sin6_family = AF_INET6;
    listener.sa6.sin6_addr = in6addr_any;
    listener.sa6.sin6_port = htons(*listenport);
    rc = bind(sock, &listener.sa, sizeof(listener.sa6));
  }
#endif /* ENABLE_IPV6 */
  if(rc) {
    error = SOCKERRNO;
    logmsg("Error binding socket on port %hu: (%d) %s",
           *listenport, error, sstrerror(error));
    sclose(sock);
    return CURL_SOCKET_BAD;
  }

  if(!*listenport) {
    /* The system was supposed to choose a port number, figure out which
       port we actually got and update the listener port value with it. */
    curl_socklen_t la_size;
    srvr_sockaddr_union_t localaddr;
#ifdef ENABLE_IPV6
    if(!use_ipv6)
#endif
      la_size = sizeof(localaddr.sa4);
#ifdef ENABLE_IPV6
    else
      la_size = sizeof(localaddr.sa6);
#endif
    memset(&localaddr.sa, 0, (size_t)la_size);
    if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
      error = SOCKERRNO;
      logmsg("getsockname() failed with error: (%d) %s",
             error, sstrerror(error));
      sclose(sock);
      return CURL_SOCKET_BAD;
    }
    switch(localaddr.sa.sa_family) {
    case AF_INET:
      *listenport = ntohs(localaddr.sa4.sin_port);
      break;
#ifdef ENABLE_IPV6
    case AF_INET6:
      *listenport = ntohs(localaddr.sa6.sin6_port);
      break;
#endif
    default:
      break;
    }
    if(!*listenport) {
      /* Real failure, listener port shall not be zero beyond this point. */
      logmsg("Apparently getsockname() succeeded, with listener port zero.");
      logmsg("A valid reason for this failure is a binary built without");
      logmsg("proper network library linkage. This might not be the only");
      logmsg("reason, but double check it before anything else.");
      sclose(sock);
      return CURL_SOCKET_BAD;
    }
  }

  /* start accepting connections */
  rc = listen(sock, 5);
  if(0 != rc) {
    error = SOCKERRNO;
    logmsg("listen(%d, 5) failed with error: (%d) %s",
           sock, error, sstrerror(error));
    sclose(sock);
    return CURL_SOCKET_BAD;
  }

  return sock;
}
922
923
/*
 * Program entry point: parse the command line, create the listening
 * socket, write the pid and port files and then serve clients one at a
 * time until incoming() says stop.
 *
 * Returns 0 on every path, including after an invalid --port argument or
 * an unknown option. If an exit signal was caught, the same signal is
 * raised again after cleanup.
 */
int main(int argc, char *argv[])
{
  curl_socket_t sock = CURL_SOCKET_BAD;
  /* never set to a real socket in this function */
  curl_socket_t msgsock = CURL_SOCKET_BAD;
  int wrotepidfile = 0;
  int wroteportfile = 0;
  const char *pidname = ".mqttd.pid";
  const char *portname = ".mqttd.port";
  bool juggle_again;
  int error;
  int arg = 1;

  /* an option that takes a value but is given last, without one, is
     silently ignored */
  while(argc>arg) {
    if(!strcmp("--version", argv[arg])) {
      printf("mqttd IPv4%s\n",
#ifdef ENABLE_IPV6
             "/IPv6"
#else
             ""
#endif
             );
      return 0;
    }
    else if(!strcmp("--pidfile", argv[arg])) {
      arg++;
      if(argc>arg)
        pidname = argv[arg++];
    }
    else if(!strcmp("--portfile", argv[arg])) {
      arg++;
      if(argc>arg)
        portname = argv[arg++];
    }
    else if(!strcmp("--config", argv[arg])) {
      arg++;
      if(argc>arg)
        configfile = argv[arg++];
    }
    else if(!strcmp("--logfile", argv[arg])) {
      arg++;
      if(argc>arg)
        serverlogfile = argv[arg++];
    }
    else if(!strcmp("--logdir", argv[arg])) {
      arg++;
      if(argc>arg)
        logdir = argv[arg++];
    }
    else if(!strcmp("--ipv6", argv[arg])) {
      /* accepted but without effect in builds without ENABLE_IPV6 */
#ifdef ENABLE_IPV6
      ipv_inuse = "IPv6";
      use_ipv6 = TRUE;
#endif
      arg++;
    }
    else if(!strcmp("--ipv4", argv[arg])) {
      /* for completeness, we support this option as well */
#ifdef ENABLE_IPV6
      ipv_inuse = "IPv4";
      use_ipv6 = FALSE;
#endif
      arg++;
    }
    else if(!strcmp("--port", argv[arg])) {
      arg++;
      if(argc>arg) {
        char *endptr;
        unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
        /* the whole argument must be a number that is either zero or in
           the range 1025 - 65535 */
        if((endptr != argv[arg] + strlen(argv[arg])) ||
           ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
          fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
                  argv[arg]);
          return 0;
        }
        port = curlx_ultous(ulnum);
        arg++;
      }
    }
    else {
      /* unknown option: show the usage text and stop */
      puts("Usage: mqttd [option]\n"
           " --config [file]\n"
           " --version\n"
           " --logfile [file]\n"
           " --logdir [directory]\n"
           " --pidfile [file]\n"
           " --portfile [file]\n"
           " --ipv4\n"
           " --ipv6\n"
           " --port [port]\n");
      return 0;
    }
  }

  /* built after option parsing since it uses logdir and the IP version */
  msnprintf(loglockfile, sizeof(loglockfile), "%s/%s/mqtt-%s.lock",
            logdir, SERVERLOGS_LOCKDIR, ipv_inuse);

#ifdef WIN32
  win32_init();
  atexit(win32_cleanup);

  setmode(fileno(stdin), O_BINARY);
  setmode(fileno(stdout), O_BINARY);
  setmode(fileno(stderr), O_BINARY);
#endif

  install_signal_handlers(FALSE);

  /* the #ifdef blocks below form a single if/else statement in IPv6
     builds and a plain statement otherwise */
#ifdef ENABLE_IPV6
  if(!use_ipv6)
#endif
    sock = socket(AF_INET, SOCK_STREAM, 0);
#ifdef ENABLE_IPV6
  else
    sock = socket(AF_INET6, SOCK_STREAM, 0);
#endif

  if(CURL_SOCKET_BAD == sock) {
    error = SOCKERRNO;
    logmsg("Error creating socket: (%d) %s", error, sstrerror(error));
    goto mqttd_cleanup;
  }

  {
    /* passive daemon style */
    /* sockdaemon() closes the socket itself when it fails; 'port' is
       updated if it was zero */
    sock = sockdaemon(sock, &port);
    if(CURL_SOCKET_BAD == sock) {
      goto mqttd_cleanup;
    }
    msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
  }

  logmsg("Running %s version", ipv_inuse);
  logmsg("Listening on port %hu", port);

  wrotepidfile = write_pidfile(pidname);
  if(!wrotepidfile) {
    goto mqttd_cleanup;
  }

  wroteportfile = write_portfile(portname, port);
  if(!wroteportfile) {
    goto mqttd_cleanup;
  }

  /* serve clients until incoming() returns FALSE */
  do {
    juggle_again = incoming(sock);
  } while(juggle_again);

mqttd_cleanup:

  if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
    sclose(msgsock);

  if(sock != CURL_SOCKET_BAD)
    sclose(sock);

  /* only remove the files this process wrote */
  if(wrotepidfile)
    unlink(pidname);
  if(wroteportfile)
    unlink(portname);

  restore_signal_handlers(FALSE);

  if(got_exit_signal) {
    logmsg("============> mqttd exits with signal (%d)", exit_signal);
    /*
     * To properly set the return status of the process we
     * must raise the same signal SIGINT or SIGTERM that we
     * caught and let the old handler take care of it.
     */
    raise(exit_signal);
  }

  logmsg("============> mqttd quits");
  return 0;
}
1100