• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_worker.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 
31 #include <cstdio>
32 #include <memory>
33 
34 #include <openssl/rand.h>
35 
36 #ifdef HAVE_LIBBPF
37 #  include <bpf/bpf.h>
38 #  include <bpf/libbpf.h>
39 #endif // HAVE_LIBBPF
40 
41 #include "shrpx_tls.h"
42 #include "shrpx_log.h"
43 #include "shrpx_client_handler.h"
44 #include "shrpx_http2_session.h"
45 #include "shrpx_log_config.h"
46 #include "shrpx_memcached_dispatcher.h"
47 #ifdef HAVE_MRUBY
48 #  include "shrpx_mruby.h"
49 #endif // HAVE_MRUBY
50 #ifdef ENABLE_HTTP3
51 #  include "shrpx_quic_listener.h"
52 #endif // ENABLE_HTTP3
53 #include "shrpx_connection_handler.h"
54 #include "util.h"
55 #include "template.h"
56 #include "xsi_strerror.h"
57 
58 namespace shrpx {
59 
60 namespace {
eventcb(struct ev_loop * loop,ev_async * w,int revents)61 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
62   auto worker = static_cast<Worker *>(w->data);
63   worker->process_events();
64 }
65 } // namespace
66 
67 namespace {
mcpool_clear_cb(struct ev_loop * loop,ev_timer * w,int revents)68 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
69   auto worker = static_cast<Worker *>(w->data);
70   if (worker->get_worker_stat()->num_connections != 0) {
71     return;
72   }
73   auto mcpool = worker->get_mcpool();
74   if (mcpool->freelistsize == mcpool->poolsize) {
75     worker->get_mcpool()->clear();
76   }
77 }
78 } // namespace
79 
80 namespace {
proc_wev_cb(struct ev_loop * loop,ev_timer * w,int revents)81 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
82   auto worker = static_cast<Worker *>(w->data);
83   worker->process_events();
84 }
85 } // namespace
86 
// A freshly created group is active; replace_downstream_config() flags
// it retired when a new downstream configuration supersedes it.
DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
88 
~DownstreamAddrGroup()89 DownstreamAddrGroup::~DownstreamAddrGroup() {}
90 
// DownstreamKey is used to index SharedDownstreamAddr in order to
// find the same configuration.  Element 0 is the (sorted) list of
// per-address tuples; the trailing elements capture group-wide
// settings (redirect_if_not_tls, affinity, timeouts, mruby file, dnf).
// Two groups with equal keys share one SharedDownstreamAddr.
using DownstreamKey = std::tuple<
    std::vector<
        std::tuple<StringRef, StringRef, StringRef, size_t, size_t, Proto,
                   uint32_t, uint32_t, uint32_t, bool, bool, bool, bool>>,
    bool, SessionAffinity, StringRef, StringRef, SessionAffinityCookieSecure,
    SessionAffinityCookieStickiness, int64_t, int64_t, StringRef, bool>;
99 
namespace {
// Builds a DownstreamKey from |shared_addr| and |mruby_file|.  Groups
// producing equal keys are treated as identical backend configurations
// and may share connections (see replace_downstream_config()).
DownstreamKey
create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
                      const StringRef &mruby_file) {
  DownstreamKey dkey;

  // Per-address tuple layout: host, sni, group, fall, rise, proto,
  // port, weight, group_weight, host_unix, tls, dns, upgrade_scheme.
  auto &addrs = std::get<0>(dkey);
  addrs.resize(shared_addr->addrs.size());
  auto p = std::begin(addrs);
  for (auto &a : shared_addr->addrs) {
    std::get<0>(*p) = a.host;
    std::get<1>(*p) = a.sni;
    std::get<2>(*p) = a.group;
    std::get<3>(*p) = a.fall;
    std::get<4>(*p) = a.rise;
    std::get<5>(*p) = a.proto;
    std::get<6>(*p) = a.port;
    std::get<7>(*p) = a.weight;
    std::get<8>(*p) = a.group_weight;
    std::get<9>(*p) = a.host_unix;
    std::get<10>(*p) = a.tls;
    std::get<11>(*p) = a.dns;
    std::get<12>(*p) = a.upgrade_scheme;
    ++p;
  }
  // Sort so that the key is independent of address ordering.
  std::sort(std::begin(addrs), std::end(addrs));

  std::get<1>(dkey) = shared_addr->redirect_if_not_tls;

  auto &affinity = shared_addr->affinity;
  std::get<2>(dkey) = affinity.type;
  std::get<3>(dkey) = affinity.cookie.name;
  std::get<4>(dkey) = affinity.cookie.path;
  std::get<5>(dkey) = affinity.cookie.secure;
  std::get<6>(dkey) = affinity.cookie.stickiness;
  auto &timeout = shared_addr->timeout;
  std::get<7>(dkey) = timeout.read;
  std::get<8>(dkey) = timeout.write;
  std::get<9>(dkey) = mruby_file;
  std::get<10>(dkey) = shared_addr->dnf;

  return dkey;
}
} // namespace
144 
// Constructs a worker bound to |loop|.  Sets up the cross-thread wakeup
// watcher (w_), the two timers used by this worker, the optional
// memcached-backed TLS session cache dispatcher, and installs the
// initial downstream configuration.  The HTTP/3- and libbpf-related
// parameters only exist when the corresponding features are compiled
// in, which is why the parameter list and the member-initializer list
// are interleaved with preprocessor conditionals.
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
               SSL_CTX *tls_session_cache_memcached_ssl_ctx,
               tls::CertLookupTree *cert_tree,
#ifdef ENABLE_HTTP3
               SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
               const uint8_t *cid_prefix, size_t cid_prefixlen,
#  ifdef HAVE_LIBBPF
               size_t index,
#  endif // HAVE_LIBBPF
#endif   // ENABLE_HTTP3
               const std::shared_ptr<TicketKeys> &ticket_keys,
               ConnectionHandler *conn_handler,
               std::shared_ptr<DownstreamConfig> downstreamconf)
    :
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
      index_{index},
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
      randgen_(util::make_mt19937()),
      worker_stat_{},
      dns_tracker_(loop, get_config()->conn.downstream->family),
#ifdef ENABLE_HTTP3
      quic_upstream_addrs_{get_config()->conn.quic_listener.addrs},
#endif // ENABLE_HTTP3
      loop_(loop),
      sv_ssl_ctx_(sv_ssl_ctx),
      cl_ssl_ctx_(cl_ssl_ctx),
      cert_tree_(cert_tree),
      conn_handler_(conn_handler),
#ifdef ENABLE_HTTP3
      quic_sv_ssl_ctx_{quic_sv_ssl_ctx},
      quic_cert_tree_{quic_cert_tree},
      quic_conn_handler_{this},
#endif // ENABLE_HTTP3
      ticket_keys_(ticket_keys),
      connect_blocker_(
          std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
      graceful_shutdown_(false) {
#ifdef ENABLE_HTTP3
  // Copy the worker's QUIC connection ID prefix into the fixed-size
  // member buffer.
  std::copy_n(cid_prefix, cid_prefixlen, std::begin(cid_prefix_));
#endif // ENABLE_HTTP3

  // Async watcher lets other threads wake this worker (see send()).
  ev_async_init(&w_, eventcb);
  w_.data = this;
  ev_async_start(loop_, &w_);

  // Timers are initialized here but started on demand.
  ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
  mcpool_clear_timer_.data = this;

  ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
  proc_wev_timer_.data = this;

  auto &session_cacheconf = get_config()->tls.session_cache;

  // Only spin up the memcached dispatcher when a session-cache
  // memcached host is configured.
  if (!session_cacheconf.memcached.host.empty()) {
    session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
        &session_cacheconf.memcached.addr, loop,
        tls_session_cache_memcached_ssl_ctx,
        StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
  }

  replace_downstream_config(std::move(downstreamconf));
}
207 
208 namespace {
ensure_enqueue_addr(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & wgpq,WeightGroup * wg,DownstreamAddr * addr)209 void ensure_enqueue_addr(
210     std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
211                         WeightGroupEntryGreater> &wgpq,
212     WeightGroup *wg, DownstreamAddr *addr) {
213   uint32_t cycle;
214   if (!wg->pq.empty()) {
215     auto &top = wg->pq.top();
216     cycle = top.cycle;
217   } else {
218     cycle = 0;
219   }
220 
221   addr->cycle = cycle;
222   addr->pending_penalty = 0;
223   wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
224   addr->queued = true;
225 
226   if (!wg->queued) {
227     if (!wgpq.empty()) {
228       auto &top = wgpq.top();
229       cycle = top.cycle;
230     } else {
231       cycle = 0;
232     }
233 
234     wg->cycle = cycle;
235     wg->pending_penalty = 0;
236     wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
237     wg->queued = true;
238   }
239 }
240 } // namespace
241 
// Swaps in a new downstream (backend) configuration.  All existing
// groups are retired and their pooled backend connections dropped, then
// a fresh set of DownstreamAddrGroup objects is built from
// |downstreamconf|.  Groups whose DownstreamKey matches an
// already-built group share its SharedDownstreamAddr (and therefore its
// connection pools) instead of creating a duplicate.
void Worker::replace_downstream_config(
    std::shared_ptr<DownstreamConfig> downstreamconf) {
  // Retire the old groups; in-pool backend connections must not be
  // reused under the new configuration.
  for (auto &g : downstream_addr_groups_) {
    g->retired = true;

    auto &shared_addr = g->shared_addr;
    for (auto &addr : shared_addr->addrs) {
      addr.dconn_pool->remove_all();
    }
  }

  downstreamconf_ = downstreamconf;

  // Making a copy is much faster with multiple thread on
  // backendconfig API call.
  auto groups = downstreamconf->addr_groups;

  downstream_addr_groups_ =
      std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());

  // Maps DownstreamKey -> index of the first group built with that
  // configuration, so later identical groups can share it.
  std::map<DownstreamKey, size_t> addr_groups_indexer;
#ifdef HAVE_MRUBY
  // TODO It is a bit less efficient because
  // mruby::create_mruby_context returns std::unique_ptr and we cannot
  // use std::make_shared.
  std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
#endif // HAVE_MRUBY

  for (size_t i = 0; i < groups.size(); ++i) {
    auto &src = groups[i];
    auto &dst = downstream_addr_groups_[i];

    dst = std::make_shared<DownstreamAddrGroup>();
    dst->pattern =
        ImmutableString{std::begin(src.pattern), std::end(src.pattern)};

    auto shared_addr = std::make_shared<SharedDownstreamAddr>();

    // Copy group-wide settings.  Strings are re-allocated from this
    // SharedDownstreamAddr's own block allocator so their lifetime is
    // tied to the shared object, not to |src|.
    shared_addr->addrs.resize(src.addrs.size());
    shared_addr->affinity.type = src.affinity.type;
    if (src.affinity.type == SessionAffinity::COOKIE) {
      shared_addr->affinity.cookie.name =
          make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
      if (!src.affinity.cookie.path.empty()) {
        shared_addr->affinity.cookie.path =
            make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
      }
      shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
      shared_addr->affinity.cookie.stickiness = src.affinity.cookie.stickiness;
    }
    shared_addr->affinity_hash = src.affinity_hash;
    shared_addr->affinity_hash_map = src.affinity_hash_map;
    shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
    shared_addr->dnf = src.dnf;
    shared_addr->timeout.read = src.timeout.read;
    shared_addr->timeout.write = src.timeout.write;

    // Copy per-address settings.
    for (size_t j = 0; j < src.addrs.size(); ++j) {
      auto &src_addr = src.addrs[j];
      auto &dst_addr = shared_addr->addrs[j];

      dst_addr.addr = src_addr.addr;
      dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
      dst_addr.hostport =
          make_string_ref(shared_addr->balloc, src_addr.hostport);
      dst_addr.port = src_addr.port;
      dst_addr.host_unix = src_addr.host_unix;
      dst_addr.weight = src_addr.weight;
      dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
      dst_addr.group_weight = src_addr.group_weight;
      dst_addr.affinity_hash = src_addr.affinity_hash;
      dst_addr.proto = src_addr.proto;
      dst_addr.tls = src_addr.tls;
      dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
      dst_addr.fall = src_addr.fall;
      dst_addr.rise = src_addr.rise;
      dst_addr.dns = src_addr.dns;
      dst_addr.upgrade_scheme = src_addr.upgrade_scheme;
    }

#ifdef HAVE_MRUBY
    // Groups with the same mruby file share one mruby context.
    auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
    if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
      shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
      assert(shared_addr->mruby_ctx);
      shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
    } else {
      shared_addr->mruby_ctx = (*mruby_ctx_it).second;
    }
#endif // HAVE_MRUBY

    // share the connection if patterns have the same set of backend
    // addresses.

    auto dkey = create_downstream_key(shared_addr, src.mruby_file);
    auto it = addr_groups_indexer.find(dkey);

    if (it == std::end(addr_groups_indexer)) {
      // First group with this configuration: finish wiring up the
      // shared address (connect blockers, live checks, connection
      // pools, weight groups) and register it in the indexer.
      auto shared_addr_ptr = shared_addr.get();

      for (auto &addr : shared_addr->addrs) {
        // On unblock, re-enqueue the address so it becomes eligible
        // for load balancing again.  The lambda captures |addr| by
        // reference; shared_addr keeps the vector alive.
        addr.connect_blocker = std::make_unique<ConnectBlocker>(
            randgen_, loop_, nullptr, [shared_addr_ptr, &addr]() {
              if (!addr.queued) {
                if (!addr.wg) {
                  return;
                }
                ensure_enqueue_addr(shared_addr_ptr->pq, addr.wg, &addr);
              }
            });

        addr.live_check = std::make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this,
                                                      &addr, randgen_);
      }

      size_t seq = 0;
      for (auto &addr : shared_addr->addrs) {
        addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
        addr.seq = seq++;
      }

      // Randomize tie-breaking order between addresses by shuffling
      // their seq numbers.
      util::shuffle(std::begin(shared_addr->addrs),
                    std::end(shared_addr->addrs), randgen_,
                    [](auto i, auto j) { std::swap((*i).seq, (*j).seq); });

      if (shared_addr->affinity.type == SessionAffinity::NONE) {
        // Without session affinity, addresses are load-balanced via
        // weight groups keyed by addr.group.  Count distinct groups
        // first, then hand out WeightGroup slots from the back of the
        // vector.
        std::map<StringRef, WeightGroup *> wgs;
        size_t num_wgs = 0;
        for (auto &addr : shared_addr->addrs) {
          if (wgs.find(addr.group) == std::end(wgs)) {
            ++num_wgs;
            wgs.emplace(addr.group, nullptr);
          }
        }

        shared_addr->wgs = std::vector<WeightGroup>(num_wgs);

        for (auto &addr : shared_addr->addrs) {
          auto &wg = wgs[addr.group];
          if (wg == nullptr) {
            wg = &shared_addr->wgs[--num_wgs];
            wg->seq = num_wgs;
          }

          wg->weight = addr.group_weight;
          wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
          addr.queued = true;
          addr.wg = wg;
        }

        // Every WeightGroup slot must have been claimed.
        assert(num_wgs == 0);

        for (auto &kv : wgs) {
          shared_addr->pq.push(
              WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
          kv.second->queued = true;
        }
      }

      dst->shared_addr = shared_addr;

      addr_groups_indexer.emplace(std::move(dkey), i);
    } else {
      // Identical configuration already built: share its
      // SharedDownstreamAddr instead of duplicating pools.
      auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
      if (LOG_ENABLED(INFO)) {
        LOG(INFO) << dst->pattern << " shares the same backend group with "
                  << g->pattern;
      }
      dst->shared_addr = g->shared_addr;
    }
  }
}
414 
// Stops all libev watchers owned by this worker so the loop holds no
// dangling pointers into the destroyed object.
Worker::~Worker() {
  ev_async_stop(loop_, &w_);
  ev_timer_stop(loop_, &mcpool_clear_timer_);
  ev_timer_stop(loop_, &proc_wev_timer_);
}
420 
// Arms the timer that opportunistically clears the memchunk pool (see
// mcpool_clear_cb); with both timeouts 0 it fires on the next loop
// iteration.
void Worker::schedule_clear_mcpool() {
  // libev manual says: "If the watcher is already active nothing will
  // happen."  Since we don't change any timeout here, we don't have
  // to worry about querying ev_is_active.
  ev_timer_start(loop_, &mcpool_clear_timer_);
}
427 
// Blocks until the worker thread started by run_async() finishes.
// No-op in single-threaded (NOTHREADS) builds.
void Worker::wait() {
#ifndef NOTHREADS
  fut_.get();
#endif // !NOTHREADS
}
433 
// Launches this worker's event loop on its own thread.  The thread
// reopens log files for itself, runs the loop until ev_break, then
// tears down its thread-local log configuration.  No-op in NOTHREADS
// builds.
void Worker::run_async() {
#ifndef NOTHREADS
  fut_ = std::async(std::launch::async, [this] {
    (void)reopen_log_files(get_config()->logging);
    ev_run(loop_);
    delete_log_config();
  });
#endif // !NOTHREADS
}
443 
// Queues |event| for this worker and wakes its event loop.  Intended
// to be called from other threads: the queue is guarded by m_, and
// ev_async_send is the libev-sanctioned cross-thread wakeup.
void Worker::send(WorkerEvent event) {
  {
    std::lock_guard<std::mutex> g(m_);

    q_.emplace_back(std::move(event));
  }

  ev_async_send(loop_, &w_);
}
453 
// Dequeues and handles exactly one WorkerEvent.  When more events
// remain, proc_wev_timer_ is (re)started so the next event is handled
// on a subsequent loop iteration rather than in a tight loop.
void Worker::process_events() {
  WorkerEvent wev;
  {
    std::lock_guard<std::mutex> g(m_);

    // Process event one at a time.  This is important for
    // WorkerEventType::NEW_CONNECTION event since accepting large
    // number of new connections at once may delay time to 1st byte
    // for existing connections.

    if (q_.empty()) {
      ev_timer_stop(loop_, &proc_wev_timer_);
      return;
    }

    wev = std::move(q_.front());
    q_.pop_front();
  }

  // Schedule the next drain; stopped above once the queue is empty.
  ev_timer_start(loop_, &proc_wev_timer_);

  auto config = get_config();

  auto worker_connections = config->conn.upstream.worker_connections;

  switch (wev.type) {
  case WorkerEventType::NEW_CONNECTION: {
    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
                       << ", addrlen=" << wev.client_addrlen;
    }

    // Enforce the per-worker connection limit; reject by closing the
    // accepted fd.
    if (worker_stat_.num_connections >= worker_connections) {

      if (LOG_ENABLED(INFO)) {
        WLOG(INFO, this) << "Too many connections >= " << worker_connections;
      }

      close(wev.client_fd);

      break;
    }

    auto client_handler =
        tls::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
                               wev.client_addrlen, wev.faddr);
    if (!client_handler) {
      // NOTE(review): this ERROR log is gated behind LOG_ENABLED(INFO)
      // -- confirm whether it should be emitted unconditionally.
      if (LOG_ENABLED(INFO)) {
        WLOG(ERROR, this) << "ClientHandler creation failed";
      }
      close(wev.client_fd);
      break;
    }

    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
    }

    break;
  }
  case WorkerEventType::REOPEN_LOG:
    WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
                       << ")";

    reopen_log_files(config->logging);

    break;
  case WorkerEventType::GRACEFUL_SHUTDOWN:
    WLOG(NOTICE, this) << "Graceful shutdown commencing";

    graceful_shutdown_ = true;

    // If nothing is in flight, the loop can terminate immediately.
    if (worker_stat_.num_connections == 0 &&
        worker_stat_.num_close_waits == 0) {
      ev_break(loop_);

      return;
    }

    break;
  case WorkerEventType::REPLACE_DOWNSTREAM:
    WLOG(NOTICE, this) << "Replace downstream";

    replace_downstream_config(wev.downstreamconf);

    break;
#ifdef ENABLE_HTTP3
  case WorkerEventType::QUIC_PKT_FORWARD: {
    const UpstreamAddr *faddr;

    // upstream_addr_index of -1 means "not known"; resolve the
    // frontend address from the packet's local address instead.
    if (wev.quic_pkt->upstream_addr_index == static_cast<size_t>(-1)) {
      faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr);
      if (faddr == nullptr) {
        LOG(ERROR) << "No suitable upstream address found";

        break;
      }
    } else if (quic_upstream_addrs_.size() <=
               wev.quic_pkt->upstream_addr_index) {
      LOG(ERROR) << "upstream_addr_index is too large";

      break;
    } else {
      faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index];
    }

    quic_conn_handler_.handle_packet(
        faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
        wev.quic_pkt->pi, wev.quic_pkt->data.data(), wev.quic_pkt->data.size());

    break;
  }
#endif // ENABLE_HTTP3
  default:
    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
    }
  }
}
573 
// Returns the TLS certificate lookup tree (non-owning).
tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }

#ifdef ENABLE_HTTP3
// Returns the certificate lookup tree used for QUIC/HTTP3 (non-owning).
tls::CertLookupTree *Worker::get_quic_cert_lookup_tree() const {
  return quic_cert_tree_;
}
#endif // ENABLE_HTTP3
581 
// Returns the current TLS session ticket keys.  Readable from multiple
// threads: uses atomic shared_ptr loads when available, otherwise a
// mutex.
std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
#ifdef HAVE_ATOMIC_STD_SHARED_PTR
  return std::atomic_load_explicit(&ticket_keys_, std::memory_order_acquire);
#else  // !HAVE_ATOMIC_STD_SHARED_PTR
  std::lock_guard<std::mutex> g(ticket_keys_m_);
  return ticket_keys_;
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
}
590 
// Publishes new TLS session ticket keys.  Pairs with get_ticket_keys():
// release store / acquire load when atomic shared_ptr ops exist,
// mutex otherwise.
void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
#ifdef HAVE_ATOMIC_STD_SHARED_PTR
  // This is single writer
  std::atomic_store_explicit(&ticket_keys_, std::move(ticket_keys),
                             std::memory_order_release);
#else  // !HAVE_ATOMIC_STD_SHARED_PTR
  std::lock_guard<std::mutex> g(ticket_keys_m_);
  ticket_keys_ = std::move(ticket_keys);
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
}
601 
// Simple accessors; all returned pointers/references are non-owning and
// remain valid for the lifetime of the Worker.

WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }

struct ev_loop *Worker::get_loop() const {
  return loop_;
}

// Server-side (frontend) TLS context.
SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }

// Client-side (backend) TLS context.
SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }

#ifdef ENABLE_HTTP3
// Server-side TLS context used for QUIC.
SSL_CTX *Worker::get_quic_sv_ssl_ctx() const { return quic_sv_ssl_ctx_; }
#endif // ENABLE_HTTP3

void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }

bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }

MemchunkPool *Worker::get_mcpool() { return &mcpool_; }

// May return nullptr when no session-cache memcached host is
// configured (see the constructor).
MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
  return session_cache_memcached_dispatcher_.get();
}

std::mt19937 &Worker::get_randgen() { return randgen_; }
627 
#ifdef HAVE_MRUBY
// Creates this worker's mruby context from the configured script file.
// Returns 0 on success, -1 on failure.
int Worker::create_mruby_context() {
  mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
  if (!mruby_ctx_) {
    return -1;
  }

  return 0;
}

// Returns the worker-level mruby context (non-owning); nullptr until
// create_mruby_context() succeeds.
mruby::MRubyContext *Worker::get_mruby_context() const {
  return mruby_ctx_.get();
}
#endif // HAVE_MRUBY
642 
// Accessors for downstream (backend) state; returned pointers are
// non-owning.

std::vector<std::shared_ptr<DownstreamAddrGroup>> &
Worker::get_downstream_addr_groups() {
  return downstream_addr_groups_;
}

ConnectBlocker *Worker::get_connect_blocker() const {
  return connect_blocker_.get();
}

const DownstreamConfig *Worker::get_downstream_config() const {
  return downstreamconf_.get();
}

ConnectionHandler *Worker::get_connection_handler() const {
  return conn_handler_;
}

#ifdef ENABLE_HTTP3
QUICConnectionHandler *Worker::get_quic_connection_handler() {
  return &quic_conn_handler_;
}
#endif // ENABLE_HTTP3

DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
667 
668 #ifdef ENABLE_HTTP3
669 #  ifdef HAVE_LIBBPF
should_attach_bpf() const670 bool Worker::should_attach_bpf() const {
671   auto config = get_config();
672   auto &quicconf = config->quic;
673   auto &apiconf = config->api;
674 
675   if (quicconf.bpf.disabled) {
676     return false;
677   }
678 
679   if (!config->single_thread && apiconf.enabled) {
680     return index_ == 1;
681   }
682 
683   return index_ == 0;
684 }
685 
should_update_bpf_map() const686 bool Worker::should_update_bpf_map() const {
687   auto config = get_config();
688   auto &quicconf = config->quic;
689 
690   return !quicconf.bpf.disabled;
691 }
692 
compute_sk_index() const693 uint32_t Worker::compute_sk_index() const {
694   auto config = get_config();
695   auto &apiconf = config->api;
696 
697   if (!config->single_thread && apiconf.enabled) {
698     return index_ - 1;
699   }
700 
701   return index_;
702 }
703 #  endif // HAVE_LIBBPF
704 
// Creates a QUIC server socket and listener for every configured QUIC
// frontend address.  Returns 0 on success, -1 on any socket failure or
// when two endpoints resolve to the same host:port.
int Worker::setup_quic_server_socket() {
  size_t n = 0;

  for (auto &addr : quic_upstream_addrs_) {
    assert(!addr.host_unix);
    if (create_quic_server_socket(addr) != 0) {
      return -1;
    }

    // Make sure that each endpoint has a unique address.
    // (O(n^2) pairwise scan; the number of frontend endpoints is small.)
    for (size_t i = 0; i < n; ++i) {
      const auto &a = quic_upstream_addrs_[i];

      if (addr.hostport == a.hostport) {
        LOG(FATAL)
            << "QUIC frontend endpoint must be unique: a duplicate found for "
            << addr.hostport;

        return -1;
      }
    }

    ++n;

    quic_listeners_.emplace_back(std::make_unique<QUICListener>(&addr, this));
  }

  return 0;
}
734 
create_quic_server_socket(UpstreamAddr & faddr)735 int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
736   std::array<char, STRERROR_BUFSIZE> errbuf;
737   int fd = -1;
738   int rv;
739 
740   auto service = util::utos(faddr.port);
741   addrinfo hints{};
742   hints.ai_family = faddr.family;
743   hints.ai_socktype = SOCK_DGRAM;
744   hints.ai_flags = AI_PASSIVE;
745 #  ifdef AI_ADDRCONFIG
746   hints.ai_flags |= AI_ADDRCONFIG;
747 #  endif // AI_ADDRCONFIG
748 
749   auto node =
750       faddr.host == StringRef::from_lit("*") ? nullptr : faddr.host.c_str();
751 
752   addrinfo *res, *rp;
753   rv = getaddrinfo(node, service.c_str(), &hints, &res);
754 #  ifdef AI_ADDRCONFIG
755   if (rv != 0) {
756     // Retry without AI_ADDRCONFIG
757     hints.ai_flags &= ~AI_ADDRCONFIG;
758     rv = getaddrinfo(node, service.c_str(), &hints, &res);
759   }
760 #  endif // AI_ADDRCONFIG
761   if (rv != 0) {
762     LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
763                << " address for " << faddr.host << ", port " << faddr.port
764                << ": " << gai_strerror(rv);
765     return -1;
766   }
767 
768   auto res_d = defer(freeaddrinfo, res);
769 
770   std::array<char, NI_MAXHOST> host;
771 
772   for (rp = res; rp; rp = rp->ai_next) {
773     rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
774                      nullptr, 0, NI_NUMERICHOST);
775     if (rv != 0) {
776       LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
777       continue;
778     }
779 
780 #  ifdef SOCK_NONBLOCK
781     fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
782                 rp->ai_protocol);
783     if (fd == -1) {
784       auto error = errno;
785       LOG(WARN) << "socket() syscall failed: "
786                 << xsi_strerror(error, errbuf.data(), errbuf.size());
787       continue;
788     }
789 #  else  // !SOCK_NONBLOCK
790     fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
791     if (fd == -1) {
792       auto error = errno;
793       LOG(WARN) << "socket() syscall failed: "
794                 << xsi_strerror(error, errbuf.data(), errbuf.size());
795       continue;
796     }
797     util::make_socket_nonblocking(fd);
798     util::make_socket_closeonexec(fd);
799 #  endif // !SOCK_NONBLOCK
800 
801     int val = 1;
802     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
803                    static_cast<socklen_t>(sizeof(val))) == -1) {
804       auto error = errno;
805       LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
806                 << xsi_strerror(error, errbuf.data(), errbuf.size());
807       close(fd);
808       continue;
809     }
810 
811     if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
812                    static_cast<socklen_t>(sizeof(val))) == -1) {
813       auto error = errno;
814       LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
815                 << xsi_strerror(error, errbuf.data(), errbuf.size());
816       close(fd);
817       continue;
818     }
819 
820     if (faddr.family == AF_INET6) {
821 #  ifdef IPV6_V6ONLY
822       if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
823                      static_cast<socklen_t>(sizeof(val))) == -1) {
824         auto error = errno;
825         LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
826                   << xsi_strerror(error, errbuf.data(), errbuf.size());
827         close(fd);
828         continue;
829       }
830 #  endif // IPV6_V6ONLY
831 
832       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
833                      static_cast<socklen_t>(sizeof(val))) == -1) {
834         auto error = errno;
835         LOG(WARN)
836             << "Failed to set IPV6_RECVPKTINFO option to listener socket: "
837             << xsi_strerror(error, errbuf.data(), errbuf.size());
838         close(fd);
839         continue;
840       }
841 
842       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVTCLASS, &val,
843                      static_cast<socklen_t>(sizeof(val))) == -1) {
844         auto error = errno;
845         LOG(WARN) << "Failed to set IPV6_RECVTCLASS option to listener socket: "
846                   << xsi_strerror(error, errbuf.data(), errbuf.size());
847         close(fd);
848         continue;
849       }
850 
851 #  if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO)
852       int mtu_disc = IPV6_PMTUDISC_DO;
853       if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &mtu_disc,
854                      static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
855         auto error = errno;
856         LOG(WARN)
857             << "Failed to set IPV6_MTU_DISCOVER option to listener socket: "
858             << xsi_strerror(error, errbuf.data(), errbuf.size());
859         close(fd);
860         continue;
861       }
862 #  endif // defined(IPV6_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
863     } else {
864       if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
865                      static_cast<socklen_t>(sizeof(val))) == -1) {
866         auto error = errno;
867         LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
868                   << xsi_strerror(error, errbuf.data(), errbuf.size());
869         close(fd);
870         continue;
871       }
872 
873       if (setsockopt(fd, IPPROTO_IP, IP_RECVTOS, &val,
874                      static_cast<socklen_t>(sizeof(val))) == -1) {
875         auto error = errno;
876         LOG(WARN) << "Failed to set IP_RECVTOS option to listener socket: "
877                   << xsi_strerror(error, errbuf.data(), errbuf.size());
878         close(fd);
879         continue;
880       }
881 
882 #  if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
883       int mtu_disc = IP_PMTUDISC_DO;
884       if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &mtu_disc,
885                      static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
886         auto error = errno;
887         LOG(WARN) << "Failed to set IP_MTU_DISCOVER option to listener socket: "
888                   << xsi_strerror(error, errbuf.data(), errbuf.size());
889         close(fd);
890         continue;
891       }
892 #  endif // defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
893     }
894 
895     if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
896       auto error = errno;
897       LOG(WARN) << "bind() syscall failed: "
898                 << xsi_strerror(error, errbuf.data(), errbuf.size());
899       close(fd);
900       continue;
901     }
902 
903 #  ifdef HAVE_LIBBPF
904     auto config = get_config();
905 
906     auto &quic_bpf_refs = conn_handler_->get_quic_bpf_refs();
907 
908     if (should_attach_bpf()) {
909       auto &bpfconf = config->quic.bpf;
910 
911       auto obj = bpf_object__open_file(bpfconf.prog_file.c_str(), nullptr);
912       if (!obj) {
913         auto error = errno;
914         LOG(FATAL) << "Failed to open bpf object file: "
915                    << xsi_strerror(error, errbuf.data(), errbuf.size());
916         close(fd);
917         return -1;
918       }
919 
920       rv = bpf_object__load(obj);
921       if (rv != 0) {
922         LOG(FATAL) << "Failed to load bpf object file: "
923                    << xsi_strerror(-rv, errbuf.data(), errbuf.size());
924         close(fd);
925         return -1;
926       }
927 
928       auto prog = bpf_object__find_program_by_name(obj, "select_reuseport");
929       if (!prog) {
930         auto error = errno;
931         LOG(FATAL) << "Failed to find sk_reuseport program: "
932                    << xsi_strerror(error, errbuf.data(), errbuf.size());
933         close(fd);
934         return -1;
935       }
936 
937       auto &ref = quic_bpf_refs[faddr.index];
938 
939       ref.obj = obj;
940 
941       auto reuseport_array =
942           bpf_object__find_map_by_name(obj, "reuseport_array");
943       if (!reuseport_array) {
944         auto error = errno;
945         LOG(FATAL) << "Failed to get reuseport_array: "
946                    << xsi_strerror(error, errbuf.data(), errbuf.size());
947         close(fd);
948         return -1;
949       }
950 
951       ref.reuseport_array = bpf_map__fd(reuseport_array);
952 
953       auto cid_prefix_map = bpf_object__find_map_by_name(obj, "cid_prefix_map");
954       if (!cid_prefix_map) {
955         auto error = errno;
956         LOG(FATAL) << "Failed to get cid_prefix_map: "
957                    << xsi_strerror(error, errbuf.data(), errbuf.size());
958         close(fd);
959         return -1;
960       }
961 
962       ref.cid_prefix_map = bpf_map__fd(cid_prefix_map);
963 
964       auto sk_info = bpf_object__find_map_by_name(obj, "sk_info");
965       if (!sk_info) {
966         auto error = errno;
967         LOG(FATAL) << "Failed to get sk_info: "
968                    << xsi_strerror(error, errbuf.data(), errbuf.size());
969         close(fd);
970         return -1;
971       }
972 
973       constexpr uint32_t zero = 0;
974       uint64_t num_socks = config->num_worker;
975 
976       rv =
977           bpf_map_update_elem(bpf_map__fd(sk_info), &zero, &num_socks, BPF_ANY);
978       if (rv != 0) {
979         LOG(FATAL) << "Failed to update sk_info: "
980                    << xsi_strerror(-rv, errbuf.data(), errbuf.size());
981         close(fd);
982         return -1;
983       }
984 
985       constexpr uint32_t key_high_idx = 1;
986       constexpr uint32_t key_low_idx = 2;
987 
988       auto &qkms = conn_handler_->get_quic_keying_materials();
989       auto &qkm = qkms->keying_materials.front();
990 
991       rv = bpf_map_update_elem(bpf_map__fd(sk_info), &key_high_idx,
992                                qkm.cid_encryption_key.data(), BPF_ANY);
993       if (rv != 0) {
994         LOG(FATAL) << "Failed to update key_high_idx sk_info: "
995                    << xsi_strerror(-rv, errbuf.data(), errbuf.size());
996         close(fd);
997         return -1;
998       }
999 
1000       rv = bpf_map_update_elem(bpf_map__fd(sk_info), &key_low_idx,
1001                                qkm.cid_encryption_key.data() + 8, BPF_ANY);
1002       if (rv != 0) {
1003         LOG(FATAL) << "Failed to update key_low_idx sk_info: "
1004                    << xsi_strerror(-rv, errbuf.data(), errbuf.size());
1005         close(fd);
1006         return -1;
1007       }
1008 
1009       auto prog_fd = bpf_program__fd(prog);
1010 
1011       if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &prog_fd,
1012                      static_cast<socklen_t>(sizeof(prog_fd))) == -1) {
1013         LOG(FATAL) << "Failed to attach bpf program: "
1014                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
1015         close(fd);
1016         return -1;
1017       }
1018     }
1019 
1020     if (should_update_bpf_map()) {
1021       const auto &ref = quic_bpf_refs[faddr.index];
1022       auto sk_index = compute_sk_index();
1023 
1024       rv =
1025           bpf_map_update_elem(ref.reuseport_array, &sk_index, &fd, BPF_NOEXIST);
1026       if (rv != 0) {
1027         LOG(FATAL) << "Failed to update reuseport_array: "
1028                    << xsi_strerror(-rv, errbuf.data(), errbuf.size());
1029         close(fd);
1030         return -1;
1031       }
1032 
1033       rv = bpf_map_update_elem(ref.cid_prefix_map, cid_prefix_.data(),
1034                                &sk_index, BPF_NOEXIST);
1035       if (rv != 0) {
1036         LOG(FATAL) << "Failed to update cid_prefix_map: "
1037                    << xsi_strerror(-rv, errbuf.data(), errbuf.size());
1038         close(fd);
1039         return -1;
1040       }
1041     }
1042 #  endif // HAVE_LIBBPF
1043 
1044     break;
1045   }
1046 
1047   if (!rp) {
1048     LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
1049                << " socket failed";
1050 
1051     return -1;
1052   }
1053 
1054   faddr.fd = fd;
1055   faddr.hostport = util::make_http_hostport(mod_config()->balloc,
1056                                             StringRef{host.data()}, faddr.port);
1057 
1058   LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";
1059 
1060   return 0;
1061 }
1062 
// Returns a pointer to this Worker's QUIC Connection ID prefix
// (cid_prefix_); the buffer identifies this worker when routing
// incoming QUIC packets.
const uint8_t *Worker::get_cid_prefix() const { return cid_prefix_.data(); }
1064 
// Returns the QUIC UpstreamAddr which matches |local_addr|, the local
// address a QUIC packet arrived on.  An exact host:port match wins
// immediately; otherwise a wildcard listener ("0.0.0.0" or "[::]")
// with the same address family and port is remembered as a fallback.
// Returns nullptr if no listener matches.
const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) {
  std::array<char, NI_MAXHOST> host;

  // Render the numeric host part of |local_addr| into |host|.
  auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(),
                        host.size(), nullptr, 0, NI_NUMERICHOST);
  if (rv != 0) {
    LOG(ERROR) << "getnameinfo: " << gai_strerror(rv);

    return nullptr;
  }

  uint16_t port;

  switch (local_addr.su.sa.sa_family) {
  case AF_INET:
    // NOTE(review): sin_port is in network byte order, so conceptually
    // this is ntohs(); htons() performs the identical 16-bit byte swap.
    port = htons(local_addr.su.in.sin_port);

    break;
  case AF_INET6:
    port = htons(local_addr.su.in6.sin6_port);

    break;
  default:
    assert(0);
    abort();
  }

  std::array<char, util::max_hostport> hostport_buf;

  // Canonical "host:port" form (port omitted for 80/443 by
  // make_http_hostport), matching how faddr.hostport was built.
  auto hostport = util::make_http_hostport(std::begin(hostport_buf),
                                           StringRef{host.data()}, port);
  const UpstreamAddr *fallback_faddr = nullptr;

  for (auto &faddr : quic_upstream_addrs_) {
    // Exact match takes precedence over any wildcard fallback.
    if (faddr.hostport == hostport) {
      return &faddr;
    }

    if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) {
      continue;
    }

    if (faddr.port == 443 || faddr.port == 80) {
      // Default ports: hostport carries no ":port" suffix, so compare
      // against the bare wildcard host.
      switch (faddr.family) {
      case AF_INET:
        if (util::streq(faddr.hostport, StringRef::from_lit("0.0.0.0"))) {
          fallback_faddr = &faddr;
        }

        break;
      case AF_INET6:
        if (util::streq(faddr.hostport, StringRef::from_lit("[::]"))) {
          fallback_faddr = &faddr;
        }

        break;
      default:
        assert(0);
      }
    } else {
      // Non-default ports: hostport is "wildcard:port"; a prefix check
      // on the wildcard part suffices since the port already matched.
      switch (faddr.family) {
      case AF_INET:
        if (util::starts_with(faddr.hostport,
                              StringRef::from_lit("0.0.0.0:"))) {
          fallback_faddr = &faddr;
        }

        break;
      case AF_INET6:
        if (util::starts_with(faddr.hostport, StringRef::from_lit("[::]:"))) {
          fallback_faddr = &faddr;
        }

        break;
      default:
        assert(0);
      }
    }
  }

  return fallback_faddr;
}
1147 #endif // ENABLE_HTTP3
1148 
namespace {
// Selects the index into |groups| of the backend routing pattern that
// matches |host| and |path|.  Lookup order: (1) exact host patterns,
// (2) wildcard host patterns (longest host suffix wins), (3) path-only
// patterns, and finally (4) |catch_all|.  |balloc| provides scratch
// memory for the reversed-host buffer used in wildcard matching.
size_t match_downstream_addr_group_host(
    const RouterConfig &routerconf, const StringRef &host,
    const StringRef &path,
    const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
    size_t catch_all, BlockAllocator &balloc) {

  const auto &router = routerconf.router;
  const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
  const auto &wildcard_patterns = routerconf.wildcard_patterns;

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "Perform mapping selection, using host=" << host
              << ", path=" << path;
  }

  // (1) Exact host + path match.
  auto group = router.match(host, path);
  if (group != -1) {
    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Found pattern with query " << host << path
                << ", matched pattern=" << groups[group]->pattern;
    }
    return group;
  }

  // (2) Wildcard host patterns.  The wildcard trie is keyed on the
  // reversed host (so suffix matching becomes prefix matching); the
  // leading byte of |host| is dropped when building the reversed copy.
  if (!wildcard_patterns.empty() && !host.empty()) {
    auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
    auto ep =
        std::copy(std::begin(host) + 1, std::end(host), rev_host_src.base);
    std::reverse(rev_host_src.base, ep);
    auto rev_host = StringRef{rev_host_src.base, ep};

    ssize_t best_group = -1;
    const RNode *last_node = nullptr;

    // Walk successively longer host-suffix matches; the last path hit
    // recorded wins because later iterations match longer suffixes.
    for (;;) {
      size_t nread = 0;
      auto wcidx =
          rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
      if (wcidx == -1) {
        break;
      }

      // Consume the matched prefix and continue from where we stopped.
      rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};

      auto &wc = wildcard_patterns[wcidx];
      auto group = wc.router.match(StringRef{}, path);
      if (group != -1) {
        // We sorted wildcard_patterns in a way that first match is the
        // longest host pattern.
        if (LOG_ENABLED(INFO)) {
          LOG(INFO) << "Found wildcard pattern with query " << host << path
                    << ", matched pattern=" << groups[group]->pattern;
        }

        best_group = group;
      }
    }

    if (best_group != -1) {
      return best_group;
    }
  }

  // (3) Path-only patterns (empty host).
  group = router.match(StringRef::from_lit(""), path);
  if (group != -1) {
    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Found pattern with query " << path
                << ", matched pattern=" << groups[group]->pattern;
    }
    return group;
  }

  // (4) Nothing matched.
  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "None match.  Use catch-all pattern";
  }
  return catch_all;
}
} // namespace
1228 
match_downstream_addr_group(const RouterConfig & routerconf,const StringRef & hostport,const StringRef & raw_path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)1229 size_t match_downstream_addr_group(
1230     const RouterConfig &routerconf, const StringRef &hostport,
1231     const StringRef &raw_path,
1232     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1233     size_t catch_all, BlockAllocator &balloc) {
1234   if (std::find(std::begin(hostport), std::end(hostport), '/') !=
1235       std::end(hostport)) {
1236     // We use '/' specially, and if '/' is included in host, it breaks
1237     // our code.  Select catch-all case.
1238     return catch_all;
1239   }
1240 
1241   auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
1242   auto query = std::find(std::begin(raw_path), fragment, '?');
1243   auto path = StringRef{std::begin(raw_path), query};
1244 
1245   if (path.empty() || path[0] != '/') {
1246     path = StringRef::from_lit("/");
1247   }
1248 
1249   if (hostport.empty()) {
1250     return match_downstream_addr_group_host(routerconf, hostport, path, groups,
1251                                             catch_all, balloc);
1252   }
1253 
1254   StringRef host;
1255   if (hostport[0] == '[') {
1256     // assume this is IPv6 numeric address
1257     auto p = std::find(std::begin(hostport), std::end(hostport), ']');
1258     if (p == std::end(hostport)) {
1259       return catch_all;
1260     }
1261     if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
1262       return catch_all;
1263     }
1264     host = StringRef{std::begin(hostport), p + 1};
1265   } else {
1266     auto p = std::find(std::begin(hostport), std::end(hostport), ':');
1267     if (p == std::begin(hostport)) {
1268       return catch_all;
1269     }
1270     host = StringRef{std::begin(hostport), p};
1271   }
1272 
1273   if (std::find_if(std::begin(host), std::end(host), [](char c) {
1274         return 'A' <= c || c <= 'Z';
1275       }) != std::end(host)) {
1276     auto low_host = make_byte_ref(balloc, host.size() + 1);
1277     auto ep = std::copy(std::begin(host), std::end(host), low_host.base);
1278     *ep = '\0';
1279     util::inp_strlower(low_host.base, ep);
1280     host = StringRef{low_host.base, ep};
1281   }
1282   return match_downstream_addr_group_host(routerconf, host, path, groups,
1283                                           catch_all, balloc);
1284 }
1285 
downstream_failure(DownstreamAddr * addr,const Address * raddr)1286 void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
1287   const auto &connect_blocker = addr->connect_blocker;
1288 
1289   if (connect_blocker->in_offline()) {
1290     return;
1291   }
1292 
1293   connect_blocker->on_failure();
1294 
1295   if (addr->fall == 0) {
1296     return;
1297   }
1298 
1299   auto fail_count = connect_blocker->get_fail_count();
1300 
1301   if (fail_count >= addr->fall) {
1302     if (raddr) {
1303       LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
1304                 << " " << fail_count
1305                 << " times in a row; considered as offline";
1306     } else {
1307       LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
1308                 << " " << fail_count
1309                 << " times in a row; considered as offline";
1310     }
1311 
1312     connect_blocker->offline();
1313 
1314     if (addr->rise) {
1315       addr->live_check->schedule();
1316     }
1317   }
1318 }
1319 
#ifdef ENABLE_HTTP3
// Builds a QUIC Connection ID prefix in |cid_prefix|: the first
// SHRPX_QUIC_SERVER_IDLEN bytes are copied from |server_id|, and the
// remaining bytes up to SHRPX_QUIC_CID_PREFIXLEN are filled with
// cryptographically random data.  Returns 0 on success, or -1 if the
// random number generator fails.
int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id) {
  std::copy_n(server_id, SHRPX_QUIC_SERVER_IDLEN, cid_prefix);

  auto rand_dest = cid_prefix + SHRPX_QUIC_SERVER_IDLEN;
  auto rand_len = SHRPX_QUIC_CID_PREFIXLEN - SHRPX_QUIC_SERVER_IDLEN;

  return RAND_bytes(rand_dest, rand_len) == 1 ? 0 : -1;
}
#endif // ENABLE_HTTP3
1331 
1332 } // namespace shrpx
1333