1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2016 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_live_check.h"
26 #include "shrpx_worker.h"
27 #include "shrpx_connect_blocker.h"
28 #include "shrpx_tls.h"
29 #include "shrpx_log.h"
30
31 namespace shrpx {
32
33 namespace {
34 constexpr size_t MAX_BUFFER_SIZE = 4_k;
35 } // namespace
36
37 namespace {
readcb(struct ev_loop * loop,ev_io * w,int revents)38 void readcb(struct ev_loop *loop, ev_io *w, int revents) {
39 int rv;
40 auto conn = static_cast<Connection *>(w->data);
41 auto live_check = static_cast<LiveCheck *>(conn->data);
42
43 rv = live_check->do_read();
44 if (rv != 0) {
45 live_check->on_failure();
46 return;
47 }
48 }
49 } // namespace
50
51 namespace {
writecb(struct ev_loop * loop,ev_io * w,int revents)52 void writecb(struct ev_loop *loop, ev_io *w, int revents) {
53 int rv;
54 auto conn = static_cast<Connection *>(w->data);
55 auto live_check = static_cast<LiveCheck *>(conn->data);
56
57 rv = live_check->do_write();
58 if (rv != 0) {
59 live_check->on_failure();
60 return;
61 }
62 }
63 } // namespace
64
65 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)66 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
67 auto conn = static_cast<Connection *>(w->data);
68 auto live_check = static_cast<LiveCheck *>(conn->data);
69
70 if (w == &conn->rt && !conn->expired_rt()) {
71 return;
72 }
73
74 live_check->on_failure();
75 }
76 } // namespace
77
78 namespace {
backoff_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)79 void backoff_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
80 int rv;
81 auto live_check = static_cast<LiveCheck *>(w->data);
82
83 rv = live_check->initiate_connection();
84 if (rv != 0) {
85 live_check->on_failure();
86 return;
87 }
88 }
89 } // namespace
90
91 namespace {
settings_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)92 void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
93 auto live_check = static_cast<LiveCheck *>(w->data);
94
95 if (LOG_ENABLED(INFO)) {
96 LOG(INFO) << "SETTINGS timeout";
97 }
98
99 live_check->on_failure();
100 }
101 } // namespace
102
LiveCheck(struct ev_loop * loop,SSL_CTX * ssl_ctx,Worker * worker,DownstreamAddr * addr,std::mt19937 & gen)103 LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
104 DownstreamAddr *addr, std::mt19937 &gen)
105 : conn_(loop, -1, nullptr, worker->get_mcpool(),
106 worker->get_downstream_config()->timeout.write,
107 worker->get_downstream_config()->timeout.read, {}, {}, writecb,
108 readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
109 get_config()->tls.dyn_rec.idle_timeout, Proto::NONE),
110 wb_(worker->get_mcpool()),
111 gen_(gen),
112 read_(&LiveCheck::noop),
113 write_(&LiveCheck::noop),
114 worker_(worker),
115 ssl_ctx_(ssl_ctx),
116 addr_(addr),
117 session_(nullptr),
118 raddr_(nullptr),
119 success_count_(0),
120 fail_count_(0),
121 settings_ack_received_(false),
122 session_closing_(false) {
123 ev_timer_init(&backoff_timer_, backoff_timeoutcb, 0., 0.);
124 backoff_timer_.data = this;
125
126 // SETTINGS ACK must be received in a short timeout. Otherwise, we
127 // assume that connection is broken.
128 ev_timer_init(&settings_timer_, settings_timeout_cb, 0., 0.);
129 settings_timer_.data = this;
130 }
131
~LiveCheck()132 LiveCheck::~LiveCheck() {
133 disconnect();
134
135 ev_timer_stop(conn_.loop, &backoff_timer_);
136 }
137
disconnect()138 void LiveCheck::disconnect() {
139 if (dns_query_) {
140 auto dns_tracker = worker_->get_dns_tracker();
141
142 dns_tracker->cancel(dns_query_.get());
143 }
144
145 dns_query_.reset();
146 // We can reuse resolved_addr_
147 raddr_ = nullptr;
148
149 conn_.rlimit.stopw();
150 conn_.wlimit.stopw();
151
152 ev_timer_stop(conn_.loop, &settings_timer_);
153
154 read_ = write_ = &LiveCheck::noop;
155
156 conn_.disconnect();
157
158 nghttp2_session_del(session_);
159 session_ = nullptr;
160
161 settings_ack_received_ = false;
162 session_closing_ = false;
163
164 wb_.reset();
165 }
166
// Parameters of the retry backoff used by LiveCheck::schedule().  The
// algorithm is similar to the one described in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
namespace {
// Upper bound on the exponent applied to MULTIPLIER.
constexpr size_t MAX_BACKOFF_EXP = 10;
// Base that is raised to the (capped) number of failures.
constexpr double MULTIPLIER = 1.6;
// Fraction of the base backoff used as the +/- random jitter range.
constexpr double JITTER = 0.2;
} // namespace
174
schedule()175 void LiveCheck::schedule() {
176 auto base_backoff =
177 util::int_pow(MULTIPLIER, std::min(fail_count_, MAX_BACKOFF_EXP));
178 auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
179 JITTER * base_backoff);
180
181 auto &downstreamconf = *get_config()->conn.downstream;
182
183 auto backoff =
184 std::min(downstreamconf.timeout.max_backoff, base_backoff + dist(gen_));
185
186 ev_timer_set(&backoff_timer_, backoff, 0.);
187 ev_timer_start(conn_.loop, &backoff_timer_);
188 }
189
do_read()190 int LiveCheck::do_read() { return read_(*this); }
191
do_write()192 int LiveCheck::do_write() { return write_(*this); }
193
initiate_connection()194 int LiveCheck::initiate_connection() {
195 int rv;
196
197 auto worker_blocker = worker_->get_connect_blocker();
198 if (worker_blocker->blocked()) {
199 if (LOG_ENABLED(INFO)) {
200 LOG(INFO) << "Worker wide backend connection was blocked temporarily";
201 }
202 return -1;
203 }
204
205 if (!dns_query_ && addr_->tls) {
206 assert(ssl_ctx_);
207
208 auto ssl = tls::create_ssl(ssl_ctx_);
209 if (!ssl) {
210 return -1;
211 }
212
213 switch (addr_->proto) {
214 case Proto::HTTP1:
215 tls::setup_downstream_http1_alpn(ssl);
216 break;
217 case Proto::HTTP2:
218 tls::setup_downstream_http2_alpn(ssl);
219 break;
220 default:
221 assert(0);
222 }
223
224 conn_.set_ssl(ssl);
225 conn_.tls.client_session_cache = &addr_->tls_session_cache;
226 }
227
228 if (addr_->dns) {
229 if (!dns_query_) {
230 auto dns_query = std::make_unique<DNSQuery>(
231 addr_->host, [this](DNSResolverStatus status, const Address *result) {
232 int rv;
233
234 if (status == DNSResolverStatus::OK) {
235 *this->resolved_addr_ = *result;
236 }
237 rv = this->initiate_connection();
238 if (rv != 0) {
239 this->on_failure();
240 }
241 });
242 auto dns_tracker = worker_->get_dns_tracker();
243
244 if (!resolved_addr_) {
245 resolved_addr_ = std::make_unique<Address>();
246 }
247
248 switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) {
249 case DNSResolverStatus::ERROR:
250 return -1;
251 case DNSResolverStatus::RUNNING:
252 dns_query_ = std::move(dns_query);
253 return 0;
254 case DNSResolverStatus::OK:
255 break;
256 default:
257 assert(0);
258 }
259 } else {
260 switch (dns_query_->status) {
261 case DNSResolverStatus::ERROR:
262 dns_query_.reset();
263 return -1;
264 case DNSResolverStatus::OK:
265 dns_query_.reset();
266 break;
267 default:
268 assert(0);
269 }
270 }
271
272 util::set_port(*resolved_addr_, addr_->port);
273 raddr_ = resolved_addr_.get();
274 } else {
275 raddr_ = &addr_->addr;
276 }
277
278 conn_.fd = util::create_nonblock_socket(raddr_->su.storage.ss_family);
279
280 if (conn_.fd == -1) {
281 auto error = errno;
282 LOG(WARN) << "socket() failed; addr=" << util::to_numeric_addr(raddr_)
283 << ", errno=" << error;
284 return -1;
285 }
286
287 rv = connect(conn_.fd, &raddr_->su.sa, raddr_->len);
288 if (rv != 0 && errno != EINPROGRESS) {
289 auto error = errno;
290 LOG(WARN) << "connect() failed; addr=" << util::to_numeric_addr(raddr_)
291 << ", errno=" << error;
292
293 close(conn_.fd);
294 conn_.fd = -1;
295
296 return -1;
297 }
298
299 if (addr_->tls) {
300 auto sni_name =
301 addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
302 if (!util::numeric_host(sni_name.data())) {
303 SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.data());
304 }
305
306 auto session = tls::reuse_tls_session(addr_->tls_session_cache);
307 if (session) {
308 SSL_set_session(conn_.tls.ssl, session);
309 SSL_SESSION_free(session);
310 }
311
312 conn_.prepare_client_handshake();
313 }
314
315 write_ = &LiveCheck::connected;
316
317 ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
318 ev_io_set(&conn_.rev, conn_.fd, EV_READ);
319
320 conn_.wlimit.startw();
321
322 auto &downstreamconf = *get_config()->conn.downstream;
323
324 conn_.wt.repeat = downstreamconf.timeout.connect;
325 ev_timer_again(conn_.loop, &conn_.wt);
326
327 return 0;
328 }
329
connected()330 int LiveCheck::connected() {
331 auto sock_error = util::get_socket_error(conn_.fd);
332 if (sock_error != 0) {
333 if (LOG_ENABLED(INFO)) {
334 LOG(INFO) << "Backend connect failed; addr="
335 << util::to_numeric_addr(raddr_) << ": errno=" << sock_error;
336 }
337
338 return -1;
339 }
340
341 if (LOG_ENABLED(INFO)) {
342 LOG(INFO) << "Connection established";
343 }
344
345 auto &downstreamconf = *get_config()->conn.downstream;
346
347 // Reset timeout for write. Previously, we set timeout for connect.
348 conn_.wt.repeat = downstreamconf.timeout.write;
349 ev_timer_again(conn_.loop, &conn_.wt);
350
351 conn_.rlimit.startw();
352 conn_.again_rt();
353
354 if (conn_.tls.ssl) {
355 read_ = &LiveCheck::tls_handshake;
356 write_ = &LiveCheck::tls_handshake;
357
358 return do_write();
359 }
360
361 if (addr_->proto == Proto::HTTP2) {
362 // For HTTP/2, we try to read SETTINGS ACK from server to make
363 // sure it is really alive, and serving HTTP/2.
364 read_ = &LiveCheck::read_clear;
365 write_ = &LiveCheck::write_clear;
366
367 if (connection_made() != 0) {
368 return -1;
369 }
370
371 return 0;
372 }
373
374 on_success();
375
376 return 0;
377 }
378
tls_handshake()379 int LiveCheck::tls_handshake() {
380 conn_.last_read = std::chrono::steady_clock::now();
381
382 ERR_clear_error();
383
384 auto rv = conn_.tls_handshake();
385
386 if (rv == SHRPX_ERR_INPROGRESS) {
387 return 0;
388 }
389
390 if (rv < 0) {
391 return rv;
392 }
393
394 if (LOG_ENABLED(INFO)) {
395 LOG(INFO) << "SSL/TLS handshake completed";
396 }
397
398 if (!get_config()->tls.insecure &&
399 tls::check_cert(conn_.tls.ssl, addr_, raddr_) != 0) {
400 return -1;
401 }
402
403 // Check negotiated ALPN
404
405 const unsigned char *next_proto = nullptr;
406 unsigned int next_proto_len = 0;
407
408 SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
409
410 auto proto = StringRef{next_proto, next_proto_len};
411
412 switch (addr_->proto) {
413 case Proto::HTTP1:
414 if (proto.empty() || proto == "http/1.1"_sr) {
415 break;
416 }
417 return -1;
418 case Proto::HTTP2:
419 if (util::check_h2_is_selected(proto)) {
420 // For HTTP/2, we try to read SETTINGS ACK from server to make
421 // sure it is really alive, and serving HTTP/2.
422 read_ = &LiveCheck::read_tls;
423 write_ = &LiveCheck::write_tls;
424
425 if (connection_made() != 0) {
426 return -1;
427 }
428
429 return 0;
430 }
431 return -1;
432 default:
433 break;
434 }
435
436 on_success();
437
438 return 0;
439 }
440
read_tls()441 int LiveCheck::read_tls() {
442 conn_.last_read = std::chrono::steady_clock::now();
443
444 std::array<uint8_t, 4_k> buf;
445
446 ERR_clear_error();
447
448 for (;;) {
449 auto nread = conn_.read_tls(buf.data(), buf.size());
450
451 if (nread == 0) {
452 return 0;
453 }
454
455 if (nread < 0) {
456 return nread;
457 }
458
459 if (on_read(buf.data(), nread) != 0) {
460 return -1;
461 }
462 }
463 }
464
write_tls()465 int LiveCheck::write_tls() {
466 conn_.last_read = std::chrono::steady_clock::now();
467
468 ERR_clear_error();
469
470 struct iovec iov;
471
472 for (;;) {
473 if (wb_.rleft() > 0) {
474 auto iovcnt = wb_.riovec(&iov, 1);
475 if (iovcnt != 1) {
476 assert(0);
477 return -1;
478 }
479 auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
480
481 if (nwrite == 0) {
482 return 0;
483 }
484
485 if (nwrite < 0) {
486 return nwrite;
487 }
488
489 wb_.drain(nwrite);
490
491 continue;
492 }
493
494 if (on_write() != 0) {
495 return -1;
496 }
497
498 if (wb_.rleft() == 0) {
499 conn_.start_tls_write_idle();
500 break;
501 }
502 }
503
504 conn_.wlimit.stopw();
505 ev_timer_stop(conn_.loop, &conn_.wt);
506
507 if (settings_ack_received_) {
508 on_success();
509 }
510
511 return 0;
512 }
513
read_clear()514 int LiveCheck::read_clear() {
515 conn_.last_read = std::chrono::steady_clock::now();
516
517 std::array<uint8_t, 4_k> buf;
518
519 for (;;) {
520 auto nread = conn_.read_clear(buf.data(), buf.size());
521
522 if (nread == 0) {
523 return 0;
524 }
525
526 if (nread < 0) {
527 return nread;
528 }
529
530 if (on_read(buf.data(), nread) != 0) {
531 return -1;
532 }
533 }
534 }
535
write_clear()536 int LiveCheck::write_clear() {
537 conn_.last_read = std::chrono::steady_clock::now();
538
539 struct iovec iov;
540
541 for (;;) {
542 if (wb_.rleft() > 0) {
543 auto iovcnt = wb_.riovec(&iov, 1);
544 if (iovcnt != 1) {
545 assert(0);
546 return -1;
547 }
548 auto nwrite = conn_.write_clear(iov.iov_base, iov.iov_len);
549
550 if (nwrite == 0) {
551 return 0;
552 }
553
554 if (nwrite < 0) {
555 return nwrite;
556 }
557
558 wb_.drain(nwrite);
559
560 continue;
561 }
562
563 if (on_write() != 0) {
564 return -1;
565 }
566
567 if (wb_.rleft() == 0) {
568 break;
569 }
570 }
571
572 conn_.wlimit.stopw();
573 ev_timer_stop(conn_.loop, &conn_.wt);
574
575 if (settings_ack_received_) {
576 on_success();
577 }
578
579 return 0;
580 }
581
on_read(const uint8_t * data,size_t len)582 int LiveCheck::on_read(const uint8_t *data, size_t len) {
583 auto rv = nghttp2_session_mem_recv2(session_, data, len);
584 if (rv < 0) {
585 LOG(ERROR) << "nghttp2_session_mem_recv2() returned error: "
586 << nghttp2_strerror(rv);
587 return -1;
588 }
589
590 if (settings_ack_received_ && !session_closing_) {
591 session_closing_ = true;
592 auto rv = nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR);
593 if (rv != 0) {
594 return -1;
595 }
596 }
597
598 if (nghttp2_session_want_read(session_) == 0 &&
599 nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
600 if (LOG_ENABLED(INFO)) {
601 LOG(INFO) << "No more read/write for this session";
602 }
603
604 // If we have SETTINGS ACK already, we treat this success.
605 if (settings_ack_received_) {
606 return 0;
607 }
608
609 return -1;
610 }
611
612 signal_write();
613
614 return 0;
615 }
616
on_write()617 int LiveCheck::on_write() {
618 for (;;) {
619 const uint8_t *data;
620 auto datalen = nghttp2_session_mem_send2(session_, &data);
621
622 if (datalen < 0) {
623 LOG(ERROR) << "nghttp2_session_mem_send2() returned error: "
624 << nghttp2_strerror(datalen);
625 return -1;
626 }
627 if (datalen == 0) {
628 break;
629 }
630 wb_.append(data, datalen);
631
632 if (wb_.rleft() >= MAX_BUFFER_SIZE) {
633 break;
634 }
635 }
636
637 if (nghttp2_session_want_read(session_) == 0 &&
638 nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
639 if (LOG_ENABLED(INFO)) {
640 LOG(INFO) << "No more read/write for this session";
641 }
642
643 if (settings_ack_received_) {
644 return 0;
645 }
646
647 return -1;
648 }
649
650 return 0;
651 }
652
on_failure()653 void LiveCheck::on_failure() {
654 ++fail_count_;
655
656 if (LOG_ENABLED(INFO)) {
657 LOG(INFO) << "Liveness check for " << addr_->host << ":" << addr_->port
658 << " failed " << fail_count_ << " time(s) in a row";
659 }
660
661 disconnect();
662
663 schedule();
664 }
665
on_success()666 void LiveCheck::on_success() {
667 ++success_count_;
668 fail_count_ = 0;
669
670 if (LOG_ENABLED(INFO)) {
671 LOG(INFO) << "Liveness check for " << addr_->host << ":" << addr_->port
672 << " succeeded " << success_count_ << " time(s) in a row";
673 }
674
675 if (success_count_ < addr_->rise) {
676 disconnect();
677
678 schedule();
679
680 return;
681 }
682
683 LOG(NOTICE) << util::to_numeric_addr(&addr_->addr) << " is considered online";
684
685 addr_->connect_blocker->online();
686
687 success_count_ = 0;
688 fail_count_ = 0;
689
690 disconnect();
691 }
692
noop()693 int LiveCheck::noop() { return 0; }
694
start_settings_timer()695 void LiveCheck::start_settings_timer() {
696 auto &downstreamconf = get_config()->http2.downstream;
697
698 ev_timer_set(&settings_timer_, downstreamconf.timeout.settings, 0.);
699 ev_timer_start(conn_.loop, &settings_timer_);
700 }
701
stop_settings_timer()702 void LiveCheck::stop_settings_timer() {
703 ev_timer_stop(conn_.loop, &settings_timer_);
704 }
705
settings_ack_received()706 void LiveCheck::settings_ack_received() { settings_ack_received_ = true; }
707
708 namespace {
on_frame_send_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)709 int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
710 void *user_data) {
711 auto live_check = static_cast<LiveCheck *>(user_data);
712
713 if (frame->hd.type != NGHTTP2_SETTINGS ||
714 (frame->hd.flags & NGHTTP2_FLAG_ACK)) {
715 return 0;
716 }
717
718 live_check->start_settings_timer();
719
720 return 0;
721 }
722 } // namespace
723
724 namespace {
on_frame_recv_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)725 int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
726 void *user_data) {
727 auto live_check = static_cast<LiveCheck *>(user_data);
728
729 if (frame->hd.type != NGHTTP2_SETTINGS ||
730 (frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
731 return 0;
732 }
733
734 live_check->stop_settings_timer();
735 live_check->settings_ack_received();
736
737 return 0;
738 }
739 } // namespace
740
connection_made()741 int LiveCheck::connection_made() {
742 int rv;
743
744 nghttp2_session_callbacks *callbacks;
745 rv = nghttp2_session_callbacks_new(&callbacks);
746 if (rv != 0) {
747 return -1;
748 }
749
750 nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
751 on_frame_send_callback);
752 nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
753 on_frame_recv_callback);
754
755 rv = nghttp2_session_client_new(&session_, callbacks, this);
756
757 nghttp2_session_callbacks_del(callbacks);
758
759 if (rv != 0) {
760 return -1;
761 }
762
763 rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, nullptr, 0);
764 if (rv != 0) {
765 return -1;
766 }
767
768 auto must_terminate =
769 addr_->tls && !nghttp2::tls::check_http2_requirement(conn_.tls.ssl);
770
771 if (must_terminate) {
772 if (LOG_ENABLED(INFO)) {
773 LOG(INFO) << "TLSv1.2 was not negotiated. HTTP/2 must not be negotiated.";
774 }
775
776 rv = nghttp2_session_terminate_session(session_,
777 NGHTTP2_INADEQUATE_SECURITY);
778 if (rv != 0) {
779 return -1;
780 }
781 }
782
783 signal_write();
784
785 return 0;
786 }
787
signal_write()788 void LiveCheck::signal_write() { conn_.wlimit.startw(); }
789
790 } // namespace shrpx
791