• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_connection_handler.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #include <sys/types.h>
31 #include <sys/wait.h>
32 
33 #include <cerrno>
34 #include <thread>
35 #include <random>
36 
37 #include "shrpx_client_handler.h"
38 #include "shrpx_tls.h"
39 #include "shrpx_worker.h"
40 #include "shrpx_config.h"
41 #include "shrpx_http2_session.h"
42 #include "shrpx_connect_blocker.h"
43 #include "shrpx_downstream_connection.h"
44 #include "shrpx_accept_handler.h"
45 #include "shrpx_memcached_dispatcher.h"
46 #include "shrpx_signal.h"
47 #include "shrpx_log.h"
48 #include "util.h"
49 #include "template.h"
50 
51 using namespace nghttp2;
52 
53 namespace shrpx {
54 
55 namespace {
acceptor_disable_cb(struct ev_loop * loop,ev_timer * w,int revent)56 void acceptor_disable_cb(struct ev_loop *loop, ev_timer *w, int revent) {
57   auto h = static_cast<ConnectionHandler *>(w->data);
58 
59   // If we are in graceful shutdown period, we must not enable
60   // acceptors again.
61   if (h->get_graceful_shutdown()) {
62     return;
63   }
64 
65   h->enable_acceptor();
66 }
67 } // namespace
68 
69 namespace {
ocsp_cb(struct ev_loop * loop,ev_timer * w,int revent)70 void ocsp_cb(struct ev_loop *loop, ev_timer *w, int revent) {
71   auto h = static_cast<ConnectionHandler *>(w->data);
72 
73   // If we are in graceful shutdown period, we won't do ocsp query.
74   if (h->get_graceful_shutdown()) {
75     return;
76   }
77 
78   LOG(NOTICE) << "Start ocsp update";
79 
80   h->proceed_next_cert_ocsp();
81 }
82 } // namespace
83 
84 namespace {
ocsp_read_cb(struct ev_loop * loop,ev_io * w,int revent)85 void ocsp_read_cb(struct ev_loop *loop, ev_io *w, int revent) {
86   auto h = static_cast<ConnectionHandler *>(w->data);
87 
88   h->read_ocsp_chunk();
89 }
90 } // namespace
91 
92 namespace {
ocsp_chld_cb(struct ev_loop * loop,ev_child * w,int revent)93 void ocsp_chld_cb(struct ev_loop *loop, ev_child *w, int revent) {
94   auto h = static_cast<ConnectionHandler *>(w->data);
95 
96   h->handle_ocsp_complete();
97 }
98 } // namespace
99 
100 namespace {
thread_join_async_cb(struct ev_loop * loop,ev_async * w,int revent)101 void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
102   ev_break(loop);
103 }
104 } // namespace
105 
106 namespace {
serial_event_async_cb(struct ev_loop * loop,ev_async * w,int revent)107 void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
108   auto h = static_cast<ConnectionHandler *>(w->data);
109 
110   h->handle_serial_event();
111 }
112 } // namespace
113 
ConnectionHandler(struct ev_loop * loop,std::mt19937 & gen)114 ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen)
115     : gen_(gen),
116       single_worker_(nullptr),
117       loop_(loop),
118 #ifdef HAVE_NEVERBLEED
119       nb_(nullptr),
120 #endif // HAVE_NEVERBLEED
121       tls_ticket_key_memcached_get_retry_count_(0),
122       tls_ticket_key_memcached_fail_count_(0),
123       worker_round_robin_cnt_(get_config()->api.enabled ? 1 : 0),
124       graceful_shutdown_(false),
125       enable_acceptor_on_ocsp_completion_(false) {
126   ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
127   disable_acceptor_timer_.data = this;
128 
129   ev_timer_init(&ocsp_timer_, ocsp_cb, 0., 0.);
130   ocsp_timer_.data = this;
131 
132   ev_io_init(&ocsp_.rev, ocsp_read_cb, -1, EV_READ);
133   ocsp_.rev.data = this;
134 
135   ev_async_init(&thread_join_asyncev_, thread_join_async_cb);
136 
137   ev_async_init(&serial_event_asyncev_, serial_event_async_cb);
138   serial_event_asyncev_.data = this;
139 
140   ev_async_start(loop_, &serial_event_asyncev_);
141 
142   ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0);
143   ocsp_.chldev.data = this;
144 
145   ocsp_.next = 0;
146   ocsp_.proc.rfd = -1;
147 
148   reset_ocsp();
149 }
150 
~ConnectionHandler()151 ConnectionHandler::~ConnectionHandler() {
152   ev_child_stop(loop_, &ocsp_.chldev);
153   ev_async_stop(loop_, &serial_event_asyncev_);
154   ev_async_stop(loop_, &thread_join_asyncev_);
155   ev_io_stop(loop_, &ocsp_.rev);
156   ev_timer_stop(loop_, &ocsp_timer_);
157   ev_timer_stop(loop_, &disable_acceptor_timer_);
158 
159   for (auto ssl_ctx : all_ssl_ctx_) {
160     auto tls_ctx_data =
161         static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
162     delete tls_ctx_data;
163     SSL_CTX_free(ssl_ctx);
164   }
165 
166   // Free workers before destroying ev_loop
167   workers_.clear();
168 
169   for (auto loop : worker_loops_) {
170     ev_loop_destroy(loop);
171   }
172 }
173 
set_ticket_keys_to_worker(const std::shared_ptr<TicketKeys> & ticket_keys)174 void ConnectionHandler::set_ticket_keys_to_worker(
175     const std::shared_ptr<TicketKeys> &ticket_keys) {
176   for (auto &worker : workers_) {
177     worker->set_ticket_keys(ticket_keys);
178   }
179 }
180 
worker_reopen_log_files()181 void ConnectionHandler::worker_reopen_log_files() {
182   WorkerEvent wev{};
183 
184   wev.type = WorkerEventType::REOPEN_LOG;
185 
186   for (auto &worker : workers_) {
187     worker->send(wev);
188   }
189 }
190 
worker_replace_downstream(std::shared_ptr<DownstreamConfig> downstreamconf)191 void ConnectionHandler::worker_replace_downstream(
192     std::shared_ptr<DownstreamConfig> downstreamconf) {
193   WorkerEvent wev{};
194 
195   wev.type = WorkerEventType::REPLACE_DOWNSTREAM;
196   wev.downstreamconf = std::move(downstreamconf);
197 
198   for (auto &worker : workers_) {
199     worker->send(wev);
200   }
201 }
202 
create_single_worker()203 int ConnectionHandler::create_single_worker() {
204   cert_tree_ = tls::create_cert_lookup_tree();
205   auto sv_ssl_ctx = tls::setup_server_ssl_context(
206       all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
207 #ifdef HAVE_NEVERBLEED
208                                           ,
209       nb_
210 #endif // HAVE_NEVERBLEED
211   );
212   auto cl_ssl_ctx = tls::setup_downstream_client_ssl_context(
213 #ifdef HAVE_NEVERBLEED
214       nb_
215 #endif // HAVE_NEVERBLEED
216   );
217 
218   if (cl_ssl_ctx) {
219     all_ssl_ctx_.push_back(cl_ssl_ctx);
220   }
221 
222   auto config = get_config();
223   auto &tlsconf = config->tls;
224 
225   SSL_CTX *session_cache_ssl_ctx = nullptr;
226   {
227     auto &memcachedconf = config->tls.session_cache.memcached;
228     if (memcachedconf.tls) {
229       session_cache_ssl_ctx = tls::create_ssl_client_context(
230 #ifdef HAVE_NEVERBLEED
231           nb_,
232 #endif // HAVE_NEVERBLEED
233           tlsconf.cacert, memcachedconf.cert_file,
234           memcachedconf.private_key_file, nullptr);
235       all_ssl_ctx_.push_back(session_cache_ssl_ctx);
236     }
237   }
238 
239   single_worker_ = std::make_unique<Worker>(
240       loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
241       ticket_keys_, this, config->conn.downstream);
242 #ifdef HAVE_MRUBY
243   if (single_worker_->create_mruby_context() != 0) {
244     return -1;
245   }
246 #endif // HAVE_MRUBY
247 
248   return 0;
249 }
250 
create_worker_thread(size_t num)251 int ConnectionHandler::create_worker_thread(size_t num) {
252 #ifndef NOTHREADS
253   assert(workers_.size() == 0);
254 
255   cert_tree_ = tls::create_cert_lookup_tree();
256   auto sv_ssl_ctx = tls::setup_server_ssl_context(
257       all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
258 #  ifdef HAVE_NEVERBLEED
259                                           ,
260       nb_
261 #  endif // HAVE_NEVERBLEED
262   );
263   auto cl_ssl_ctx = tls::setup_downstream_client_ssl_context(
264 #  ifdef HAVE_NEVERBLEED
265       nb_
266 #  endif // HAVE_NEVERBLEED
267   );
268 
269   if (cl_ssl_ctx) {
270     all_ssl_ctx_.push_back(cl_ssl_ctx);
271   }
272 
273   auto config = get_config();
274   auto &tlsconf = config->tls;
275   auto &apiconf = config->api;
276 
277   // We have dedicated worker for API request processing.
278   if (apiconf.enabled) {
279     ++num;
280   }
281 
282   SSL_CTX *session_cache_ssl_ctx = nullptr;
283   {
284     auto &memcachedconf = config->tls.session_cache.memcached;
285 
286     if (memcachedconf.tls) {
287       session_cache_ssl_ctx = tls::create_ssl_client_context(
288 #  ifdef HAVE_NEVERBLEED
289           nb_,
290 #  endif // HAVE_NEVERBLEED
291           tlsconf.cacert, memcachedconf.cert_file,
292           memcachedconf.private_key_file, nullptr);
293       all_ssl_ctx_.push_back(session_cache_ssl_ctx);
294     }
295   }
296 
297   for (size_t i = 0; i < num; ++i) {
298     auto loop = ev_loop_new(config->ev_loop_flags);
299 
300     auto worker = std::make_unique<Worker>(
301         loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
302         ticket_keys_, this, config->conn.downstream);
303 #  ifdef HAVE_MRUBY
304     if (worker->create_mruby_context() != 0) {
305       return -1;
306     }
307 #  endif // HAVE_MRUBY
308 
309     workers_.push_back(std::move(worker));
310     worker_loops_.push_back(loop);
311 
312     LLOG(NOTICE, this) << "Created worker thread #" << workers_.size() - 1;
313   }
314 
315   for (auto &worker : workers_) {
316     worker->run_async();
317   }
318 
319 #endif // NOTHREADS
320 
321   return 0;
322 }
323 
join_worker()324 void ConnectionHandler::join_worker() {
325 #ifndef NOTHREADS
326   int n = 0;
327 
328   if (LOG_ENABLED(INFO)) {
329     LLOG(INFO, this) << "Waiting for worker thread to join: n="
330                      << workers_.size();
331   }
332 
333   for (auto &worker : workers_) {
334     worker->wait();
335     if (LOG_ENABLED(INFO)) {
336       LLOG(INFO, this) << "Thread #" << n << " joined";
337     }
338     ++n;
339   }
340 #endif // NOTHREADS
341 }
342 
graceful_shutdown_worker()343 void ConnectionHandler::graceful_shutdown_worker() {
344   if (single_worker_) {
345     return;
346   }
347 
348   WorkerEvent wev{};
349   wev.type = WorkerEventType::GRACEFUL_SHUTDOWN;
350 
351   if (LOG_ENABLED(INFO)) {
352     LLOG(INFO, this) << "Sending graceful shutdown signal to worker";
353   }
354 
355   for (auto &worker : workers_) {
356     worker->send(wev);
357   }
358 
359 #ifndef NOTHREADS
360   ev_async_start(loop_, &thread_join_asyncev_);
361 
362   thread_join_fut_ = std::async(std::launch::async, [this]() {
363     (void)reopen_log_files(get_config()->logging);
364     join_worker();
365     ev_async_send(get_loop(), &thread_join_asyncev_);
366     delete_log_config();
367   });
368 #endif // NOTHREADS
369 }
370 
handle_connection(int fd,sockaddr * addr,int addrlen,const UpstreamAddr * faddr)371 int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
372                                          const UpstreamAddr *faddr) {
373   if (LOG_ENABLED(INFO)) {
374     LLOG(INFO, this) << "Accepted connection from "
375                      << util::numeric_name(addr, addrlen) << ", fd=" << fd;
376   }
377 
378   auto config = get_config();
379 
380   if (single_worker_) {
381     auto &upstreamconf = config->conn.upstream;
382     if (single_worker_->get_worker_stat()->num_connections >=
383         upstreamconf.worker_connections) {
384 
385       if (LOG_ENABLED(INFO)) {
386         LLOG(INFO, this) << "Too many connections >="
387                          << upstreamconf.worker_connections;
388       }
389 
390       close(fd);
391       return -1;
392     }
393 
394     auto client =
395         tls::accept_connection(single_worker_.get(), fd, addr, addrlen, faddr);
396     if (!client) {
397       LLOG(ERROR, this) << "ClientHandler creation failed";
398 
399       close(fd);
400       return -1;
401     }
402 
403     return 0;
404   }
405 
406   Worker *worker;
407 
408   if (faddr->alt_mode == UpstreamAltMode::API) {
409     worker = workers_[0].get();
410 
411     if (LOG_ENABLED(INFO)) {
412       LOG(INFO) << "Dispatch connection to API worker #0";
413     }
414   } else {
415     worker = workers_[worker_round_robin_cnt_].get();
416 
417     if (LOG_ENABLED(INFO)) {
418       LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_;
419     }
420 
421     if (++worker_round_robin_cnt_ == workers_.size()) {
422       auto &apiconf = config->api;
423 
424       if (apiconf.enabled) {
425         worker_round_robin_cnt_ = 1;
426       } else {
427         worker_round_robin_cnt_ = 0;
428       }
429     }
430   }
431 
432   WorkerEvent wev{};
433   wev.type = WorkerEventType::NEW_CONNECTION;
434   wev.client_fd = fd;
435   memcpy(&wev.client_addr, addr, addrlen);
436   wev.client_addrlen = addrlen;
437   wev.faddr = faddr;
438 
439   worker->send(wev);
440 
441   return 0;
442 }
443 
get_loop() const444 struct ev_loop *ConnectionHandler::get_loop() const {
445   return loop_;
446 }
447 
get_single_worker() const448 Worker *ConnectionHandler::get_single_worker() const {
449   return single_worker_.get();
450 }
451 
add_acceptor(std::unique_ptr<AcceptHandler> h)452 void ConnectionHandler::add_acceptor(std::unique_ptr<AcceptHandler> h) {
453   acceptors_.push_back(std::move(h));
454 }
455 
delete_acceptor()456 void ConnectionHandler::delete_acceptor() { acceptors_.clear(); }
457 
enable_acceptor()458 void ConnectionHandler::enable_acceptor() {
459   for (auto &a : acceptors_) {
460     a->enable();
461   }
462 }
463 
disable_acceptor()464 void ConnectionHandler::disable_acceptor() {
465   for (auto &a : acceptors_) {
466     a->disable();
467   }
468 }
469 
sleep_acceptor(ev_tstamp t)470 void ConnectionHandler::sleep_acceptor(ev_tstamp t) {
471   if (t == 0. || ev_is_active(&disable_acceptor_timer_)) {
472     return;
473   }
474 
475   disable_acceptor();
476 
477   ev_timer_set(&disable_acceptor_timer_, t, 0.);
478   ev_timer_start(loop_, &disable_acceptor_timer_);
479 }
480 
accept_pending_connection()481 void ConnectionHandler::accept_pending_connection() {
482   for (auto &a : acceptors_) {
483     a->accept_connection();
484   }
485 }
486 
set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys)487 void ConnectionHandler::set_ticket_keys(
488     std::shared_ptr<TicketKeys> ticket_keys) {
489   ticket_keys_ = std::move(ticket_keys);
490   if (single_worker_) {
491     single_worker_->set_ticket_keys(ticket_keys_);
492   }
493 }
494 
get_ticket_keys() const495 const std::shared_ptr<TicketKeys> &ConnectionHandler::get_ticket_keys() const {
496   return ticket_keys_;
497 }
498 
set_graceful_shutdown(bool f)499 void ConnectionHandler::set_graceful_shutdown(bool f) {
500   graceful_shutdown_ = f;
501   if (single_worker_) {
502     single_worker_->set_graceful_shutdown(f);
503   }
504 }
505 
get_graceful_shutdown() const506 bool ConnectionHandler::get_graceful_shutdown() const {
507   return graceful_shutdown_;
508 }
509 
cancel_ocsp_update()510 void ConnectionHandler::cancel_ocsp_update() {
511   enable_acceptor_on_ocsp_completion_ = false;
512   ev_timer_stop(loop_, &ocsp_timer_);
513 
514   if (ocsp_.proc.pid == 0) {
515     return;
516   }
517 
518   int rv;
519 
520   rv = kill(ocsp_.proc.pid, SIGTERM);
521   if (rv != 0) {
522     auto error = errno;
523     LOG(ERROR) << "Could not send signal to OCSP query process: errno="
524                << error;
525   }
526 
527   while ((rv = waitpid(ocsp_.proc.pid, nullptr, 0)) == -1 && errno == EINTR)
528     ;
529   if (rv == -1) {
530     auto error = errno;
531     LOG(ERROR) << "Error occurred while we were waiting for the completion of "
532                   "OCSP query process: errno="
533                << error;
534   }
535 }
536 
537 // inspired by h2o_read_command function from h2o project:
538 // https://github.com/h2o/h2o
start_ocsp_update(const char * cert_file)539 int ConnectionHandler::start_ocsp_update(const char *cert_file) {
540   int rv;
541 
542   if (LOG_ENABLED(INFO)) {
543     LOG(INFO) << "Start ocsp update for " << cert_file;
544   }
545 
546   assert(!ev_is_active(&ocsp_.rev));
547   assert(!ev_is_active(&ocsp_.chldev));
548 
549   char *const argv[] = {
550       const_cast<char *>(
551           get_config()->tls.ocsp.fetch_ocsp_response_file.c_str()),
552       const_cast<char *>(cert_file), nullptr};
553 
554   Process proc;
555   rv = exec_read_command(proc, argv);
556   if (rv != 0) {
557     return -1;
558   }
559 
560   ocsp_.proc = proc;
561 
562   ev_io_set(&ocsp_.rev, ocsp_.proc.rfd, EV_READ);
563   ev_io_start(loop_, &ocsp_.rev);
564 
565   ev_child_set(&ocsp_.chldev, ocsp_.proc.pid, 0);
566   ev_child_start(loop_, &ocsp_.chldev);
567 
568   return 0;
569 }
570 
read_ocsp_chunk()571 void ConnectionHandler::read_ocsp_chunk() {
572   std::array<uint8_t, 4_k> buf;
573   for (;;) {
574     ssize_t n;
575     while ((n = read(ocsp_.proc.rfd, buf.data(), buf.size())) == -1 &&
576            errno == EINTR)
577       ;
578 
579     if (n == -1) {
580       if (errno == EAGAIN || errno == EWOULDBLOCK) {
581         return;
582       }
583       auto error = errno;
584       LOG(WARN) << "Reading from ocsp query command failed: errno=" << error;
585       ocsp_.error = error;
586 
587       break;
588     }
589 
590     if (n == 0) {
591       break;
592     }
593 
594     std::copy_n(std::begin(buf), n, std::back_inserter(ocsp_.resp));
595   }
596 
597   ev_io_stop(loop_, &ocsp_.rev);
598 }
599 
handle_ocsp_complete()600 void ConnectionHandler::handle_ocsp_complete() {
601   ev_io_stop(loop_, &ocsp_.rev);
602   ev_child_stop(loop_, &ocsp_.chldev);
603 
604   assert(ocsp_.next < all_ssl_ctx_.size());
605 
606   auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
607   auto tls_ctx_data =
608       static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
609 
610   auto rstatus = ocsp_.chldev.rstatus;
611   auto status = WEXITSTATUS(rstatus);
612   if (ocsp_.error || !WIFEXITED(rstatus) || status != 0) {
613     LOG(WARN) << "ocsp query command for " << tls_ctx_data->cert_file
614               << " failed: error=" << ocsp_.error << ", rstatus=" << log::hex
615               << rstatus << log::dec << ", status=" << status;
616     ++ocsp_.next;
617     proceed_next_cert_ocsp();
618     return;
619   }
620 
621   if (LOG_ENABLED(INFO)) {
622     LOG(INFO) << "ocsp update for " << tls_ctx_data->cert_file
623               << " finished successfully";
624   }
625 
626   auto config = get_config();
627   auto &tlsconf = config->tls;
628 
629   if (tlsconf.ocsp.no_verify ||
630       tls::verify_ocsp_response(ssl_ctx, ocsp_.resp.data(),
631                                 ocsp_.resp.size()) == 0) {
632 #ifndef OPENSSL_IS_BORINGSSL
633 #  ifdef HAVE_ATOMIC_STD_SHARED_PTR
634     std::atomic_store_explicit(
635         &tls_ctx_data->ocsp_data,
636         std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp)),
637         std::memory_order_release);
638 #  else  // !HAVE_ATOMIC_STD_SHARED_PTR
639     std::lock_guard<std::mutex> g(tls_ctx_data->mu);
640     tls_ctx_data->ocsp_data =
641         std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp));
642 #  endif // !HAVE_ATOMIC_STD_SHARED_PTR
643 #else    // OPENSSL_IS_BORINGSSL
644     SSL_CTX_set_ocsp_response(ssl_ctx, ocsp_.resp.data(), ocsp_.resp.size());
645 #endif   // OPENSSL_IS_BORINGSSL
646   }
647 
648   ++ocsp_.next;
649   proceed_next_cert_ocsp();
650 }
651 
reset_ocsp()652 void ConnectionHandler::reset_ocsp() {
653   if (ocsp_.proc.rfd != -1) {
654     close(ocsp_.proc.rfd);
655   }
656 
657   ocsp_.proc.rfd = -1;
658   ocsp_.proc.pid = 0;
659   ocsp_.error = 0;
660   ocsp_.resp = std::vector<uint8_t>();
661 }
662 
proceed_next_cert_ocsp()663 void ConnectionHandler::proceed_next_cert_ocsp() {
664   for (;;) {
665     reset_ocsp();
666     if (ocsp_.next == all_ssl_ctx_.size()) {
667       ocsp_.next = 0;
668       // We have updated all ocsp response, and schedule next update.
669       ev_timer_set(&ocsp_timer_, get_config()->tls.ocsp.update_interval, 0.);
670       ev_timer_start(loop_, &ocsp_timer_);
671 
672       if (enable_acceptor_on_ocsp_completion_) {
673         enable_acceptor_on_ocsp_completion_ = false;
674         enable_acceptor();
675       }
676 
677       return;
678     }
679 
680     auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
681     auto tls_ctx_data =
682         static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
683 
684     // client SSL_CTX is also included in all_ssl_ctx_, but has no
685     // tls_ctx_data.
686     if (!tls_ctx_data) {
687       ++ocsp_.next;
688       continue;
689     }
690 
691     auto cert_file = tls_ctx_data->cert_file;
692 
693     if (start_ocsp_update(cert_file) != 0) {
694       ++ocsp_.next;
695       continue;
696     }
697 
698     break;
699   }
700 }
701 
set_tls_ticket_key_memcached_dispatcher(std::unique_ptr<MemcachedDispatcher> dispatcher)702 void ConnectionHandler::set_tls_ticket_key_memcached_dispatcher(
703     std::unique_ptr<MemcachedDispatcher> dispatcher) {
704   tls_ticket_key_memcached_dispatcher_ = std::move(dispatcher);
705 }
706 
707 MemcachedDispatcher *
get_tls_ticket_key_memcached_dispatcher() const708 ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const {
709   return tls_ticket_key_memcached_dispatcher_.get();
710 }
711 
// Use the similar backoff algorithm described in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
namespace {
// Upper bound of the exponent applied to MULTIPLIER.
constexpr size_t MAX_BACKOFF_EXP{10};
// Base of the exponential backoff.
constexpr double MULTIPLIER{3.2};
// The backoff is randomized within +/- JITTER times its base value.
constexpr double JITTER{0.2};
} // namespace
719 
on_tls_ticket_key_network_error(ev_timer * w)720 void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) {
721   if (++tls_ticket_key_memcached_get_retry_count_ >=
722       get_config()->tls.ticket.memcached.max_retry) {
723     LOG(WARN) << "Memcached: tls ticket get retry all failed "
724               << tls_ticket_key_memcached_get_retry_count_ << " times.";
725 
726     on_tls_ticket_key_not_found(w);
727     return;
728   }
729 
730   auto base_backoff = util::int_pow(
731       MULTIPLIER,
732       std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_));
733   auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
734                                                JITTER * base_backoff);
735 
736   auto backoff = base_backoff + dist(gen_);
737 
738   LOG(WARN)
739       << "Memcached: tls ticket get failed due to network error, retrying in "
740       << backoff << " seconds";
741 
742   ev_timer_set(w, backoff, 0.);
743   ev_timer_start(loop_, w);
744 }
745 
on_tls_ticket_key_not_found(ev_timer * w)746 void ConnectionHandler::on_tls_ticket_key_not_found(ev_timer *w) {
747   tls_ticket_key_memcached_get_retry_count_ = 0;
748 
749   if (++tls_ticket_key_memcached_fail_count_ >=
750       get_config()->tls.ticket.memcached.max_fail) {
751     LOG(WARN) << "Memcached: could not get tls ticket; disable tls ticket";
752 
753     tls_ticket_key_memcached_fail_count_ = 0;
754 
755     set_ticket_keys(nullptr);
756     set_ticket_keys_to_worker(nullptr);
757   }
758 
759   LOG(WARN) << "Memcached: tls ticket get failed, schedule next";
760   schedule_next_tls_ticket_key_memcached_get(w);
761 }
762 
on_tls_ticket_key_get_success(const std::shared_ptr<TicketKeys> & ticket_keys,ev_timer * w)763 void ConnectionHandler::on_tls_ticket_key_get_success(
764     const std::shared_ptr<TicketKeys> &ticket_keys, ev_timer *w) {
765   LOG(NOTICE) << "Memcached: tls ticket get success";
766 
767   tls_ticket_key_memcached_get_retry_count_ = 0;
768   tls_ticket_key_memcached_fail_count_ = 0;
769 
770   schedule_next_tls_ticket_key_memcached_get(w);
771 
772   if (!ticket_keys || ticket_keys->keys.empty()) {
773     LOG(WARN) << "Memcached: tls ticket keys are empty; tls ticket disabled";
774     set_ticket_keys(nullptr);
775     set_ticket_keys_to_worker(nullptr);
776     return;
777   }
778 
779   if (LOG_ENABLED(INFO)) {
780     LOG(INFO) << "ticket keys get done";
781     LOG(INFO) << 0 << " enc+dec: "
782               << util::format_hex(ticket_keys->keys[0].data.name);
783     for (size_t i = 1; i < ticket_keys->keys.size(); ++i) {
784       auto &key = ticket_keys->keys[i];
785       LOG(INFO) << i << " dec: " << util::format_hex(key.data.name);
786     }
787   }
788 
789   set_ticket_keys(ticket_keys);
790   set_ticket_keys_to_worker(ticket_keys);
791 }
792 
schedule_next_tls_ticket_key_memcached_get(ev_timer * w)793 void ConnectionHandler::schedule_next_tls_ticket_key_memcached_get(
794     ev_timer *w) {
795   ev_timer_set(w, get_config()->tls.ticket.memcached.interval, 0.);
796   ev_timer_start(loop_, w);
797 }
798 
create_tls_ticket_key_memcached_ssl_ctx()799 SSL_CTX *ConnectionHandler::create_tls_ticket_key_memcached_ssl_ctx() {
800   auto config = get_config();
801   auto &tlsconf = config->tls;
802   auto &memcachedconf = config->tls.ticket.memcached;
803 
804   auto ssl_ctx = tls::create_ssl_client_context(
805 #ifdef HAVE_NEVERBLEED
806       nb_,
807 #endif // HAVE_NEVERBLEED
808       tlsconf.cacert, memcachedconf.cert_file, memcachedconf.private_key_file,
809       nullptr);
810 
811   all_ssl_ctx_.push_back(ssl_ctx);
812 
813   return ssl_ctx;
814 }
815 
#ifdef HAVE_NEVERBLEED
// Stores |nb|, which is passed to the SSL_CTX creation functions.
void ConnectionHandler::set_neverbleed(neverbleed_t *nb) {
  nb_ = nb;
}
#endif // HAVE_NEVERBLEED
819 
handle_serial_event()820 void ConnectionHandler::handle_serial_event() {
821   std::vector<SerialEvent> q;
822   {
823     std::lock_guard<std::mutex> g(serial_event_mu_);
824     q.swap(serial_events_);
825   }
826 
827   for (auto &sev : q) {
828     switch (sev.type) {
829     case SerialEventType::REPLACE_DOWNSTREAM:
830       // Mmake sure that none of worker uses
831       // get_config()->conn.downstream
832       mod_config()->conn.downstream = sev.downstreamconf;
833 
834       if (single_worker_) {
835         single_worker_->replace_downstream_config(sev.downstreamconf);
836 
837         break;
838       }
839 
840       worker_replace_downstream(sev.downstreamconf);
841 
842       break;
843     default:
844       break;
845     }
846   }
847 }
848 
send_replace_downstream(const std::shared_ptr<DownstreamConfig> & downstreamconf)849 void ConnectionHandler::send_replace_downstream(
850     const std::shared_ptr<DownstreamConfig> &downstreamconf) {
851   send_serial_event(
852       SerialEvent(SerialEventType::REPLACE_DOWNSTREAM, downstreamconf));
853 }
854 
send_serial_event(SerialEvent ev)855 void ConnectionHandler::send_serial_event(SerialEvent ev) {
856   {
857     std::lock_guard<std::mutex> g(serial_event_mu_);
858 
859     serial_events_.push_back(std::move(ev));
860   }
861 
862   ev_async_send(loop_, &serial_event_asyncev_);
863 }
864 
get_ssl_ctx(size_t idx) const865 SSL_CTX *ConnectionHandler::get_ssl_ctx(size_t idx) const {
866   return all_ssl_ctx_[idx];
867 }
868 
869 const std::vector<SSL_CTX *> &
get_indexed_ssl_ctx(size_t idx) const870 ConnectionHandler::get_indexed_ssl_ctx(size_t idx) const {
871   return indexed_ssl_ctx_[idx];
872 }
873 
set_enable_acceptor_on_ocsp_completion(bool f)874 void ConnectionHandler::set_enable_acceptor_on_ocsp_completion(bool f) {
875   enable_acceptor_on_ocsp_completion_ = f;
876 }
877 
878 } // namespace shrpx
879