1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 * SPDX-License-Identifier: curl
22 *
23 ***************************************************************************/
24 #include "server_setup.h"
25 #include <stdlib.h>
26 #include <string.h>
27 #include "util.h"
28
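/* Function
 *
 * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT.
 *
 * Reads commands from FILE (set with --config). The commands control how
 * the server acts and are reset to defaults on each client TCP connect.
 *
 * Config file keywords (each line is a keyword followed by a value; lines
 * without a value are ignored):
 *
 * version [number]
 * PUBLISH-before-SUBACK [any value]
 * short-PUBLISH [any value]
 * error-CONNACK [number]
 * Testnum [number]
 * excessive-remaining [any value]
 */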
40
41 /* based on sockfilt.c */
42
43 #ifdef HAVE_SIGNAL_H
44 #include <signal.h>
45 #endif
46 #ifdef HAVE_NETINET_IN_H
47 #include <netinet/in.h>
48 #endif
49 #ifdef HAVE_NETINET_IN6_H
50 #include <netinet/in6.h>
51 #endif
52 #ifdef HAVE_ARPA_INET_H
53 #include <arpa/inet.h>
54 #endif
55 #ifdef HAVE_NETDB_H
56 #include <netdb.h>
57 #endif
58
59 #define ENABLE_CURLX_PRINTF
60 /* make the curlx header define all printf() functions to use the curlx_*
61 versions instead */
62 #include "curlx.h" /* from the private lib dir */
63 #include "getpart.h"
64 #include "inet_pton.h"
65 #include "server_sockaddr.h"
66 #include "warnless.h"
67
68 /* include memdebug.h last */
69 #include "memdebug.h"
70
#ifdef USE_WINSOCK
/* With winsock, pin these error names to fixed numbers, replacing any
   definition made by earlier headers. */
#undef EINTR
#define EINTR 4 /* errno.h value */
#undef EAGAIN
#define EAGAIN 11 /* errno.h value */
#undef ENOMEM
#define ENOMEM 12 /* errno.h value */
#undef EINVAL
#define EINVAL 22 /* errno.h value */
#endif

#define DEFAULT_PORT 1883 /* MQTT default port */

/* log file name used unless --logfile selects another one */
#ifndef DEFAULT_LOGFILE
#define DEFAULT_LOGFILE "log/mqttd.log"
#endif

/* config file name used unless --config selects another one */
#ifndef DEFAULT_CONFIG
#define DEFAULT_CONFIG "mqttd.config"
#endif

/* first byte of each MQTT packet type this server sends or recognizes */
#define MQTT_MSG_CONNECT 0x10
#define MQTT_MSG_CONNACK 0x20
#define MQTT_MSG_PUBLISH 0x30
#define MQTT_MSG_PUBACK 0x40
#define MQTT_MSG_SUBSCRIBE 0x82
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0

/* NOTE(review): the four lengths below are not referenced by the code
   visible in this file - confirm before relying on them */
#define MQTT_CONNACK_LEN 4
#define MQTT_SUBACK_LEN 5
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
104
/* Behavior switches for one client session. They are loaded from the config
   file by getconfig() and put back to defaults by resetdefaults(). */
struct configurable {
  unsigned char version; /* initial version byte in the request must match
                            this */
                         /* NOTE(review): no comparison against this field
                            is visible in this file - confirm */
  bool publish_before_suback; /* send the PUBLISH ahead of the SUBACK */
  bool short_publish; /* write two bytes less than the full PUBLISH packet */
  bool excessive_remaining; /* send an illegal Remaining Length in PUBLISH */
  unsigned char error_connack; /* copied into the last byte of the CONNACK */
  int testnum; /* test number; selects the test file with the reply data */
};
114
/* file that mqttit() appends the logged protocol exchange to */
#define REQUEST_DUMP "log/server.input"
/* default for the 'version' config value */
#define CONFIG_VERSION 5

/* settings currently in effect; reloaded by getconfig() */
static struct configurable config;

const char *serverlogfile = DEFAULT_LOGFILE; /* changed with --logfile */
static const char *configfile = DEFAULT_CONFIG; /* changed with --config */

#ifdef ENABLE_IPV6
static bool use_ipv6 = FALSE; /* set by --ipv6, cleared by --ipv4 */
#endif
static const char *ipv_inuse = "IPv4"; /* IP version name, used in the log */
static unsigned short port = DEFAULT_PORT; /* listen port, set with --port */
128
resetdefaults(void)129 static void resetdefaults(void)
130 {
131 logmsg("Reset to defaults");
132 config.version = CONFIG_VERSION;
133 config.publish_before_suback = FALSE;
134 config.short_publish = FALSE;
135 config.excessive_remaining = FALSE;
136 config.error_connack = 0;
137 config.testnum = 0;
138 }
139
/* Parse 'value' as an unsigned base-10 number and return its low 8 bits. */
static unsigned char byteval(char *value)
{
  return (unsigned char)(strtoul(value, NULL, 10) & 0xffUL);
}
145
getconfig(void)146 static void getconfig(void)
147 {
148 FILE *fp = fopen(configfile, FOPEN_READTEXT);
149 resetdefaults();
150 if(fp) {
151 char buffer[512];
152 logmsg("parse config file");
153 while(fgets(buffer, sizeof(buffer), fp)) {
154 char key[32];
155 char value[32];
156 if(2 == sscanf(buffer, "%31s %31s", key, value)) {
157 if(!strcmp(key, "version")) {
158 config.version = byteval(value);
159 logmsg("version [%d] set", config.version);
160 }
161 else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
162 logmsg("PUBLISH-before-SUBACK set");
163 config.publish_before_suback = TRUE;
164 }
165 else if(!strcmp(key, "short-PUBLISH")) {
166 logmsg("short-PUBLISH set");
167 config.short_publish = TRUE;
168 }
169 else if(!strcmp(key, "error-CONNACK")) {
170 config.error_connack = byteval(value);
171 logmsg("error-CONNACK = %d", config.error_connack);
172 }
173 else if(!strcmp(key, "Testnum")) {
174 config.testnum = atoi(value);
175 logmsg("testnum = %d", config.testnum);
176 }
177 else if(!strcmp(key, "excessive-remaining")) {
178 logmsg("excessive-remaining set");
179 config.excessive_remaining = TRUE;
180 }
181 }
182 }
183 fclose(fp);
184 }
185 else {
186 logmsg("No config file '%s' to read", configfile);
187 }
188 }
189
/*
 * Write the first 'len' bytes of 'buffer' to the server log as one line of
 * two-digit hex values wrapped in single quotes. Nothing is logged when
 * 'len' is zero or negative. Output stops once the local 12000 byte
 * scratch buffer is used up.
 */
static void loghex(unsigned char *buffer, ssize_t len)
{
  char hex[12000];
  char *out = hex;
  int room = sizeof(hex);
  ssize_t idx = 0;

  while((idx < len) && (room >= 0)) {
    msnprintf(out, room, "%02x", buffer[idx]);
    out += 2;
    room -= 2;
    idx++;
  }

  /* only log when at least one byte was formatted */
  if(idx)
    logmsg("'%s'", hex);
}
208
/* Which side a logged packet came from. */
typedef enum {
  FROM_CLIENT = 0, /* packet received from the client */
  FROM_SERVER = 1  /* packet sent by this server */
} mqttdir;
213
/*
 * Append one line describing a packet to 'output':
 *
 *   <client|server> <prefix> <remlen in hex> <packet bytes in hex>
 *
 * 'dir' selects the first word, 'buffer'/'len' provide the bytes. When
 * 'len' is zero or negative the hex part is empty. The hex part is cut off
 * once the local 12000 byte scratch buffer is used up.
 */
static void logprotocol(mqttdir dir,
                        const char *prefix, size_t remlen,
                        FILE *output,
                        unsigned char *buffer, ssize_t len)
{
  char hex[12000] = "";
  const char *who;
  char *out = hex;
  int room = sizeof(hex);
  ssize_t idx;

  for(idx = 0; (idx < len) && (room >= 0); idx++, out += 2, room -= 2)
    msnprintf(out, room, "%02x", buffer[idx]);

  if(dir == FROM_CLIENT)
    who = "client";
  else
    who = "server";

  fprintf(output, "%s %s %zx %s\n", who, prefix, remlen, hex);
}
235
236
237 /* return 0 on success */
connack(FILE * dump,curl_socket_t fd)238 static int connack(FILE *dump, curl_socket_t fd)
239 {
240 unsigned char packet[]={
241 MQTT_MSG_CONNACK, 0x02,
242 0x00, 0x00
243 };
244 ssize_t rc;
245
246 packet[3] = config.error_connack;
247
248 rc = swrite(fd, (char *)packet, sizeof(packet));
249 if(rc > 0) {
250 logmsg("WROTE %d bytes [CONNACK]", rc);
251 loghex(packet, rc);
252 logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
253 }
254 if(rc == sizeof(packet)) {
255 return 0;
256 }
257 return 1;
258 }
259
260 /* return 0 on success */
suback(FILE * dump,curl_socket_t fd,unsigned short packetid)261 static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
262 {
263 unsigned char packet[]={
264 MQTT_MSG_SUBACK, 0x03,
265 0, 0, /* filled in below */
266 0x00
267 };
268 ssize_t rc;
269 packet[2] = (unsigned char)(packetid >> 8);
270 packet[3] = (unsigned char)(packetid & 0xff);
271
272 rc = swrite(fd, (char *)packet, sizeof(packet));
273 if(rc == sizeof(packet)) {
274 logmsg("WROTE %d bytes [SUBACK]", rc);
275 loghex(packet, rc);
276 logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
277 return 0;
278 }
279 return 1;
280 }
281
#ifdef QOS
/*
 * Send a PUBACK packet for 'packetid' on 'fd'. Only built when QOS is
 * defined.
 *
 * The Remaining Length is 2 since the packet id (big-endian, bytes 2 and 3)
 * is all that follows the fixed header. On a complete write the exchange is
 * logged and appended to 'dump'.
 *
 * Returns 0 if the whole packet was written, 1 otherwise.
 */
static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
{
  unsigned char packet[]={
    MQTT_MSG_PUBACK, 0x02,
    0, 0 /* filled in below */
  };
  ssize_t rc;
  packet[2] = (unsigned char)(packetid >> 8);
  packet[3] = (unsigned char)(packetid & 0xff);

  rc = swrite(fd, (char *)packet, sizeof(packet));
  if(rc == (ssize_t)sizeof(packet)) {
    /* rc is an ssize_t: cast it so the argument matches the conversion */
    logmsg("WROTE %ld bytes [PUBACK]", (long)rc);
    loghex(packet, rc);
    /* logprotocol() needs the packet name and the Remaining Length too */
    logprotocol(FROM_SERVER, "PUBACK", 2, dump, packet, rc);
    return 0;
  }
  logmsg("Failed sending [PUBACK]");
  return 1;
}
#endif
305
306 /* return 0 on success */
disconnect(FILE * dump,curl_socket_t fd)307 static int disconnect(FILE *dump, curl_socket_t fd)
308 {
309 unsigned char packet[]={
310 MQTT_MSG_DISCONNECT, 0x00,
311 };
312 ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
313 if(rc == sizeof(packet)) {
314 logmsg("WROTE %d bytes [DISCONNECT]", rc);
315 loghex(packet, rc);
316 logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
317 return 0;
318 }
319 logmsg("Failed sending [DISCONNECT]");
320 return 1;
321 }
322
323
324
325 /*
326 do
327
328 encodedByte = X MOD 128
329
330 X = X DIV 128
331
332 // if there are more data to encode, set the top bit of this byte
333
334 if ( X > 0 )
335
336 encodedByte = encodedByte OR 128
337
338 endif
339
340 'output' encodedByte
341
342 while ( X > 0 )
343
344 */
345
/*
 * Encode 'packetlen' as an MQTT Remaining Length: seven bits per byte,
 * least significant group first, with the top bit set on every byte that
 * is followed by another one.
 *
 * 'remlength' must have room for 4 bytes, which is the longest encoding
 * MQTT allows (values up to 268435455).
 *
 * Returns the number of bytes stored (1 to 4), or 0 if the value needs
 * more than 4 bytes. In the failure case 'remlength' holds the first four
 * encoded bytes and must not be used.
 */
static int encode_length(size_t packetlen,
                         unsigned char *remlength) /* 4 bytes */
{
  int bytes = 0;

  do {
    unsigned char encode;

    if(bytes >= 4) {
      /* four bytes are stored and there is still more to encode */
      logmsg("too large packet!");
      return 0;
    }

    encode = (unsigned char)(packetlen % 0x80);
    packetlen /= 0x80;
    if(packetlen)
      encode |= 0x80;

    remlength[bytes++] = encode;
  } while(packetlen);

  return bytes;
}
369
370
/*
 * Decode an MQTT Remaining Length field.
 *
 * Reads bytes from 'buf' until one without the top bit set has been
 * consumed or 'buflen' bytes have been used. Each byte contributes its low
 * seven bits, least significant group first.
 *
 * If 'lenbytes' is not NULL it receives the number of bytes consumed.
 * Returns the decoded value (0 when 'buflen' is 0).
 */
static size_t decode_length(unsigned char *buf,
                            size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t factor = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];

    value += (size_t)(octet & 0x7f) * factor;
    factor *= 0x80;

    /* a clear top bit marks the final byte of the field */
    if(!(octet & 0x80))
      break;
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
390
391
392 /* return 0 on success */
publish(FILE * dump,curl_socket_t fd,unsigned short packetid,char * topic,char * payload,size_t payloadlen)393 static int publish(FILE *dump,
394 curl_socket_t fd, unsigned short packetid,
395 char *topic, char *payload, size_t payloadlen)
396 {
397 size_t topiclen = strlen(topic);
398 unsigned char *packet;
399 size_t payloadindex;
400 ssize_t remaininglength = topiclen + 2 + payloadlen;
401 ssize_t packetlen;
402 ssize_t sendamount;
403 ssize_t rc;
404 unsigned char rembuffer[4];
405 int encodedlen;
406
407 if(config.excessive_remaining) {
408 /* manually set illegal remaining length */
409 rembuffer[0] = 0xff;
410 rembuffer[1] = 0xff;
411 rembuffer[2] = 0xff;
412 rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
413 encodedlen = 4;
414 }
415 else
416 encodedlen = encode_length(remaininglength, rembuffer);
417
418 /* one packet type byte (possibly two more for packetid) */
419 packetlen = remaininglength + encodedlen + 1;
420 packet = malloc(packetlen);
421 if(!packet)
422 return 1;
423
424 packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
425 memcpy(&packet[1], rembuffer, encodedlen);
426
427 (void)packetid;
428 /* packet_id if QoS is set */
429
430 packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
431 packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
432 memcpy(&packet[3 + encodedlen], topic, topiclen);
433
434 payloadindex = 3 + topiclen + encodedlen;
435 memcpy(&packet[payloadindex], payload, payloadlen);
436
437 sendamount = packetlen;
438 if(config.short_publish)
439 sendamount -= 2;
440
441 rc = swrite(fd, (char *)packet, sendamount);
442 if(rc > 0) {
443 logmsg("WROTE %d bytes [PUBLISH]", rc);
444 loghex(packet, rc);
445 logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
446 }
447 if(rc == packetlen)
448 return 0;
449 return 1;
450 }
451
/* largest topic length a two byte length field can express */
#define MAX_TOPIC_LENGTH 65535
/* size of the client id buffer in mqttit(), terminating zero included */
#define MAX_CLIENT_ID_LENGTH 32

/* zero terminated topic copied out of the latest SUBSCRIBE by mqttit() */
static char topic[MAX_TOPIC_LENGTH + 1];
456
/*
 * Read the fixed header of the next MQTT packet from 'fd'.
 *
 * The first two bytes are read in one go; further Remaining Length bytes
 * are read one at a time for as long as the previous one has its top bit
 * set. MQTT allows at most four such bytes, so a header asking for a fifth
 * is rejected instead of being stored past the local buffer.
 *
 * On success:
 *   *bytep                    the first byte of the packet
 *   *remaining_lengthp        the decoded Remaining Length
 *   *remaining_length_bytesp  size of the Remaining Length field in bytes
 *                             (may be NULL)
 *
 * Returns 0 on success and 1 on a short read or a broken length field.
 */
static int fixedheader(curl_socket_t fd,
                       unsigned char *bytep,
                       size_t *remaining_lengthp,
                       size_t *remaining_length_bytesp)
{
  /* get the fixed header */
  unsigned char buffer[10];
  const size_t max_length_bytes = 4;
  size_t lenbytes = 0;
  size_t i;

  /* get the first two bytes */
  ssize_t rc = sread(fd, (char *)buffer, 2);
  if(rc < 2) {
    logmsg("READ %ld bytes [SHORT!]", (long)rc);
    return 1; /* fail */
  }
  logmsg("READ %ld bytes", (long)rc);
  loghex(buffer, rc);
  *bytep = buffer[0];

  /* if the length byte has the top bit set, get the next one too */
  i = 1;
  while(buffer[i] & 0x80) {
    if(i >= max_length_bytes) {
      /* the last allowed length byte still claims a follow-up */
      logmsg("Remaining Length broken");
      return 1;
    }
    i++;
    rc = sread(fd, (char *)&buffer[i], 1);
    if(rc != 1) {
      logmsg("Remaining Length broken");
      return 1;
    }
  }

  /* the length field occupies buffer[1] up to and including buffer[i] */
  *remaining_lengthp = decode_length(&buffer[1], i, &lenbytes);
  if(remaining_length_bytesp)
    *remaining_length_bytesp = lenbytes;
  logmsg("Remaining Length: %ld [%lu bytes]", (long) *remaining_lengthp,
         (unsigned long)lenbytes);
  return 0;
}
491
/*
 * Serve one MQTT client on the connected socket 'fd' until the session
 * ends.
 *
 * The config file is re-read first. Then packets are handled in a loop:
 *
 *  CONNECT    the preamble, payload length and client id are checked and a
 *             CONNACK is sent back
 *  SUBSCRIBE  the topic is stored, the <reply><data> part of the test file
 *             for the configured test number is loaded and sent back as a
 *             PUBLISH (with a SUBACK before or after it, as configured),
 *             followed by a DISCONNECT. If the part cannot be extracted, a
 *             fixed default payload is published instead.
 *  PUBLISH    logged, then the two bytes expected to be the client's
 *             DISCONNECT are read and the session ends
 *  other      the session ends
 *
 * Every packet handled is appended to the REQUEST_DUMP file.
 *
 * Always returns CURL_SOCKET_BAD. The socket itself is left open for the
 * caller to close.
 */
static curl_socket_t mqttit(curl_socket_t fd)
{
  size_t buff_size = 10*1024;
  unsigned char *buffer = NULL;
  ssize_t rc;
  unsigned char byte;
  unsigned short packet_id;
  size_t payload_len;
  size_t client_id_length;
  unsigned int topic_len;
  size_t remaining_length = 0;
  size_t bytes = 0; /* remaining length field size in bytes */
  char client_id[MAX_CLIENT_ID_LENGTH];
  long testno;
  FILE *stream = NULL;

  static const char protocol[7] = {
    0x00, 0x04,       /* protocol length */
    'M','Q','T','T',  /* protocol name */
    0x04              /* protocol level */
  };
  FILE *dump = fopen(REQUEST_DUMP, "ab");
  if(!dump)
    goto end;

  getconfig();

  testno = config.testnum;

  if(testno)
    logmsg("Found test number %ld", testno);

  buffer = malloc(buff_size);
  if(!buffer) {
    logmsg("Out of memory, unable to allocate buffer");
    goto end;
  }

  do {
    unsigned char usr_flag = 0x80;
    unsigned char passwd_flag = 0x40;
    unsigned char conn_flags;
    const size_t client_id_offset = 12;
    size_t start_usr;
    size_t start_passwd;

    /* get the fixed header */
    rc = fixedheader(fd, &byte, &remaining_length, &bytes);
    if(rc)
      break;

    if(remaining_length >= buff_size) {
      /* grow through a temporary so the old buffer is still owned (and
         freed below) if the reallocation fails */
      unsigned char *newbuf;
      buff_size = remaining_length;
      newbuf = realloc(buffer, buff_size);
      if(!newbuf) {
        logmsg("Failed realloc of size %lu", (unsigned long)buff_size);
        goto end;
      }
      buffer = newbuf;
    }

    if(remaining_length) {
      /* reading variable header and payload into buffer */
      rc = sread(fd, (char *)buffer, remaining_length);
      if(rc > 0) {
        logmsg("READ %ld bytes", (long)rc);
        loghex(buffer, rc);
      }
    }

    if(byte == MQTT_MSG_CONNECT) {
      logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
                  dump, buffer, rc);

      /* the preamble, connect flags, keepalive and client id length make
         up the first 12 bytes; do not look at them unless they arrived */
      if(rc < (ssize_t)client_id_offset) {
        logmsg("Too small CONNECT");
        goto end;
      }

      if(memcmp(protocol, buffer, sizeof(protocol))) {
        logmsg("Protocol preamble mismatch");
        goto end;
      }
      /* ignore the connect flag byte and two keepalive bytes */
      payload_len = (buffer[10] << 8) | buffer[11];
      /* first part of the payload is the client ID */
      client_id_length = payload_len;

      /* checking if user and password flags were set */
      conn_flags = buffer[7];

      start_usr = client_id_offset + payload_len;
      if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
        logmsg("User flag is present in CONN flag");
        /* the two length bytes must be within what was received */
        if((start_usr + 2) > (size_t)rc) {
          logmsg("CONNECT too short for the user name length");
          goto end;
        }
        payload_len += (buffer[start_usr] << 8) | buffer[start_usr + 1];
        payload_len += 2; /* MSB and LSB for user length */
      }

      start_passwd = client_id_offset + payload_len;
      if(passwd_flag == (unsigned char)(conn_flags & passwd_flag)) {
        logmsg("Password flag is present in CONN flags");
        /* the two length bytes must be within what was received */
        if((start_passwd + 2) > (size_t)rc) {
          logmsg("CONNECT too short for the password length");
          goto end;
        }
        payload_len += (buffer[start_passwd] << 8) | buffer[start_passwd + 1];
        payload_len += 2; /* MSB and LSB for password length */
      }

      /* check the length of the payload */
      if((ssize_t)payload_len != (rc - 12)) {
        logmsg("Payload length mismatch, expected %lx got %lx",
               (unsigned long)(rc - 12), (unsigned long)payload_len);
        goto end;
      }
      /* check the length of the client ID */
      else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
        logmsg("Too large client id");
        goto end;
      }
      memcpy(client_id, &buffer[12], client_id_length);
      client_id[client_id_length] = 0;

      logmsg("MQTT client connect accepted: %s", client_id);

      /* The first packet sent from the Server to the Client MUST be a
         CONNACK Packet */

      if(connack(dump, fd)) {
        logmsg("failed sending CONNACK");
        goto end;
      }
    }
    else if(byte == MQTT_MSG_SUBSCRIBE) {
      int error;
      char *data = NULL;
      size_t datalen = 0;
      logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
                  dump, buffer, rc);
      logmsg("Incoming SUBSCRIBE");

      if(rc < 6) {
        logmsg("Too small SUBSCRIBE");
        goto end;
      }

      /* two bytes packet id */
      packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);

      /* two bytes topic length */
      topic_len = (buffer[2] << 8) | buffer[3];
      if(topic_len != (remaining_length - 5)) {
        logmsg("Wrong topic length, got %u expected %lu",
               topic_len, (unsigned long)(remaining_length - 5));
        goto end;
      }
      memcpy(topic, &buffer[4], topic_len);
      topic[topic_len] = 0;

      /* there's a QoS byte (two bits) after the topic */

      logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
      stream = test2fopen(testno);
      if(!stream) {
        /* without a test file there is nothing for getpart() to read */
        logmsg("Couldn't open test file %ld", testno);
        goto end;
      }
      error = getpart(&data, &datalen, "reply", "data", stream);
      /* the test file is only needed for the getpart() call */
      fclose(stream);
      stream = NULL;
      if(!error) {
        int failed = 0;
        if(!config.publish_before_suback) {
          if(suback(dump, fd, packet_id)) {
            logmsg("failed sending SUBACK");
            failed = 1;
          }
        }
        if(!failed && publish(dump, fd, packet_id, topic, data, datalen)) {
          logmsg("PUBLISH failed");
          failed = 1;
        }
        if(!failed && config.publish_before_suback) {
          if(suback(dump, fd, packet_id)) {
            logmsg("failed sending SUBACK");
            failed = 1;
          }
        }
        /* NOTE(review): assumes getpart() hands over an allocated buffer
           on success that the caller must free - confirm in getpart.h */
        free(data);
        if(failed)
          goto end;
      }
      else {
        char *def = (char *)"this is random payload yes yes it is";
        publish(dump, fd, packet_id, topic, def, strlen(def));
      }
      disconnect(dump, fd);
    }
    else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
      size_t topiclen = 0;

      logmsg("Incoming PUBLISH");
      logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
                  dump, buffer, rc);

      /* buffer starts right after the fixed header, so the topic length
         is its first two bytes */
      if(rc >= 2)
        topiclen = (size_t)((buffer[0] << 8) | buffer[1]);
      logmsg("Got %lu bytes topic", (unsigned long)topiclen);
      /* TODO: verify topiclen */

#ifdef QOS
      /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
      puback(dump, fd, 0);
#endif
      /* expect a disconnect here */
      /* get the request */
      rc = sread(fd, (char *)&buffer[0], 2);

      logmsg("READ %ld bytes [DISCONNECT]", (long)rc);
      loghex(buffer, rc);
      logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
      goto end;
    }
    else {
      /* not supported (yet) */
      goto end;
    }
  } while(1);

end:
  free(buffer);
  if(dump)
    fclose(dump);
  if(stream)
    fclose(stream);
  return CURL_SOCKET_BAD;
}
710
711 /*
712 sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
713
714 if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
715 accept()
716 */
incoming(curl_socket_t listenfd)717 static bool incoming(curl_socket_t listenfd)
718 {
719 fd_set fds_read;
720 fd_set fds_write;
721 fd_set fds_err;
722 int clients = 0; /* connected clients */
723
724 if(got_exit_signal) {
725 logmsg("signalled to die, exiting...");
726 return FALSE;
727 }
728
729 #ifdef HAVE_GETPPID
730 /* As a last resort, quit if socks5 process becomes orphan. */
731 if(getppid() <= 1) {
732 logmsg("process becomes orphan, exiting");
733 return FALSE;
734 }
735 #endif
736
737 do {
738 ssize_t rc;
739 int error = 0;
740 curl_socket_t sockfd = listenfd;
741 int maxfd = (int)sockfd;
742
743 FD_ZERO(&fds_read);
744 FD_ZERO(&fds_write);
745 FD_ZERO(&fds_err);
746
747 /* there's always a socket to wait for */
748 FD_SET(sockfd, &fds_read);
749
750 do {
751 /* select() blocking behavior call on blocking descriptors please */
752 rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
753 if(got_exit_signal) {
754 logmsg("signalled to die, exiting...");
755 return FALSE;
756 }
757 } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
758
759 if(rc < 0) {
760 logmsg("select() failed with error: (%d) %s",
761 error, strerror(error));
762 return FALSE;
763 }
764
765 if(FD_ISSET(sockfd, &fds_read)) {
766 curl_socket_t newfd = accept(sockfd, NULL, NULL);
767 if(CURL_SOCKET_BAD == newfd) {
768 error = SOCKERRNO;
769 logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
770 sockfd, error, strerror(error));
771 }
772 else {
773 logmsg("====> Client connect, fd %d. Read config from %s",
774 newfd, configfile);
775 set_advisor_read_lock(SERVERLOGS_LOCK);
776 (void)mqttit(newfd); /* until done */
777 clear_advisor_read_lock(SERVERLOGS_LOCK);
778
779 logmsg("====> Client disconnect");
780 sclose(newfd);
781 }
782 }
783 } while(clients);
784
785 return TRUE;
786 }
787
/*
 * Turn 'sock' into a listening socket.
 *
 * SO_REUSEADDR is set first; a failure is retried up to ten more times
 * with a delay that starts at 20 ms and doubles each time. If it still
 * fails, that is logged and setup continues anyway. The socket is then
 * bound to any address (IPv4, or IPv6 when selected) on '*listenport' and
 * put into listening state.
 *
 * If '*listenport' is zero the system picks the port and '*listenport' is
 * updated with the number actually assigned.
 *
 * Returns 'sock' on success. On failure the socket is closed and
 * CURL_SOCKET_BAD is returned.
 */
static curl_socket_t sockdaemon(curl_socket_t sock,
                                unsigned short *listenport)
{
  /* passive daemon style */
  srvr_sockaddr_union_t listener;
  int flag;
  int rc;
  int totdelay = 0;
  int maxretr = 10;
  int delay = 20;
  int attempt = 0;
  int error = 0;

  do {
    attempt++;
    flag = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                    (void *)&flag, sizeof(flag));
    if(rc) {
      error = SOCKERRNO;
      logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
             error, strerror(error));
      if(maxretr) {
        /* keep the wait result apart from rc: a successful wait must not
           make the loop condition believe setsockopt() succeeded */
        int waitrc = wait_ms(delay);
        if(waitrc) {
          /* should not happen */
          logmsg("wait_ms() failed with error: %d", waitrc);
          sclose(sock);
          return CURL_SOCKET_BAD;
        }
        if(got_exit_signal) {
          logmsg("signalled to die, exiting...");
          sclose(sock);
          return CURL_SOCKET_BAD;
        }
        totdelay += delay;
        delay *= 2; /* double the sleep for next attempt */
      }
    }
  } while(rc && maxretr--);

  if(rc) {
    logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
           attempt, totdelay, error, strerror(error));
    logmsg("Continuing anyway...");
  }

  /* When the specified listener port is zero, it is actually a
     request to let the system choose a non-zero available port. */

#ifdef ENABLE_IPV6
  if(!use_ipv6) {
#endif
    memset(&listener.sa4, 0, sizeof(listener.sa4));
    listener.sa4.sin_family = AF_INET;
    listener.sa4.sin_addr.s_addr = INADDR_ANY;
    listener.sa4.sin_port = htons(*listenport);
    rc = bind(sock, &listener.sa, sizeof(listener.sa4));
#ifdef ENABLE_IPV6
  }
  else {
    memset(&listener.sa6, 0, sizeof(listener.sa6));
    listener.sa6.sin6_family = AF_INET6;
    listener.sa6.sin6_addr = in6addr_any;
    listener.sa6.sin6_port = htons(*listenport);
    rc = bind(sock, &listener.sa, sizeof(listener.sa6));
  }
#endif /* ENABLE_IPV6 */
  if(rc) {
    error = SOCKERRNO;
    logmsg("Error binding socket on port %hu: (%d) %s",
           *listenport, error, strerror(error));
    sclose(sock);
    return CURL_SOCKET_BAD;
  }

  if(!*listenport) {
    /* The system was supposed to choose a port number, figure out which
       port we actually got and update the listener port value with it. */
    curl_socklen_t la_size;
    srvr_sockaddr_union_t localaddr;
#ifdef ENABLE_IPV6
    if(!use_ipv6)
#endif
      la_size = sizeof(localaddr.sa4);
#ifdef ENABLE_IPV6
    else
      la_size = sizeof(localaddr.sa6);
#endif
    memset(&localaddr.sa, 0, (size_t)la_size);
    if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
      error = SOCKERRNO;
      logmsg("getsockname() failed with error: (%d) %s",
             error, strerror(error));
      sclose(sock);
      return CURL_SOCKET_BAD;
    }
    switch(localaddr.sa.sa_family) {
    case AF_INET:
      *listenport = ntohs(localaddr.sa4.sin_port);
      break;
#ifdef ENABLE_IPV6
    case AF_INET6:
      *listenport = ntohs(localaddr.sa6.sin6_port);
      break;
#endif
    default:
      break;
    }
    if(!*listenport) {
      /* Real failure, listener port shall not be zero beyond this point. */
      logmsg("Apparently getsockname() succeeded, with listener port zero.");
      logmsg("A valid reason for this failure is a binary built without");
      logmsg("proper network library linkage. This might not be the only");
      logmsg("reason, but double check it before anything else.");
      sclose(sock);
      return CURL_SOCKET_BAD;
    }
  }

  /* start accepting connections */
  rc = listen(sock, 5);
  if(0 != rc) {
    error = SOCKERRNO;
    logmsg("listen(%d, 5) failed with error: (%d) %s",
           sock, error, strerror(error));
    sclose(sock);
    return CURL_SOCKET_BAD;
  }

  return sock;
}
920
921
main(int argc,char * argv[])922 int main(int argc, char *argv[])
923 {
924 curl_socket_t sock = CURL_SOCKET_BAD;
925 curl_socket_t msgsock = CURL_SOCKET_BAD;
926 int wrotepidfile = 0;
927 int wroteportfile = 0;
928 const char *pidname = ".mqttd.pid";
929 const char *portname = ".mqttd.port";
930 bool juggle_again;
931 int error;
932 int arg = 1;
933
934 while(argc>arg) {
935 if(!strcmp("--version", argv[arg])) {
936 printf("mqttd IPv4%s\n",
937 #ifdef ENABLE_IPV6
938 "/IPv6"
939 #else
940 ""
941 #endif
942 );
943 return 0;
944 }
945 else if(!strcmp("--pidfile", argv[arg])) {
946 arg++;
947 if(argc>arg)
948 pidname = argv[arg++];
949 }
950 else if(!strcmp("--portfile", argv[arg])) {
951 arg++;
952 if(argc>arg)
953 portname = argv[arg++];
954 }
955 else if(!strcmp("--config", argv[arg])) {
956 arg++;
957 if(argc>arg)
958 configfile = argv[arg++];
959 }
960 else if(!strcmp("--logfile", argv[arg])) {
961 arg++;
962 if(argc>arg)
963 serverlogfile = argv[arg++];
964 }
965 else if(!strcmp("--ipv6", argv[arg])) {
966 #ifdef ENABLE_IPV6
967 ipv_inuse = "IPv6";
968 use_ipv6 = TRUE;
969 #endif
970 arg++;
971 }
972 else if(!strcmp("--ipv4", argv[arg])) {
973 /* for completeness, we support this option as well */
974 #ifdef ENABLE_IPV6
975 ipv_inuse = "IPv4";
976 use_ipv6 = FALSE;
977 #endif
978 arg++;
979 }
980 else if(!strcmp("--port", argv[arg])) {
981 arg++;
982 if(argc>arg) {
983 char *endptr;
984 unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
985 if((endptr != argv[arg] + strlen(argv[arg])) ||
986 ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
987 fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
988 argv[arg]);
989 return 0;
990 }
991 port = curlx_ultous(ulnum);
992 arg++;
993 }
994 }
995 else {
996 puts("Usage: mqttd [option]\n"
997 " --config [file]\n"
998 " --version\n"
999 " --logfile [file]\n"
1000 " --pidfile [file]\n"
1001 " --portfile [file]\n"
1002 " --ipv4\n"
1003 " --ipv6\n"
1004 " --port [port]\n");
1005 return 0;
1006 }
1007 }
1008
1009 #ifdef WIN32
1010 win32_init();
1011 atexit(win32_cleanup);
1012
1013 setmode(fileno(stdin), O_BINARY);
1014 setmode(fileno(stdout), O_BINARY);
1015 setmode(fileno(stderr), O_BINARY);
1016 #endif
1017
1018 install_signal_handlers(FALSE);
1019
1020 #ifdef ENABLE_IPV6
1021 if(!use_ipv6)
1022 #endif
1023 sock = socket(AF_INET, SOCK_STREAM, 0);
1024 #ifdef ENABLE_IPV6
1025 else
1026 sock = socket(AF_INET6, SOCK_STREAM, 0);
1027 #endif
1028
1029 if(CURL_SOCKET_BAD == sock) {
1030 error = SOCKERRNO;
1031 logmsg("Error creating socket: (%d) %s",
1032 error, strerror(error));
1033 goto mqttd_cleanup;
1034 }
1035
1036 {
1037 /* passive daemon style */
1038 sock = sockdaemon(sock, &port);
1039 if(CURL_SOCKET_BAD == sock) {
1040 goto mqttd_cleanup;
1041 }
1042 msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
1043 }
1044
1045 logmsg("Running %s version", ipv_inuse);
1046 logmsg("Listening on port %hu", port);
1047
1048 wrotepidfile = write_pidfile(pidname);
1049 if(!wrotepidfile) {
1050 goto mqttd_cleanup;
1051 }
1052
1053 wroteportfile = write_portfile(portname, port);
1054 if(!wroteportfile) {
1055 goto mqttd_cleanup;
1056 }
1057
1058 do {
1059 juggle_again = incoming(sock);
1060 } while(juggle_again);
1061
1062 mqttd_cleanup:
1063
1064 if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
1065 sclose(msgsock);
1066
1067 if(sock != CURL_SOCKET_BAD)
1068 sclose(sock);
1069
1070 if(wrotepidfile)
1071 unlink(pidname);
1072 if(wroteportfile)
1073 unlink(portname);
1074
1075 restore_signal_handlers(FALSE);
1076
1077 if(got_exit_signal) {
1078 logmsg("============> mqttd exits with signal (%d)", exit_signal);
1079 /*
1080 * To properly set the return status of the process we
1081 * must raise the same signal SIGINT or SIGTERM that we
1082 * caught and let the old handler take care of it.
1083 */
1084 raise(exit_signal);
1085 }
1086
1087 logmsg("============> mqttd quits");
1088 return 0;
1089 }
1090