1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_worker.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #include <netinet/udp.h>
31 
32 #include <cstdio>
33 #include <memory>
34 
35 #include "ssl_compat.h"
36 
37 #ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
38 #  include <wolfssl/options.h>
39 #  include <wolfssl/openssl/rand.h>
40 #else // !NGHTTP2_OPENSSL_IS_WOLFSSL
41 #  include <openssl/rand.h>
42 #endif // !NGHTTP2_OPENSSL_IS_WOLFSSL
43 
44 #ifdef HAVE_LIBBPF
45 #  include <bpf/bpf.h>
46 #  include <bpf/libbpf.h>
47 #endif // HAVE_LIBBPF
48 
49 #include "shrpx_tls.h"
50 #include "shrpx_log.h"
51 #include "shrpx_client_handler.h"
52 #include "shrpx_http2_session.h"
53 #include "shrpx_log_config.h"
54 #include "shrpx_memcached_dispatcher.h"
55 #ifdef HAVE_MRUBY
56 #  include "shrpx_mruby.h"
57 #endif // HAVE_MRUBY
58 #ifdef ENABLE_HTTP3
59 #  include "shrpx_quic_listener.h"
60 #endif // ENABLE_HTTP3
61 #include "shrpx_connection_handler.h"
62 #include "util.h"
63 #include "template.h"
64 #include "xsi_strerror.h"
65 
66 namespace shrpx {
67 
68 namespace {
69 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
70   auto worker = static_cast<Worker *>(w->data);
71   worker->process_events();
72 }
73 } // namespace
74 
75 namespace {
76 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
77   auto worker = static_cast<Worker *>(w->data);
78   if (worker->get_worker_stat()->num_connections != 0) {
79     return;
80   }
81   auto mcpool = worker->get_mcpool();
82   if (mcpool->freelistsize == mcpool->poolsize) {
83     worker->get_mcpool()->clear();
84   }
85 }
86 } // namespace
87 
88 namespace {
89 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
90   auto worker = static_cast<Worker *>(w->data);
91   worker->process_events();
92 }
93 } // namespace
94 
95 DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
96 
97 DownstreamAddrGroup::~DownstreamAddrGroup() {}
98 
99 // DownstreamKey is used to index SharedDownstreamAddr in order to
100 // find the same configuration.
101 using DownstreamKey = std::tuple<
102   std::vector<std::tuple<StringRef, StringRef, StringRef, size_t, size_t, Proto,
103                          uint32_t, uint32_t, uint32_t, bool, bool, bool, bool>>,
104   bool, SessionAffinity, StringRef, StringRef, SessionAffinityCookieSecure,
105   SessionAffinityCookieStickiness, int64_t, int64_t, StringRef, bool>;
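// Illustrative example (schematic, not from this file): two nghttpx routing
// patterns whose backends reduce to the same DownstreamKey, e.g.
//
//   --backend=127.0.0.1,3000;/api/
//   --backend=127.0.0.1,3000;/static/
//
// end up with a single SharedDownstreamAddr in replace_downstream_config()
// below, so they share connection pools and health-check state.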
106 
107 namespace {
108 DownstreamKey
109 create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
110                       const StringRef &mruby_file) {
111   DownstreamKey dkey;
112 
113   auto &addrs = std::get<0>(dkey);
114   addrs.resize(shared_addr->addrs.size());
115   auto p = std::begin(addrs);
116   for (auto &a : shared_addr->addrs) {
117     std::get<0>(*p) = a.host;
118     std::get<1>(*p) = a.sni;
119     std::get<2>(*p) = a.group;
120     std::get<3>(*p) = a.fall;
121     std::get<4>(*p) = a.rise;
122     std::get<5>(*p) = a.proto;
123     std::get<6>(*p) = a.port;
124     std::get<7>(*p) = a.weight;
125     std::get<8>(*p) = a.group_weight;
126     std::get<9>(*p) = a.host_unix;
127     std::get<10>(*p) = a.tls;
128     std::get<11>(*p) = a.dns;
129     std::get<12>(*p) = a.upgrade_scheme;
130     ++p;
131   }
132   std::sort(std::begin(addrs), std::end(addrs));
133 
134   std::get<1>(dkey) = shared_addr->redirect_if_not_tls;
135 
136   auto &affinity = shared_addr->affinity;
137   std::get<2>(dkey) = affinity.type;
138   std::get<3>(dkey) = affinity.cookie.name;
139   std::get<4>(dkey) = affinity.cookie.path;
140   std::get<5>(dkey) = affinity.cookie.secure;
141   std::get<6>(dkey) = affinity.cookie.stickiness;
142   auto &timeout = shared_addr->timeout;
143   std::get<7>(dkey) = timeout.read;
144   std::get<8>(dkey) = timeout.write;
145   std::get<9>(dkey) = mruby_file;
146   std::get<10>(dkey) = shared_addr->dnf;
147 
148   return dkey;
149 }
150 } // namespace
151 
152 Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
153                SSL_CTX *tls_session_cache_memcached_ssl_ctx,
154                tls::CertLookupTree *cert_tree,
155 #ifdef ENABLE_HTTP3
156                SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
157                WorkerID wid,
158 #  ifdef HAVE_LIBBPF
159                size_t index,
160 #  endif // HAVE_LIBBPF
161 #endif   // ENABLE_HTTP3
162                const std::shared_ptr<TicketKeys> &ticket_keys,
163                ConnectionHandler *conn_handler,
164                std::shared_ptr<DownstreamConfig> downstreamconf)
165   :
166 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
167     index_{index},
168 #endif // ENABLE_HTTP3 && HAVE_LIBBPF
169     randgen_(util::make_mt19937()),
170     worker_stat_{},
171     dns_tracker_(loop, get_config()->conn.downstream->family),
172 #ifdef ENABLE_HTTP3
173     worker_id_{std::move(wid)},
174     quic_upstream_addrs_{get_config()->conn.quic_listener.addrs},
175 #endif // ENABLE_HTTP3
176     loop_(loop),
177     sv_ssl_ctx_(sv_ssl_ctx),
178     cl_ssl_ctx_(cl_ssl_ctx),
179     cert_tree_(cert_tree),
180     conn_handler_(conn_handler),
181 #ifdef ENABLE_HTTP3
182     quic_sv_ssl_ctx_{quic_sv_ssl_ctx},
183     quic_cert_tree_{quic_cert_tree},
184     quic_conn_handler_{this},
185 #endif // ENABLE_HTTP3
186     ticket_keys_(ticket_keys),
187     connect_blocker_(
188       std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
189     graceful_shutdown_(false) {
190   ev_async_init(&w_, eventcb);
191   w_.data = this;
192   ev_async_start(loop_, &w_);
193 
194   ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
195   mcpool_clear_timer_.data = this;
196 
197   ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
198   proc_wev_timer_.data = this;
199 
200   auto &session_cacheconf = get_config()->tls.session_cache;
201 
202   if (!session_cacheconf.memcached.host.empty()) {
203     session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
204       &session_cacheconf.memcached.addr, loop,
205       tls_session_cache_memcached_ssl_ctx,
206       StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
207   }
208 
209   replace_downstream_config(std::move(downstreamconf));
210 }
211 
212 namespace {
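// Re-inserts addr into its WeightGroup's priority queue and, if necessary, the
// group itself into the worker-level queue.  The new entry inherits the cycle
// value currently at the top of the respective queue (or 0 when it is empty),
// so an address coming back online neither starves nor jumps ahead of the
// weighted round-robin order.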
213 void ensure_enqueue_addr(
214   std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
215                       WeightGroupEntryGreater> &wgpq,
216   WeightGroup *wg, DownstreamAddr *addr) {
217   uint32_t cycle;
218   if (!wg->pq.empty()) {
219     auto &top = wg->pq.top();
220     cycle = top.cycle;
221   } else {
222     cycle = 0;
223   }
224 
225   addr->cycle = cycle;
226   addr->pending_penalty = 0;
227   wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
228   addr->queued = true;
229 
230   if (!wg->queued) {
231     if (!wgpq.empty()) {
232       auto &top = wgpq.top();
233       cycle = top.cycle;
234     } else {
235       cycle = 0;
236     }
237 
238     wg->cycle = cycle;
239     wg->pending_penalty = 0;
240     wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
241     wg->queued = true;
242   }
243 }
244 } // namespace
245 
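// Swaps in a new downstream (backend) configuration: every existing group is
// marked retired and its pooled backend connections are dropped, then the
// groups are rebuilt from downstreamconf.  A group whose DownstreamKey matches
// an already-built group reuses that group's SharedDownstreamAddr instead of
// creating its own.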
246 void Worker::replace_downstream_config(
247   std::shared_ptr<DownstreamConfig> downstreamconf) {
248   for (auto &g : downstream_addr_groups_) {
249     g->retired = true;
250 
251     auto &shared_addr = g->shared_addr;
252     for (auto &addr : shared_addr->addrs) {
253       addr.dconn_pool->remove_all();
254     }
255   }
256 
257   downstreamconf_ = downstreamconf;
258 
259   // Making a copy is much faster when multiple threads call the
260   // backendconfig API.
261   auto groups = downstreamconf->addr_groups;
262 
263   downstream_addr_groups_ =
264     std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());
265 
266   std::map<DownstreamKey, size_t> addr_groups_indexer;
267 #ifdef HAVE_MRUBY
268   // TODO It is a bit less efficient because
269   // mruby::create_mruby_context returns std::unique_ptr and we cannot
270   // use std::make_shared.
271   std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
272 #endif // HAVE_MRUBY
273 
274   for (size_t i = 0; i < groups.size(); ++i) {
275     auto &src = groups[i];
276     auto &dst = downstream_addr_groups_[i];
277 
278     dst = std::make_shared<DownstreamAddrGroup>();
279     dst->pattern =
280       ImmutableString{std::begin(src.pattern), std::end(src.pattern)};
281 
282     auto shared_addr = std::make_shared<SharedDownstreamAddr>();
283 
284     shared_addr->addrs.resize(src.addrs.size());
285     shared_addr->affinity.type = src.affinity.type;
286     if (src.affinity.type == SessionAffinity::COOKIE) {
287       shared_addr->affinity.cookie.name =
288         make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
289       if (!src.affinity.cookie.path.empty()) {
290         shared_addr->affinity.cookie.path =
291           make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
292       }
293       shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
294       shared_addr->affinity.cookie.stickiness = src.affinity.cookie.stickiness;
295     }
296     shared_addr->affinity_hash = src.affinity_hash;
297     shared_addr->affinity_hash_map = src.affinity_hash_map;
298     shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
299     shared_addr->dnf = src.dnf;
300     shared_addr->timeout.read = src.timeout.read;
301     shared_addr->timeout.write = src.timeout.write;
302 
303     for (size_t j = 0; j < src.addrs.size(); ++j) {
304       auto &src_addr = src.addrs[j];
305       auto &dst_addr = shared_addr->addrs[j];
306 
307       dst_addr.addr = src_addr.addr;
308       dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
309       dst_addr.hostport =
310         make_string_ref(shared_addr->balloc, src_addr.hostport);
311       dst_addr.port = src_addr.port;
312       dst_addr.host_unix = src_addr.host_unix;
313       dst_addr.weight = src_addr.weight;
314       dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
315       dst_addr.group_weight = src_addr.group_weight;
316       dst_addr.affinity_hash = src_addr.affinity_hash;
317       dst_addr.proto = src_addr.proto;
318       dst_addr.tls = src_addr.tls;
319       dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
320       dst_addr.fall = src_addr.fall;
321       dst_addr.rise = src_addr.rise;
322       dst_addr.dns = src_addr.dns;
323       dst_addr.upgrade_scheme = src_addr.upgrade_scheme;
324     }
325 
326 #ifdef HAVE_MRUBY
327     auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
328     if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
329       shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
330       assert(shared_addr->mruby_ctx);
331       shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
332     } else {
333       shared_addr->mruby_ctx = (*mruby_ctx_it).second;
334     }
335 #endif // HAVE_MRUBY
336 
337     // Share connections if patterns resolve to the same set of backend
338     // addresses.
339 
340     auto dkey = create_downstream_key(shared_addr, src.mruby_file);
341     auto it = addr_groups_indexer.find(dkey);
342 
343     if (it == std::end(addr_groups_indexer)) {
344       auto shared_addr_ptr = shared_addr.get();
345 
346       for (auto &addr : shared_addr->addrs) {
347         addr.connect_blocker = std::make_unique<ConnectBlocker>(
348           randgen_, loop_, nullptr, [shared_addr_ptr, &addr]() {
349             if (!addr.queued) {
350               if (!addr.wg) {
351                 return;
352               }
353               ensure_enqueue_addr(shared_addr_ptr->pq, addr.wg, &addr);
354             }
355           });
356 
357         addr.live_check = std::make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this,
358                                                       &addr, randgen_);
359       }
360 
361       size_t seq = 0;
362       for (auto &addr : shared_addr->addrs) {
363         addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
364         addr.seq = seq++;
365       }
366 
367       util::shuffle(std::begin(shared_addr->addrs),
368                     std::end(shared_addr->addrs), randgen_,
369                     [](auto i, auto j) { std::swap((*i).seq, (*j).seq); });
370 
371       if (shared_addr->affinity.type == SessionAffinity::NONE) {
372         std::map<StringRef, WeightGroup *> wgs;
373         size_t num_wgs = 0;
374         for (auto &addr : shared_addr->addrs) {
375           if (wgs.find(addr.group) == std::end(wgs)) {
376             ++num_wgs;
377             wgs.emplace(addr.group, nullptr);
378           }
379         }
380 
381         shared_addr->wgs = std::vector<WeightGroup>(num_wgs);
382 
383         for (auto &addr : shared_addr->addrs) {
384           auto &wg = wgs[addr.group];
385           if (wg == nullptr) {
386             wg = &shared_addr->wgs[--num_wgs];
387             wg->seq = num_wgs;
388           }
389 
390           wg->weight = addr.group_weight;
391           wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
392           addr.queued = true;
393           addr.wg = wg;
394         }
395 
396         assert(num_wgs == 0);
397 
398         for (auto &kv : wgs) {
399           shared_addr->pq.push(
400             WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
401           kv.second->queued = true;
402         }
403       }
404 
405       dst->shared_addr = std::move(shared_addr);
406 
407       addr_groups_indexer.emplace(std::move(dkey), i);
408     } else {
409       auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
410       if (LOG_ENABLED(INFO)) {
411         LOG(INFO) << dst->pattern << " shares the same backend group with "
412                   << g->pattern;
413       }
414       dst->shared_addr = g->shared_addr;
415     }
416   }
417 }
418 
419 Worker::~Worker() {
420   ev_async_stop(loop_, &w_);
421   ev_timer_stop(loop_, &mcpool_clear_timer_);
422   ev_timer_stop(loop_, &proc_wev_timer_);
423 }
424 
425 void Worker::schedule_clear_mcpool() {
426   // libev manual says: "If the watcher is already active nothing will
427   // happen."  Since we don't change any timeout here, we don't have
428   // to worry about querying ev_is_active.
429   ev_timer_start(loop_, &mcpool_clear_timer_);
430 }
431 
432 void Worker::wait() {
433 #ifndef NOTHREADS
434   fut_.get();
435 #endif // !NOTHREADS
436 }
437 
438 void Worker::run_async() {
439 #ifndef NOTHREADS
440   fut_ = std::async(std::launch::async, [this] {
441     (void)reopen_log_files(get_config()->logging);
442     ev_run(loop_);
443     delete_log_config();
444 
445 #  ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
446     wc_ecc_fp_free();
447 #  endif // NGHTTP2_OPENSSL_IS_WOLFSSL
448   });
449 #endif // !NOTHREADS
450 }
451 
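// send() may be called from any thread: it appends the event to q_ under m_
// and wakes this worker's loop via ev_async_send(), which ends up in
// process_events().  A schematic hand-off of an accepted connection from an
// acceptor thread might look like this (illustrative only; field names are
// taken from process_events() below):
//
//   WorkerEvent wev{};
//   wev.type = WorkerEventType::NEW_CONNECTION;
//   wev.client_fd = fd;
//   wev.client_addr = addr;
//   wev.client_addrlen = addrlen;
//   wev.faddr = upstream_addr;
//   worker->send(std::move(wev));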
452 void Worker::send(WorkerEvent event) {
453   {
454     std::lock_guard<std::mutex> g(m_);
455 
456     q_.emplace_back(std::move(event));
457   }
458 
459   ev_async_send(loop_, &w_);
460 }
461 
462 void Worker::process_events() {
463   WorkerEvent wev;
464   {
465     std::lock_guard<std::mutex> g(m_);
466 
467     // Process one event at a time.  This is important for
468     // WorkerEventType::NEW_CONNECTION events, since accepting a large
469     // number of new connections at once may delay the time to first byte
470     // for existing connections.
471 
472     if (q_.empty()) {
473       ev_timer_stop(loop_, &proc_wev_timer_);
474       return;
475     }
476 
477     wev = std::move(q_.front());
478     q_.pop_front();
479   }
480 
481   ev_timer_start(loop_, &proc_wev_timer_);
482 
483   auto config = get_config();
484 
485   auto worker_connections = config->conn.upstream.worker_connections;
486 
487   switch (wev.type) {
488   case WorkerEventType::NEW_CONNECTION: {
489     if (LOG_ENABLED(INFO)) {
490       WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
491                        << ", addrlen=" << wev.client_addrlen;
492     }
493 
494     if (worker_stat_.num_connections >= worker_connections) {
495       if (LOG_ENABLED(INFO)) {
496         WLOG(INFO, this) << "Too many connections >= " << worker_connections;
497       }
498 
499       close(wev.client_fd);
500 
501       break;
502     }
503 
504     auto client_handler = tls::accept_connection(
505       this, wev.client_fd, &wev.client_addr.sa, wev.client_addrlen, wev.faddr);
506     if (!client_handler) {
507       if (LOG_ENABLED(INFO)) {
508         WLOG(ERROR, this) << "ClientHandler creation failed";
509       }
510       close(wev.client_fd);
511       break;
512     }
513 
514     if (LOG_ENABLED(INFO)) {
515       WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created";
516     }
517 
518     break;
519   }
520   case WorkerEventType::REOPEN_LOG:
521     WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
522                        << ")";
523 
524     reopen_log_files(config->logging);
525 
526     break;
527   case WorkerEventType::GRACEFUL_SHUTDOWN:
528     WLOG(NOTICE, this) << "Graceful shutdown commencing";
529 
530     graceful_shutdown_ = true;
531 
532     if (worker_stat_.num_connections == 0 &&
533         worker_stat_.num_close_waits == 0) {
534       ev_break(loop_);
535 
536       return;
537     }
538 
539     break;
540   case WorkerEventType::REPLACE_DOWNSTREAM:
541     WLOG(NOTICE, this) << "Replace downstream";
542 
543     replace_downstream_config(wev.downstreamconf);
544 
545     break;
546 #ifdef ENABLE_HTTP3
547   case WorkerEventType::QUIC_PKT_FORWARD: {
548     const UpstreamAddr *faddr;
549 
550     if (wev.quic_pkt->upstream_addr_index == static_cast<size_t>(-1)) {
551       faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr);
552       if (faddr == nullptr) {
553         LOG(ERROR) << "No suitable upstream address found";
554 
555         break;
556       }
557     } else if (quic_upstream_addrs_.size() <=
558                wev.quic_pkt->upstream_addr_index) {
559       LOG(ERROR) << "upstream_addr_index is too large";
560 
561       break;
562     } else {
563       faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index];
564     }
565 
566     quic_conn_handler_.handle_packet(faddr, wev.quic_pkt->remote_addr,
567                                      wev.quic_pkt->local_addr, wev.quic_pkt->pi,
568                                      wev.quic_pkt->data);
569 
570     break;
571   }
572 #endif // ENABLE_HTTP3
573   default:
574     if (LOG_ENABLED(INFO)) {
575       WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
576     }
577   }
578 }
579 
580 tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }
581 
582 #ifdef ENABLE_HTTP3
583 tls::CertLookupTree *Worker::get_quic_cert_lookup_tree() const {
584   return quic_cert_tree_;
585 }
586 #endif // ENABLE_HTTP3
587 
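// Ticket keys may be replaced while the worker is running.  With
// HAVE_ATOMIC_STD_SHARED_PTR the shared_ptr itself is accessed with
// acquire/release atomics (single writer, many readers); otherwise a mutex
// guards it.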
588 std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
589 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
590   return ticket_keys_.load(std::memory_order_acquire);
591 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
592   std::lock_guard<std::mutex> g(ticket_keys_m_);
593   return ticket_keys_;
594 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
595 }
596 
597 void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
598 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
599   // This is single writer
600   ticket_keys_.store(std::move(ticket_keys), std::memory_order_release);
601 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
602   std::lock_guard<std::mutex> g(ticket_keys_m_);
603   ticket_keys_ = std::move(ticket_keys);
604 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
605 }
606 
607 WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
608 
609 struct ev_loop *Worker::get_loop() const { return loop_; }
610 
611 SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
612 
613 SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }
614 
615 #ifdef ENABLE_HTTP3
616 SSL_CTX *Worker::get_quic_sv_ssl_ctx() const { return quic_sv_ssl_ctx_; }
617 #endif // ENABLE_HTTP3
618 
619 void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
620 
621 bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
622 
623 MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
624 
625 MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
626   return session_cache_memcached_dispatcher_.get();
627 }
628 
629 std::mt19937 &Worker::get_randgen() { return randgen_; }
630 
631 #ifdef HAVE_MRUBY
632 int Worker::create_mruby_context() {
633   mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
634   if (!mruby_ctx_) {
635     return -1;
636   }
637 
638   return 0;
639 }
640 
641 mruby::MRubyContext *Worker::get_mruby_context() const {
642   return mruby_ctx_.get();
643 }
644 #endif // HAVE_MRUBY
645 
646 std::vector<std::shared_ptr<DownstreamAddrGroup>> &
647 Worker::get_downstream_addr_groups() {
648   return downstream_addr_groups_;
649 }
650 
651 ConnectBlocker *Worker::get_connect_blocker() const {
652   return connect_blocker_.get();
653 }
654 
655 const DownstreamConfig *Worker::get_downstream_config() const {
656   return downstreamconf_.get();
657 }
658 
659 ConnectionHandler *Worker::get_connection_handler() const {
660   return conn_handler_;
661 }
662 
663 #ifdef ENABLE_HTTP3
664 QUICConnectionHandler *Worker::get_quic_connection_handler() {
665   return &quic_conn_handler_;
666 }
667 #endif // ENABLE_HTTP3
668 
669 DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
670 
671 #ifdef ENABLE_HTTP3
672 #  ifdef HAVE_LIBBPF
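// eBPF helpers: exactly one worker attaches the reuseport BPF program to the
// QUIC sockets (the worker with index 1 when the API endpoint is enabled in
// multi-threaded mode, otherwise the worker with index 0), and
// compute_sk_index() applies the matching offset when the socket is stored in
// reuseport_array.  should_update_bpf_map() is true for every worker unless
// BPF is disabled in the configuration.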
673 bool Worker::should_attach_bpf() const {
674   auto config = get_config();
675   auto &quicconf = config->quic;
676   auto &apiconf = config->api;
677 
678   if (quicconf.bpf.disabled) {
679     return false;
680   }
681 
682   if (!config->single_thread && apiconf.enabled) {
683     return index_ == 1;
684   }
685 
686   return index_ == 0;
687 }
688 
689 bool Worker::should_update_bpf_map() const {
690   auto config = get_config();
691   auto &quicconf = config->quic;
692 
693   return !quicconf.bpf.disabled;
694 }
695 
696 uint32_t Worker::compute_sk_index() const {
697   auto config = get_config();
698   auto &apiconf = config->api;
699 
700   if (!config->single_thread && apiconf.enabled) {
701     return index_ - 1;
702   }
703 
704   return index_;
705 }
706 #  endif // HAVE_LIBBPF
707 
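// Creates one UDP socket per configured QUIC frontend address and wraps each
// in a QUICListener; duplicate host:port endpoints are rejected.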
708 int Worker::setup_quic_server_socket() {
709   size_t n = 0;
710 
711   for (auto &addr : quic_upstream_addrs_) {
712     assert(!addr.host_unix);
713     if (create_quic_server_socket(addr) != 0) {
714       return -1;
715     }
716 
717     // Make sure that each endpoint has a unique address.
718     for (size_t i = 0; i < n; ++i) {
719       const auto &a = quic_upstream_addrs_[i];
720 
721       if (addr.hostport == a.hostport) {
722         LOG(FATAL)
723           << "QUIC frontend endpoint must be unique: a duplicate found for "
724           << addr.hostport;
725 
726         return -1;
727       }
728     }
729 
730     ++n;
731 
732     quic_listeners_.emplace_back(std::make_unique<QUICListener>(&addr, this));
733   }
734 
735   return 0;
736 }
737 
738 #  ifdef HAVE_LIBBPF
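// The expanded AES-128 key produced by KeyExpansion() below is written into
// the BPF program's "aes_key" map in create_quic_server_socket(); the
// kernel-side program presumably uses it to decrypt the QUIC Connection ID so
// that packets can be steered to the worker that owns the connection (see
// worker_id_map and reuseport_array below).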
739 namespace {
740 // https://github.com/kokke/tiny-AES-c
741 //
742 // License is Public Domain.
743 // Commit hash: 12e7744b4919e9d55de75b7ab566326a1c8e7a67
744 
745 // The number of columns comprising a state in AES. This is a constant
746 // in AES. Value=4
747 #    define Nb 4
748 
749 #    define Nk 4  // The number of 32 bit words in a key.
750 #    define Nr 10 // The number of rounds in AES Cipher.
751 
752 // The lookup-tables are marked const so they can be placed in
753 // read-only storage instead of RAM.  The numbers below can be computed
754 // dynamically, trading ROM for RAM.  This can be useful in (embedded)
755 // bootloader applications, where ROM is often limited.
756 const uint8_t sbox[256] = {
757   // 0 1 2 3 4 5 6 7 8 9 A B C D E F
758   0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5, 0x30, 0x01, 0x67, 0x2b, 0xfe,
759   0xd7, 0xab, 0x76, 0xca, 0x82, 0xc9, 0x7d, 0xfa, 0x59, 0x47, 0xf0, 0xad, 0xd4,
760   0xa2, 0xaf, 0x9c, 0xa4, 0x72, 0xc0, 0xb7, 0xfd, 0x93, 0x26, 0x36, 0x3f, 0xf7,
761   0xcc, 0x34, 0xa5, 0xe5, 0xf1, 0x71, 0xd8, 0x31, 0x15, 0x04, 0xc7, 0x23, 0xc3,
762   0x18, 0x96, 0x05, 0x9a, 0x07, 0x12, 0x80, 0xe2, 0xeb, 0x27, 0xb2, 0x75, 0x09,
763   0x83, 0x2c, 0x1a, 0x1b, 0x6e, 0x5a, 0xa0, 0x52, 0x3b, 0xd6, 0xb3, 0x29, 0xe3,
764   0x2f, 0x84, 0x53, 0xd1, 0x00, 0xed, 0x20, 0xfc, 0xb1, 0x5b, 0x6a, 0xcb, 0xbe,
765   0x39, 0x4a, 0x4c, 0x58, 0xcf, 0xd0, 0xef, 0xaa, 0xfb, 0x43, 0x4d, 0x33, 0x85,
766   0x45, 0xf9, 0x02, 0x7f, 0x50, 0x3c, 0x9f, 0xa8, 0x51, 0xa3, 0x40, 0x8f, 0x92,
767   0x9d, 0x38, 0xf5, 0xbc, 0xb6, 0xda, 0x21, 0x10, 0xff, 0xf3, 0xd2, 0xcd, 0x0c,
768   0x13, 0xec, 0x5f, 0x97, 0x44, 0x17, 0xc4, 0xa7, 0x7e, 0x3d, 0x64, 0x5d, 0x19,
769   0x73, 0x60, 0x81, 0x4f, 0xdc, 0x22, 0x2a, 0x90, 0x88, 0x46, 0xee, 0xb8, 0x14,
770   0xde, 0x5e, 0x0b, 0xdb, 0xe0, 0x32, 0x3a, 0x0a, 0x49, 0x06, 0x24, 0x5c, 0xc2,
771   0xd3, 0xac, 0x62, 0x91, 0x95, 0xe4, 0x79, 0xe7, 0xc8, 0x37, 0x6d, 0x8d, 0xd5,
772   0x4e, 0xa9, 0x6c, 0x56, 0xf4, 0xea, 0x65, 0x7a, 0xae, 0x08, 0xba, 0x78, 0x25,
773   0x2e, 0x1c, 0xa6, 0xb4, 0xc6, 0xe8, 0xdd, 0x74, 0x1f, 0x4b, 0xbd, 0x8b, 0x8a,
774   0x70, 0x3e, 0xb5, 0x66, 0x48, 0x03, 0xf6, 0x0e, 0x61, 0x35, 0x57, 0xb9, 0x86,
775   0xc1, 0x1d, 0x9e, 0xe1, 0xf8, 0x98, 0x11, 0x69, 0xd9, 0x8e, 0x94, 0x9b, 0x1e,
776   0x87, 0xe9, 0xce, 0x55, 0x28, 0xdf, 0x8c, 0xa1, 0x89, 0x0d, 0xbf, 0xe6, 0x42,
777   0x68, 0x41, 0x99, 0x2d, 0x0f, 0xb0, 0x54, 0xbb, 0x16};
778 
779 #    define getSBoxValue(num) (sbox[(num)])
780 
781 // The round constant word array, Rcon[i], contains the values given
782 // by x to the power (i-1) being powers of x (x is denoted as {02}) in
783 // the field GF(2^8)
784 const uint8_t Rcon[11] = {0x8d, 0x01, 0x02, 0x04, 0x08, 0x10,
785                           0x20, 0x40, 0x80, 0x1b, 0x36};
786 
787 // This function produces Nb(Nr+1) round keys. The round keys are used
788 // in each round to decrypt the states.
789 void KeyExpansion(uint8_t *RoundKey, const uint8_t *Key) {
790   unsigned i, j, k;
791   uint8_t tempa[4]; // Used for the column/row operations
792 
793   // The first round key is the key itself.
794   for (i = 0; i < Nk; ++i) {
795     RoundKey[(i * 4) + 0] = Key[(i * 4) + 0];
796     RoundKey[(i * 4) + 1] = Key[(i * 4) + 1];
797     RoundKey[(i * 4) + 2] = Key[(i * 4) + 2];
798     RoundKey[(i * 4) + 3] = Key[(i * 4) + 3];
799   }
800 
801   // All other round keys are found from the previous round keys.
802   for (i = Nk; i < Nb * (Nr + 1); ++i) {
803     {
804       k = (i - 1) * 4;
805       tempa[0] = RoundKey[k + 0];
806       tempa[1] = RoundKey[k + 1];
807       tempa[2] = RoundKey[k + 2];
808       tempa[3] = RoundKey[k + 3];
809     }
810 
811     if (i % Nk == 0) {
812       // This function shifts the 4 bytes in a word to the left once.
813       // [a0,a1,a2,a3] becomes [a1,a2,a3,a0]
814 
815       // Function RotWord()
816       {
817         const uint8_t u8tmp = tempa[0];
818         tempa[0] = tempa[1];
819         tempa[1] = tempa[2];
820         tempa[2] = tempa[3];
821         tempa[3] = u8tmp;
822       }
823 
824       // SubWord() is a function that takes a four-byte input word and
825       // applies the S-box to each of the four bytes to produce an
826       // output word.
827 
828       // Function Subword()
829       {
830         tempa[0] = getSBoxValue(tempa[0]);
831         tempa[1] = getSBoxValue(tempa[1]);
832         tempa[2] = getSBoxValue(tempa[2]);
833         tempa[3] = getSBoxValue(tempa[3]);
834       }
835 
836       tempa[0] = tempa[0] ^ Rcon[i / Nk];
837     }
838     j = i * 4;
839     k = (i - Nk) * 4;
840     RoundKey[j + 0] = RoundKey[k + 0] ^ tempa[0];
841     RoundKey[j + 1] = RoundKey[k + 1] ^ tempa[1];
842     RoundKey[j + 2] = RoundKey[k + 2] ^ tempa[2];
843     RoundKey[j + 3] = RoundKey[k + 3] ^ tempa[3];
844   }
845 }
846 } // namespace
847 #  endif // HAVE_LIBBPF
848 
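// Resolves faddr, creates a non-blocking UDP socket with SO_REUSEADDR and
// SO_REUSEPORT, enables packet-info, TOS/ECN, path MTU discovery and (where
// available) UDP_GRO options, binds it, and with libbpf support loads and
// attaches the SO_ATTACH_REUSEPORT_EBPF program and populates its maps before
// recording the bound address in faddr.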
849 int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
850   std::array<char, STRERROR_BUFSIZE> errbuf;
851   int fd = -1;
852   int rv;
853 
854   auto service = util::utos(faddr.port);
855   addrinfo hints{};
856   hints.ai_family = faddr.family;
857   hints.ai_socktype = SOCK_DGRAM;
858   hints.ai_flags = AI_PASSIVE;
859 #  ifdef AI_ADDRCONFIG
860   hints.ai_flags |= AI_ADDRCONFIG;
861 #  endif // AI_ADDRCONFIG
862 
863   auto node = faddr.host == "*"_sr ? nullptr : faddr.host.data();
864 
865   addrinfo *res, *rp;
866   rv = getaddrinfo(node, service.c_str(), &hints, &res);
867 #  ifdef AI_ADDRCONFIG
868   if (rv != 0) {
869     // Retry without AI_ADDRCONFIG
870     hints.ai_flags &= ~AI_ADDRCONFIG;
871     rv = getaddrinfo(node, service.c_str(), &hints, &res);
872   }
873 #  endif // AI_ADDRCONFIG
874   if (rv != 0) {
875     LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
876                << " address for " << faddr.host << ", port " << faddr.port
877                << ": " << gai_strerror(rv);
878     return -1;
879   }
880 
881   auto res_d = defer(freeaddrinfo, res);
882 
883   std::array<char, NI_MAXHOST> host;
884 
885   for (rp = res; rp; rp = rp->ai_next) {
886     rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
887                      nullptr, 0, NI_NUMERICHOST);
888     if (rv != 0) {
889       LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
890       continue;
891     }
892 
893 #  ifdef SOCK_NONBLOCK
894     fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
895                 rp->ai_protocol);
896     if (fd == -1) {
897       auto error = errno;
898       LOG(WARN) << "socket() syscall failed: "
899                 << xsi_strerror(error, errbuf.data(), errbuf.size());
900       continue;
901     }
902 #  else  // !SOCK_NONBLOCK
903     fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
904     if (fd == -1) {
905       auto error = errno;
906       LOG(WARN) << "socket() syscall failed: "
907                 << xsi_strerror(error, errbuf.data(), errbuf.size());
908       continue;
909     }
910     util::make_socket_nonblocking(fd);
911     util::make_socket_closeonexec(fd);
912 #  endif // !SOCK_NONBLOCK
913 
914     int val = 1;
915     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
916                    static_cast<socklen_t>(sizeof(val))) == -1) {
917       auto error = errno;
918       LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
919                 << xsi_strerror(error, errbuf.data(), errbuf.size());
920       close(fd);
921       continue;
922     }
923 
924     if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
925                    static_cast<socklen_t>(sizeof(val))) == -1) {
926       auto error = errno;
927       LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
928                 << xsi_strerror(error, errbuf.data(), errbuf.size());
929       close(fd);
930       continue;
931     }
932 
933     if (faddr.family == AF_INET6) {
934 #  ifdef IPV6_V6ONLY
935       if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
936                      static_cast<socklen_t>(sizeof(val))) == -1) {
937         auto error = errno;
938         LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
939                   << xsi_strerror(error, errbuf.data(), errbuf.size());
940         close(fd);
941         continue;
942       }
943 #  endif // IPV6_V6ONLY
944 
945       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
946                      static_cast<socklen_t>(sizeof(val))) == -1) {
947         auto error = errno;
948         LOG(WARN)
949           << "Failed to set IPV6_RECVPKTINFO option to listener socket: "
950           << xsi_strerror(error, errbuf.data(), errbuf.size());
951         close(fd);
952         continue;
953       }
954 
955       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVTCLASS, &val,
956                      static_cast<socklen_t>(sizeof(val))) == -1) {
957         auto error = errno;
958         LOG(WARN) << "Failed to set IPV6_RECVTCLASS option to listener socket: "
959                   << xsi_strerror(error, errbuf.data(), errbuf.size());
960         close(fd);
961         continue;
962       }
963 
964 #  if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO)
965       int mtu_disc = IPV6_PMTUDISC_DO;
966       if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &mtu_disc,
967                      static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
968         auto error = errno;
969         LOG(WARN)
970           << "Failed to set IPV6_MTU_DISCOVER option to listener socket: "
971           << xsi_strerror(error, errbuf.data(), errbuf.size());
972         close(fd);
973         continue;
974       }
975 #  endif // defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO)
976     } else {
977       if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
978                      static_cast<socklen_t>(sizeof(val))) == -1) {
979         auto error = errno;
980         LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
981                   << xsi_strerror(error, errbuf.data(), errbuf.size());
982         close(fd);
983         continue;
984       }
985 
986       if (setsockopt(fd, IPPROTO_IP, IP_RECVTOS, &val,
987                      static_cast<socklen_t>(sizeof(val))) == -1) {
988         auto error = errno;
989         LOG(WARN) << "Failed to set IP_RECVTOS option to listener socket: "
990                   << xsi_strerror(error, errbuf.data(), errbuf.size());
991         close(fd);
992         continue;
993       }
994 
995 #  if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
996       int mtu_disc = IP_PMTUDISC_DO;
997       if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &mtu_disc,
998                      static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
999         auto error = errno;
1000         LOG(WARN) << "Failed to set IP_MTU_DISCOVER option to listener socket: "
1001                   << xsi_strerror(error, errbuf.data(), errbuf.size());
1002         close(fd);
1003         continue;
1004       }
1005 #  endif // defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
1006     }
1007 
1008 #  ifdef UDP_GRO
1009     if (setsockopt(fd, IPPROTO_UDP, UDP_GRO, &val, sizeof(val)) == -1) {
1010       auto error = errno;
1011       LOG(WARN) << "Failed to set UDP_GRO option to listener socket: "
1012                 << xsi_strerror(error, errbuf.data(), errbuf.size());
1013       close(fd);
1014       continue;
1015     }
1016 #  endif // UDP_GRO
1017 
1018     if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
1019       auto error = errno;
1020       LOG(WARN) << "bind() syscall failed: "
1021                 << xsi_strerror(error, errbuf.data(), errbuf.size());
1022       close(fd);
1023       continue;
1024     }
1025 
1026 #  ifdef HAVE_LIBBPF
1027     auto config = get_config();
1028 
1029     auto &quic_bpf_refs = conn_handler_->get_quic_bpf_refs();
1030 
1031     if (should_attach_bpf()) {
1032       auto &bpfconf = config->quic.bpf;
1033 
1034       auto obj = bpf_object__open_file(bpfconf.prog_file.data(), nullptr);
1035       if (!obj) {
1036         auto error = errno;
1037         LOG(FATAL) << "Failed to open bpf object file: "
1038                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1039         close(fd);
1040         return -1;
1041       }
1042 
1043       rv = bpf_object__load(obj);
1044       if (rv != 0) {
1045         auto error = errno;
1046         LOG(FATAL) << "Failed to load bpf object file: "
1047                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1048         close(fd);
1049         return -1;
1050       }
1051 
1052       auto prog = bpf_object__find_program_by_name(obj, "select_reuseport");
1053       if (!prog) {
1054         auto error = errno;
1055         LOG(FATAL) << "Failed to find sk_reuseport program: "
1056                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1057         close(fd);
1058         return -1;
1059       }
1060 
1061       auto &ref = quic_bpf_refs[faddr.index];
1062 
1063       ref.obj = obj;
1064 
1065       ref.reuseport_array =
1066         bpf_object__find_map_by_name(obj, "reuseport_array");
1067       if (!ref.reuseport_array) {
1068         auto error = errno;
1069         LOG(FATAL) << "Failed to get reuseport_array: "
1070                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1071         close(fd);
1072         return -1;
1073       }
1074 
1075       ref.worker_id_map = bpf_object__find_map_by_name(obj, "worker_id_map");
1076       if (!ref.worker_id_map) {
1077         auto error = errno;
1078         LOG(FATAL) << "Failed to get worker_id_map: "
1079                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1080         close(fd);
1081         return -1;
1082       }
1083 
1084       auto sk_info = bpf_object__find_map_by_name(obj, "sk_info");
1085       if (!sk_info) {
1086         auto error = errno;
1087         LOG(FATAL) << "Failed to get sk_info: "
1088                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1089         close(fd);
1090         return -1;
1091       }
1092 
1093       constexpr uint32_t zero = 0;
1094       uint64_t num_socks = config->num_worker;
1095 
1096       rv = bpf_map__update_elem(sk_info, &zero, sizeof(zero), &num_socks,
1097                                 sizeof(num_socks), BPF_ANY);
1098       if (rv != 0) {
1099         auto error = errno;
1100         LOG(FATAL) << "Failed to update sk_info: "
1101                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1102         close(fd);
1103         return -1;
1104       }
1105 
1106       auto &qkms = conn_handler_->get_quic_keying_materials();
1107       auto &qkm = qkms->keying_materials.front();
1108 
1109       auto aes_key = bpf_object__find_map_by_name(obj, "aes_key");
1110       if (!aes_key) {
1111         auto error = errno;
1112         LOG(FATAL) << "Failed to get aes_key: "
1113                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1114         close(fd);
1115         return -1;
1116       }
1117 
1118       constexpr size_t expanded_aes_keylen = 176;
1119       std::array<uint8_t, expanded_aes_keylen> aes_exp_key;
1120 
1121       KeyExpansion(aes_exp_key.data(), qkm.cid_encryption_key.data());
1122 
1123       rv =
1124         bpf_map__update_elem(aes_key, &zero, sizeof(zero), aes_exp_key.data(),
1125                              aes_exp_key.size(), BPF_ANY);
1126       if (rv != 0) {
1127         auto error = errno;
1128         LOG(FATAL) << "Failed to update aes_key: "
1129                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1130         close(fd);
1131         return -1;
1132       }
1133 
1134       auto prog_fd = bpf_program__fd(prog);
1135 
1136       if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &prog_fd,
1137                      static_cast<socklen_t>(sizeof(prog_fd))) == -1) {
1138         LOG(FATAL) << "Failed to attach bpf program: "
1139                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
1140         close(fd);
1141         return -1;
1142       }
1143     }
1144 
1145     if (should_update_bpf_map()) {
1146       const auto &ref = quic_bpf_refs[faddr.index];
1147       auto sk_index = compute_sk_index();
1148 
1149       rv = bpf_map__update_elem(ref.reuseport_array, &sk_index,
1150                                 sizeof(sk_index), &fd, sizeof(fd), BPF_NOEXIST);
1151       if (rv != 0) {
1152         auto error = errno;
1153         LOG(FATAL) << "Failed to update reuseport_array: "
1154                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1155         close(fd);
1156         return -1;
1157       }
1158 
1159       rv =
1160         bpf_map__update_elem(ref.worker_id_map, &worker_id_, sizeof(worker_id_),
1161                              &sk_index, sizeof(sk_index), BPF_NOEXIST);
1162       if (rv != 0) {
1163         auto error = errno;
1164         LOG(FATAL) << "Failed to update worker_id_map: "
1165                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1166         close(fd);
1167         return -1;
1168       }
1169     }
1170 #  endif // HAVE_LIBBPF
1171 
1172     break;
1173   }
1174 
1175   if (!rp) {
1176     LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
1177                << " socket failed";
1178 
1179     return -1;
1180   }
1181 
1182   faddr.fd = fd;
1183   faddr.hostport = util::make_http_hostport(mod_config()->balloc,
1184                                             StringRef{host.data()}, faddr.port);
1185 
1186   LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";
1187 
1188   return 0;
1189 }
1190 
1191 const WorkerID &Worker::get_worker_id() const { return worker_id_; }
1192 
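// Maps the packet's local (destination) address back to a configured QUIC
// frontend: an exact host:port match wins; otherwise a wildcard frontend
// ("0.0.0.0" or "[::]") on the same port and address family is used as a
// fallback.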
1193 const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) {
1194   std::array<char, NI_MAXHOST> host;
1195 
1196   auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(),
1197                         host.size(), nullptr, 0, NI_NUMERICHOST);
1198   if (rv != 0) {
1199     LOG(ERROR) << "getnameinfo: " << gai_strerror(rv);
1200 
1201     return nullptr;
1202   }
1203 
1204   uint16_t port;
1205 
1206   switch (local_addr.su.sa.sa_family) {
1207   case AF_INET:
1208     port = htons(local_addr.su.in.sin_port);
1209 
1210     break;
1211   case AF_INET6:
1212     port = htons(local_addr.su.in6.sin6_port);
1213 
1214     break;
1215   default:
1216     assert(0);
1217     abort();
1218   }
1219 
1220   std::array<char, util::max_hostport> hostport_buf;
1221 
1222   auto hostport = util::make_http_hostport(std::begin(hostport_buf),
1223                                            StringRef{host.data()}, port);
1224   const UpstreamAddr *fallback_faddr = nullptr;
1225 
1226   for (auto &faddr : quic_upstream_addrs_) {
1227     if (faddr.hostport == hostport) {
1228       return &faddr;
1229     }
1230 
1231     if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) {
1232       continue;
1233     }
1234 
1235     if (faddr.port == 443 || faddr.port == 80) {
1236       switch (faddr.family) {
1237       case AF_INET:
1238         if (faddr.hostport == "0.0.0.0"_sr) {
1239           fallback_faddr = &faddr;
1240         }
1241 
1242         break;
1243       case AF_INET6:
1244         if (faddr.hostport == "[::]"_sr) {
1245           fallback_faddr = &faddr;
1246         }
1247 
1248         break;
1249       default:
1250         assert(0);
1251       }
1252     } else {
1253       switch (faddr.family) {
1254       case AF_INET:
1255         if (util::starts_with(faddr.hostport, "0.0.0.0:"_sr)) {
1256           fallback_faddr = &faddr;
1257         }
1258 
1259         break;
1260       case AF_INET6:
1261         if (util::starts_with(faddr.hostport, "[::]:"_sr)) {
1262           fallback_faddr = &faddr;
1263         }
1264 
1265         break;
1266       default:
1267         assert(0);
1268       }
1269     }
1270   }
1271 
1272   return fallback_faddr;
1273 }
1274 #endif // ENABLE_HTTP3
1275 
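// Backend group selection: try the exact host+path router first, then the
// wildcard host patterns (reversed-host prefix match, longest host pattern
// wins), then a path-only match with an empty host, and finally fall back to
// the catch-all group.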
1276 namespace {
1277 size_t match_downstream_addr_group_host(
1278   const RouterConfig &routerconf, const StringRef &host, const StringRef &path,
1279   const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1280   size_t catch_all, BlockAllocator &balloc) {
1281   const auto &router = routerconf.router;
1282   const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
1283   const auto &wildcard_patterns = routerconf.wildcard_patterns;
1284 
1285   if (LOG_ENABLED(INFO)) {
1286     LOG(INFO) << "Perform mapping selection, using host=" << host
1287               << ", path=" << path;
1288   }
1289 
1290   auto group = router.match(host, path);
1291   if (group != -1) {
1292     if (LOG_ENABLED(INFO)) {
1293       LOG(INFO) << "Found pattern with query " << host << path
1294                 << ", matched pattern=" << groups[group]->pattern;
1295     }
1296     return group;
1297   }
1298 
1299   if (!wildcard_patterns.empty() && !host.empty()) {
1300     auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
1301     auto ep =
1302       std::copy(std::begin(host) + 1, std::end(host), std::begin(rev_host_src));
1303     std::reverse(std::begin(rev_host_src), ep);
1304     auto rev_host = StringRef{std::span{std::begin(rev_host_src), ep}};
1305 
1306     ssize_t best_group = -1;
1307     const RNode *last_node = nullptr;
1308 
1309     for (;;) {
1310       size_t nread = 0;
1311       auto wcidx =
1312         rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
1313       if (wcidx == -1) {
1314         break;
1315       }
1316 
1317       rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};
1318 
1319       auto &wc = wildcard_patterns[wcidx];
1320       auto group = wc.router.match(StringRef{}, path);
1321       if (group != -1) {
1322         // wildcard_patterns is sorted so that the first match is the
1323         // longest host pattern.
1324         if (LOG_ENABLED(INFO)) {
1325           LOG(INFO) << "Found wildcard pattern with query " << host << path
1326                     << ", matched pattern=" << groups[group]->pattern;
1327         }
1328 
1329         best_group = group;
1330       }
1331     }
1332 
1333     if (best_group != -1) {
1334       return best_group;
1335     }
1336   }
1337 
1338   group = router.match(""_sr, path);
1339   if (group != -1) {
1340     if (LOG_ENABLED(INFO)) {
1341       LOG(INFO) << "Found pattern with query " << path
1342                 << ", matched pattern=" << groups[group]->pattern;
1343     }
1344     return group;
1345   }
1346 
1347   if (LOG_ENABLED(INFO)) {
1348     LOG(INFO) << "None match.  Use catch-all pattern";
1349   }
1350   return catch_all;
1351 }
1352 } // namespace
1353 
1354 size_t match_downstream_addr_group(
1355   const RouterConfig &routerconf, const StringRef &hostport,
1356   const StringRef &raw_path,
1357   const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1358   size_t catch_all, BlockAllocator &balloc) {
1359   if (std::find(std::begin(hostport), std::end(hostport), '/') !=
1360       std::end(hostport)) {
1361     // We use '/' specially, and if '/' is included in host, it breaks
1362     // our code.  Select catch-all case.
1363     return catch_all;
1364   }
1365 
1366   auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
1367   auto query = std::find(std::begin(raw_path), fragment, '?');
1368   auto path = StringRef{std::begin(raw_path), query};
1369 
1370   if (path.empty() || path[0] != '/') {
1371     path = "/"_sr;
1372   }
1373 
1374   if (hostport.empty()) {
1375     return match_downstream_addr_group_host(routerconf, hostport, path, groups,
1376                                             catch_all, balloc);
1377   }
1378 
1379   StringRef host;
1380   if (hostport[0] == '[') {
1381     // assume this is IPv6 numeric address
1382     auto p = std::find(std::begin(hostport), std::end(hostport), ']');
1383     if (p == std::end(hostport)) {
1384       return catch_all;
1385     }
1386     if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
1387       return catch_all;
1388     }
1389     host = StringRef{std::begin(hostport), p + 1};
1390   } else {
1391     auto p = std::find(std::begin(hostport), std::end(hostport), ':');
1392     if (p == std::begin(hostport)) {
1393       return catch_all;
1394     }
1395     host = StringRef{std::begin(hostport), p};
1396   }
1397 
1398   if (std::find_if(std::begin(host), std::end(host), [](char c) {
1399         return 'A' <= c && c <= 'Z';
1400       }) != std::end(host)) {
1401     auto low_host = make_byte_ref(balloc, host.size() + 1);
1402     auto ep = std::copy(std::begin(host), std::end(host), std::begin(low_host));
1403     *ep = '\0';
1404     util::inp_strlower(std::begin(low_host), ep);
1405     host = StringRef{std::span{std::begin(low_host), ep}};
1406   }
1407   return match_downstream_addr_group_host(routerconf, host, path, groups,
1408                                           catch_all, balloc);
1409 }
1410 
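// Records a connection failure against the address's ConnectBlocker.  After
// `fall` consecutive failures the address is logged and taken offline; if
// `rise` is configured, a live check is scheduled so the address can be
// brought back automatically.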
1411 void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
1412   const auto &connect_blocker = addr->connect_blocker;
1413 
1414   if (connect_blocker->in_offline()) {
1415     return;
1416   }
1417 
1418   connect_blocker->on_failure();
1419 
1420   if (addr->fall == 0) {
1421     return;
1422   }
1423 
1424   auto fail_count = connect_blocker->get_fail_count();
1425 
1426   if (fail_count >= addr->fall) {
1427     if (raddr) {
1428       LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
1429                 << " " << fail_count
1430                 << " times in a row; considered as offline";
1431     } else {
1432       LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
1433                 << " " << fail_count
1434                 << " times in a row; considered as offline";
1435     }
1436 
1437     connect_blocker->offline();
1438 
1439     if (addr->rise) {
1440       addr->live_check->schedule();
1441     }
1442   }
1443 }
1444 
1445 } // namespace shrpx
1446