• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_worker.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #include <netinet/udp.h>
31 
32 #include <cstdio>
33 #include <memory>
34 
35 #include <openssl/rand.h>
36 
37 #ifdef HAVE_LIBBPF
38 #  include <bpf/bpf.h>
39 #  include <bpf/libbpf.h>
40 #endif // HAVE_LIBBPF
41 
42 #include "shrpx_tls.h"
43 #include "shrpx_log.h"
44 #include "shrpx_client_handler.h"
45 #include "shrpx_http2_session.h"
46 #include "shrpx_log_config.h"
47 #include "shrpx_memcached_dispatcher.h"
48 #ifdef HAVE_MRUBY
49 #  include "shrpx_mruby.h"
50 #endif // HAVE_MRUBY
51 #ifdef ENABLE_HTTP3
52 #  include "shrpx_quic_listener.h"
53 #endif // ENABLE_HTTP3
54 #include "shrpx_connection_handler.h"
55 #include "util.h"
56 #include "template.h"
57 #include "xsi_strerror.h"
58 
59 namespace shrpx {
60 
61 namespace {
eventcb(struct ev_loop * loop,ev_async * w,int revents)62 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
63   auto worker = static_cast<Worker *>(w->data);
64   worker->process_events();
65 }
66 } // namespace
67 
68 namespace {
mcpool_clear_cb(struct ev_loop * loop,ev_timer * w,int revents)69 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
70   auto worker = static_cast<Worker *>(w->data);
71   if (worker->get_worker_stat()->num_connections != 0) {
72     return;
73   }
74   auto mcpool = worker->get_mcpool();
75   if (mcpool->freelistsize == mcpool->poolsize) {
76     worker->get_mcpool()->clear();
77   }
78 }
79 } // namespace
80 
81 namespace {
proc_wev_cb(struct ev_loop * loop,ev_timer * w,int revents)82 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
83   auto worker = static_cast<Worker *>(w->data);
84   worker->process_events();
85 }
86 } // namespace
87 
DownstreamAddrGroup()88 DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
89 
~DownstreamAddrGroup()90 DownstreamAddrGroup::~DownstreamAddrGroup() {}
91 
92 // DownstreamKey is used to index SharedDownstreamAddr in order to
93 // find the same configuration.
94 using DownstreamKey = std::tuple<
95     std::vector<
96         std::tuple<StringRef, StringRef, StringRef, size_t, size_t, Proto,
97                    uint32_t, uint32_t, uint32_t, bool, bool, bool, bool>>,
98     bool, SessionAffinity, StringRef, StringRef, SessionAffinityCookieSecure,
99     SessionAffinityCookieStickiness, int64_t, int64_t, StringRef, bool>;
100 
101 namespace {
102 DownstreamKey
create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> & shared_addr,const StringRef & mruby_file)103 create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
104                       const StringRef &mruby_file) {
105   DownstreamKey dkey;
106 
107   auto &addrs = std::get<0>(dkey);
108   addrs.resize(shared_addr->addrs.size());
109   auto p = std::begin(addrs);
110   for (auto &a : shared_addr->addrs) {
111     std::get<0>(*p) = a.host;
112     std::get<1>(*p) = a.sni;
113     std::get<2>(*p) = a.group;
114     std::get<3>(*p) = a.fall;
115     std::get<4>(*p) = a.rise;
116     std::get<5>(*p) = a.proto;
117     std::get<6>(*p) = a.port;
118     std::get<7>(*p) = a.weight;
119     std::get<8>(*p) = a.group_weight;
120     std::get<9>(*p) = a.host_unix;
121     std::get<10>(*p) = a.tls;
122     std::get<11>(*p) = a.dns;
123     std::get<12>(*p) = a.upgrade_scheme;
124     ++p;
125   }
126   std::sort(std::begin(addrs), std::end(addrs));
127 
128   std::get<1>(dkey) = shared_addr->redirect_if_not_tls;
129 
130   auto &affinity = shared_addr->affinity;
131   std::get<2>(dkey) = affinity.type;
132   std::get<3>(dkey) = affinity.cookie.name;
133   std::get<4>(dkey) = affinity.cookie.path;
134   std::get<5>(dkey) = affinity.cookie.secure;
135   std::get<6>(dkey) = affinity.cookie.stickiness;
136   auto &timeout = shared_addr->timeout;
137   std::get<7>(dkey) = timeout.read;
138   std::get<8>(dkey) = timeout.write;
139   std::get<9>(dkey) = mruby_file;
140   std::get<10>(dkey) = shared_addr->dnf;
141 
142   return dkey;
143 }
144 } // namespace
145 
Worker(struct ev_loop * loop,SSL_CTX * sv_ssl_ctx,SSL_CTX * cl_ssl_ctx,SSL_CTX * tls_session_cache_memcached_ssl_ctx,tls::CertLookupTree * cert_tree,SSL_CTX * quic_sv_ssl_ctx,tls::CertLookupTree * quic_cert_tree,const uint8_t * cid_prefix,size_t cid_prefixlen,size_t index,const std::shared_ptr<TicketKeys> & ticket_keys,ConnectionHandler * conn_handler,std::shared_ptr<DownstreamConfig> downstreamconf)146 Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
147                SSL_CTX *tls_session_cache_memcached_ssl_ctx,
148                tls::CertLookupTree *cert_tree,
149 #ifdef ENABLE_HTTP3
150                SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
151                const uint8_t *cid_prefix, size_t cid_prefixlen,
152 #  ifdef HAVE_LIBBPF
153                size_t index,
154 #  endif // HAVE_LIBBPF
155 #endif   // ENABLE_HTTP3
156                const std::shared_ptr<TicketKeys> &ticket_keys,
157                ConnectionHandler *conn_handler,
158                std::shared_ptr<DownstreamConfig> downstreamconf)
159     :
160 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
161       index_{index},
162 #endif // ENABLE_HTTP3 && HAVE_LIBBPF
163       randgen_(util::make_mt19937()),
164       worker_stat_{},
165       dns_tracker_(loop, get_config()->conn.downstream->family),
166 #ifdef ENABLE_HTTP3
167       quic_upstream_addrs_{get_config()->conn.quic_listener.addrs},
168 #endif // ENABLE_HTTP3
169       loop_(loop),
170       sv_ssl_ctx_(sv_ssl_ctx),
171       cl_ssl_ctx_(cl_ssl_ctx),
172       cert_tree_(cert_tree),
173       conn_handler_(conn_handler),
174 #ifdef ENABLE_HTTP3
175       quic_sv_ssl_ctx_{quic_sv_ssl_ctx},
176       quic_cert_tree_{quic_cert_tree},
177       quic_conn_handler_{this},
178 #endif // ENABLE_HTTP3
179       ticket_keys_(ticket_keys),
180       connect_blocker_(
181           std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
182       graceful_shutdown_(false) {
183 #ifdef ENABLE_HTTP3
184   std::copy_n(cid_prefix, cid_prefixlen, std::begin(cid_prefix_));
185 #endif // ENABLE_HTTP3
186 
187   ev_async_init(&w_, eventcb);
188   w_.data = this;
189   ev_async_start(loop_, &w_);
190 
191   ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
192   mcpool_clear_timer_.data = this;
193 
194   ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
195   proc_wev_timer_.data = this;
196 
197   auto &session_cacheconf = get_config()->tls.session_cache;
198 
199   if (!session_cacheconf.memcached.host.empty()) {
200     session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
201         &session_cacheconf.memcached.addr, loop,
202         tls_session_cache_memcached_ssl_ctx,
203         StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
204   }
205 
206   replace_downstream_config(std::move(downstreamconf));
207 }
208 
209 namespace {
ensure_enqueue_addr(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & wgpq,WeightGroup * wg,DownstreamAddr * addr)210 void ensure_enqueue_addr(
211     std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
212                         WeightGroupEntryGreater> &wgpq,
213     WeightGroup *wg, DownstreamAddr *addr) {
214   uint32_t cycle;
215   if (!wg->pq.empty()) {
216     auto &top = wg->pq.top();
217     cycle = top.cycle;
218   } else {
219     cycle = 0;
220   }
221 
222   addr->cycle = cycle;
223   addr->pending_penalty = 0;
224   wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
225   addr->queued = true;
226 
227   if (!wg->queued) {
228     if (!wgpq.empty()) {
229       auto &top = wgpq.top();
230       cycle = top.cycle;
231     } else {
232       cycle = 0;
233     }
234 
235     wg->cycle = cycle;
236     wg->pending_penalty = 0;
237     wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
238     wg->queued = true;
239   }
240 }
241 } // namespace
242 
replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf)243 void Worker::replace_downstream_config(
244     std::shared_ptr<DownstreamConfig> downstreamconf) {
245   for (auto &g : downstream_addr_groups_) {
246     g->retired = true;
247 
248     auto &shared_addr = g->shared_addr;
249     for (auto &addr : shared_addr->addrs) {
250       addr.dconn_pool->remove_all();
251     }
252   }
253 
254   downstreamconf_ = downstreamconf;
255 
256   // Making a copy is much faster with multiple thread on
257   // backendconfig API call.
258   auto groups = downstreamconf->addr_groups;
259 
260   downstream_addr_groups_ =
261       std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());
262 
263   std::map<DownstreamKey, size_t> addr_groups_indexer;
264 #ifdef HAVE_MRUBY
265   // TODO It is a bit less efficient because
266   // mruby::create_mruby_context returns std::unique_ptr and we cannot
267   // use std::make_shared.
268   std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
269 #endif // HAVE_MRUBY
270 
271   for (size_t i = 0; i < groups.size(); ++i) {
272     auto &src = groups[i];
273     auto &dst = downstream_addr_groups_[i];
274 
275     dst = std::make_shared<DownstreamAddrGroup>();
276     dst->pattern =
277         ImmutableString{std::begin(src.pattern), std::end(src.pattern)};
278 
279     auto shared_addr = std::make_shared<SharedDownstreamAddr>();
280 
281     shared_addr->addrs.resize(src.addrs.size());
282     shared_addr->affinity.type = src.affinity.type;
283     if (src.affinity.type == SessionAffinity::COOKIE) {
284       shared_addr->affinity.cookie.name =
285           make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
286       if (!src.affinity.cookie.path.empty()) {
287         shared_addr->affinity.cookie.path =
288             make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
289       }
290       shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
291       shared_addr->affinity.cookie.stickiness = src.affinity.cookie.stickiness;
292     }
293     shared_addr->affinity_hash = src.affinity_hash;
294     shared_addr->affinity_hash_map = src.affinity_hash_map;
295     shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
296     shared_addr->dnf = src.dnf;
297     shared_addr->timeout.read = src.timeout.read;
298     shared_addr->timeout.write = src.timeout.write;
299 
300     for (size_t j = 0; j < src.addrs.size(); ++j) {
301       auto &src_addr = src.addrs[j];
302       auto &dst_addr = shared_addr->addrs[j];
303 
304       dst_addr.addr = src_addr.addr;
305       dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
306       dst_addr.hostport =
307           make_string_ref(shared_addr->balloc, src_addr.hostport);
308       dst_addr.port = src_addr.port;
309       dst_addr.host_unix = src_addr.host_unix;
310       dst_addr.weight = src_addr.weight;
311       dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
312       dst_addr.group_weight = src_addr.group_weight;
313       dst_addr.affinity_hash = src_addr.affinity_hash;
314       dst_addr.proto = src_addr.proto;
315       dst_addr.tls = src_addr.tls;
316       dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
317       dst_addr.fall = src_addr.fall;
318       dst_addr.rise = src_addr.rise;
319       dst_addr.dns = src_addr.dns;
320       dst_addr.upgrade_scheme = src_addr.upgrade_scheme;
321     }
322 
323 #ifdef HAVE_MRUBY
324     auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
325     if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
326       shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
327       assert(shared_addr->mruby_ctx);
328       shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
329     } else {
330       shared_addr->mruby_ctx = (*mruby_ctx_it).second;
331     }
332 #endif // HAVE_MRUBY
333 
334     // share the connection if patterns have the same set of backend
335     // addresses.
336 
337     auto dkey = create_downstream_key(shared_addr, src.mruby_file);
338     auto it = addr_groups_indexer.find(dkey);
339 
340     if (it == std::end(addr_groups_indexer)) {
341       auto shared_addr_ptr = shared_addr.get();
342 
343       for (auto &addr : shared_addr->addrs) {
344         addr.connect_blocker = std::make_unique<ConnectBlocker>(
345             randgen_, loop_, nullptr, [shared_addr_ptr, &addr]() {
346               if (!addr.queued) {
347                 if (!addr.wg) {
348                   return;
349                 }
350                 ensure_enqueue_addr(shared_addr_ptr->pq, addr.wg, &addr);
351               }
352             });
353 
354         addr.live_check = std::make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this,
355                                                       &addr, randgen_);
356       }
357 
358       size_t seq = 0;
359       for (auto &addr : shared_addr->addrs) {
360         addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
361         addr.seq = seq++;
362       }
363 
364       util::shuffle(std::begin(shared_addr->addrs),
365                     std::end(shared_addr->addrs), randgen_,
366                     [](auto i, auto j) { std::swap((*i).seq, (*j).seq); });
367 
368       if (shared_addr->affinity.type == SessionAffinity::NONE) {
369         std::map<StringRef, WeightGroup *> wgs;
370         size_t num_wgs = 0;
371         for (auto &addr : shared_addr->addrs) {
372           if (wgs.find(addr.group) == std::end(wgs)) {
373             ++num_wgs;
374             wgs.emplace(addr.group, nullptr);
375           }
376         }
377 
378         shared_addr->wgs = std::vector<WeightGroup>(num_wgs);
379 
380         for (auto &addr : shared_addr->addrs) {
381           auto &wg = wgs[addr.group];
382           if (wg == nullptr) {
383             wg = &shared_addr->wgs[--num_wgs];
384             wg->seq = num_wgs;
385           }
386 
387           wg->weight = addr.group_weight;
388           wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
389           addr.queued = true;
390           addr.wg = wg;
391         }
392 
393         assert(num_wgs == 0);
394 
395         for (auto &kv : wgs) {
396           shared_addr->pq.push(
397               WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
398           kv.second->queued = true;
399         }
400       }
401 
402       dst->shared_addr = shared_addr;
403 
404       addr_groups_indexer.emplace(std::move(dkey), i);
405     } else {
406       auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
407       if (LOG_ENABLED(INFO)) {
408         LOG(INFO) << dst->pattern << " shares the same backend group with "
409                   << g->pattern;
410       }
411       dst->shared_addr = g->shared_addr;
412     }
413   }
414 }
415 
~Worker()416 Worker::~Worker() {
417   ev_async_stop(loop_, &w_);
418   ev_timer_stop(loop_, &mcpool_clear_timer_);
419   ev_timer_stop(loop_, &proc_wev_timer_);
420 }
421 
schedule_clear_mcpool()422 void Worker::schedule_clear_mcpool() {
423   // libev manual says: "If the watcher is already active nothing will
424   // happen."  Since we don't change any timeout here, we don't have
425   // to worry about querying ev_is_active.
426   ev_timer_start(loop_, &mcpool_clear_timer_);
427 }
428 
wait()429 void Worker::wait() {
430 #ifndef NOTHREADS
431   fut_.get();
432 #endif // !NOTHREADS
433 }
434 
run_async()435 void Worker::run_async() {
436 #ifndef NOTHREADS
437   fut_ = std::async(std::launch::async, [this] {
438     (void)reopen_log_files(get_config()->logging);
439     ev_run(loop_);
440     delete_log_config();
441   });
442 #endif // !NOTHREADS
443 }
444 
send(WorkerEvent event)445 void Worker::send(WorkerEvent event) {
446   {
447     std::lock_guard<std::mutex> g(m_);
448 
449     q_.emplace_back(std::move(event));
450   }
451 
452   ev_async_send(loop_, &w_);
453 }
454 
process_events()455 void Worker::process_events() {
456   WorkerEvent wev;
457   {
458     std::lock_guard<std::mutex> g(m_);
459 
460     // Process event one at a time.  This is important for
461     // WorkerEventType::NEW_CONNECTION event since accepting large
462     // number of new connections at once may delay time to 1st byte
463     // for existing connections.
464 
465     if (q_.empty()) {
466       ev_timer_stop(loop_, &proc_wev_timer_);
467       return;
468     }
469 
470     wev = std::move(q_.front());
471     q_.pop_front();
472   }
473 
474   ev_timer_start(loop_, &proc_wev_timer_);
475 
476   auto config = get_config();
477 
478   auto worker_connections = config->conn.upstream.worker_connections;
479 
480   switch (wev.type) {
481   case WorkerEventType::NEW_CONNECTION: {
482     if (LOG_ENABLED(INFO)) {
483       WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
484                        << ", addrlen=" << wev.client_addrlen;
485     }
486 
487     if (worker_stat_.num_connections >= worker_connections) {
488 
489       if (LOG_ENABLED(INFO)) {
490         WLOG(INFO, this) << "Too many connections >= " << worker_connections;
491       }
492 
493       close(wev.client_fd);
494 
495       break;
496     }
497 
498     auto client_handler =
499         tls::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
500                                wev.client_addrlen, wev.faddr);
501     if (!client_handler) {
502       if (LOG_ENABLED(INFO)) {
503         WLOG(ERROR, this) << "ClientHandler creation failed";
504       }
505       close(wev.client_fd);
506       break;
507     }
508 
509     if (LOG_ENABLED(INFO)) {
510       WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
511     }
512 
513     break;
514   }
515   case WorkerEventType::REOPEN_LOG:
516     WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
517                        << ")";
518 
519     reopen_log_files(config->logging);
520 
521     break;
522   case WorkerEventType::GRACEFUL_SHUTDOWN:
523     WLOG(NOTICE, this) << "Graceful shutdown commencing";
524 
525     graceful_shutdown_ = true;
526 
527     if (worker_stat_.num_connections == 0 &&
528         worker_stat_.num_close_waits == 0) {
529       ev_break(loop_);
530 
531       return;
532     }
533 
534     break;
535   case WorkerEventType::REPLACE_DOWNSTREAM:
536     WLOG(NOTICE, this) << "Replace downstream";
537 
538     replace_downstream_config(wev.downstreamconf);
539 
540     break;
541 #ifdef ENABLE_HTTP3
542   case WorkerEventType::QUIC_PKT_FORWARD: {
543     const UpstreamAddr *faddr;
544 
545     if (wev.quic_pkt->upstream_addr_index == static_cast<size_t>(-1)) {
546       faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr);
547       if (faddr == nullptr) {
548         LOG(ERROR) << "No suitable upstream address found";
549 
550         break;
551       }
552     } else if (quic_upstream_addrs_.size() <=
553                wev.quic_pkt->upstream_addr_index) {
554       LOG(ERROR) << "upstream_addr_index is too large";
555 
556       break;
557     } else {
558       faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index];
559     }
560 
561     quic_conn_handler_.handle_packet(
562         faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
563         wev.quic_pkt->pi, wev.quic_pkt->data.data(), wev.quic_pkt->data.size());
564 
565     break;
566   }
567 #endif // ENABLE_HTTP3
568   default:
569     if (LOG_ENABLED(INFO)) {
570       WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
571     }
572   }
573 }
574 
get_cert_lookup_tree() const575 tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }
576 
577 #ifdef ENABLE_HTTP3
get_quic_cert_lookup_tree() const578 tls::CertLookupTree *Worker::get_quic_cert_lookup_tree() const {
579   return quic_cert_tree_;
580 }
581 #endif // ENABLE_HTTP3
582 
get_ticket_keys()583 std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
584 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
585   return std::atomic_load_explicit(&ticket_keys_, std::memory_order_acquire);
586 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
587   std::lock_guard<std::mutex> g(ticket_keys_m_);
588   return ticket_keys_;
589 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
590 }
591 
set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys)592 void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
593 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
594   // This is single writer
595   std::atomic_store_explicit(&ticket_keys_, std::move(ticket_keys),
596                              std::memory_order_release);
597 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
598   std::lock_guard<std::mutex> g(ticket_keys_m_);
599   ticket_keys_ = std::move(ticket_keys);
600 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
601 }
602 
get_worker_stat()603 WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
604 
get_loop() const605 struct ev_loop *Worker::get_loop() const {
606   return loop_;
607 }
608 
get_sv_ssl_ctx() const609 SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
610 
get_cl_ssl_ctx() const611 SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }
612 
613 #ifdef ENABLE_HTTP3
get_quic_sv_ssl_ctx() const614 SSL_CTX *Worker::get_quic_sv_ssl_ctx() const { return quic_sv_ssl_ctx_; }
615 #endif // ENABLE_HTTP3
616 
set_graceful_shutdown(bool f)617 void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
618 
get_graceful_shutdown() const619 bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
620 
get_mcpool()621 MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
622 
get_session_cache_memcached_dispatcher()623 MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
624   return session_cache_memcached_dispatcher_.get();
625 }
626 
get_randgen()627 std::mt19937 &Worker::get_randgen() { return randgen_; }
628 
629 #ifdef HAVE_MRUBY
create_mruby_context()630 int Worker::create_mruby_context() {
631   mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
632   if (!mruby_ctx_) {
633     return -1;
634   }
635 
636   return 0;
637 }
638 
get_mruby_context() const639 mruby::MRubyContext *Worker::get_mruby_context() const {
640   return mruby_ctx_.get();
641 }
642 #endif // HAVE_MRUBY
643 
644 std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups()645 Worker::get_downstream_addr_groups() {
646   return downstream_addr_groups_;
647 }
648 
get_connect_blocker() const649 ConnectBlocker *Worker::get_connect_blocker() const {
650   return connect_blocker_.get();
651 }
652 
get_downstream_config() const653 const DownstreamConfig *Worker::get_downstream_config() const {
654   return downstreamconf_.get();
655 }
656 
get_connection_handler() const657 ConnectionHandler *Worker::get_connection_handler() const {
658   return conn_handler_;
659 }
660 
661 #ifdef ENABLE_HTTP3
get_quic_connection_handler()662 QUICConnectionHandler *Worker::get_quic_connection_handler() {
663   return &quic_conn_handler_;
664 }
665 #endif // ENABLE_HTTP3
666 
get_dns_tracker()667 DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
668 
669 #ifdef ENABLE_HTTP3
670 #  ifdef HAVE_LIBBPF
should_attach_bpf() const671 bool Worker::should_attach_bpf() const {
672   auto config = get_config();
673   auto &quicconf = config->quic;
674   auto &apiconf = config->api;
675 
676   if (quicconf.bpf.disabled) {
677     return false;
678   }
679 
680   if (!config->single_thread && apiconf.enabled) {
681     return index_ == 1;
682   }
683 
684   return index_ == 0;
685 }
686 
should_update_bpf_map() const687 bool Worker::should_update_bpf_map() const {
688   auto config = get_config();
689   auto &quicconf = config->quic;
690 
691   return !quicconf.bpf.disabled;
692 }
693 
compute_sk_index() const694 uint32_t Worker::compute_sk_index() const {
695   auto config = get_config();
696   auto &apiconf = config->api;
697 
698   if (!config->single_thread && apiconf.enabled) {
699     return index_ - 1;
700   }
701 
702   return index_;
703 }
704 #  endif // HAVE_LIBBPF
705 
setup_quic_server_socket()706 int Worker::setup_quic_server_socket() {
707   size_t n = 0;
708 
709   for (auto &addr : quic_upstream_addrs_) {
710     assert(!addr.host_unix);
711     if (create_quic_server_socket(addr) != 0) {
712       return -1;
713     }
714 
715     // Make sure that each endpoint has a unique address.
716     for (size_t i = 0; i < n; ++i) {
717       const auto &a = quic_upstream_addrs_[i];
718 
719       if (addr.hostport == a.hostport) {
720         LOG(FATAL)
721             << "QUIC frontend endpoint must be unique: a duplicate found for "
722             << addr.hostport;
723 
724         return -1;
725       }
726     }
727 
728     ++n;
729 
730     quic_listeners_.emplace_back(std::make_unique<QUICListener>(&addr, this));
731   }
732 
733   return 0;
734 }
735 
create_quic_server_socket(UpstreamAddr & faddr)736 int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
737   std::array<char, STRERROR_BUFSIZE> errbuf;
738   int fd = -1;
739   int rv;
740 
741   auto service = util::utos(faddr.port);
742   addrinfo hints{};
743   hints.ai_family = faddr.family;
744   hints.ai_socktype = SOCK_DGRAM;
745   hints.ai_flags = AI_PASSIVE;
746 #  ifdef AI_ADDRCONFIG
747   hints.ai_flags |= AI_ADDRCONFIG;
748 #  endif // AI_ADDRCONFIG
749 
750   auto node =
751       faddr.host == StringRef::from_lit("*") ? nullptr : faddr.host.c_str();
752 
753   addrinfo *res, *rp;
754   rv = getaddrinfo(node, service.c_str(), &hints, &res);
755 #  ifdef AI_ADDRCONFIG
756   if (rv != 0) {
757     // Retry without AI_ADDRCONFIG
758     hints.ai_flags &= ~AI_ADDRCONFIG;
759     rv = getaddrinfo(node, service.c_str(), &hints, &res);
760   }
761 #  endif // AI_ADDRCONFIG
762   if (rv != 0) {
763     LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
764                << " address for " << faddr.host << ", port " << faddr.port
765                << ": " << gai_strerror(rv);
766     return -1;
767   }
768 
769   auto res_d = defer(freeaddrinfo, res);
770 
771   std::array<char, NI_MAXHOST> host;
772 
773   for (rp = res; rp; rp = rp->ai_next) {
774     rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
775                      nullptr, 0, NI_NUMERICHOST);
776     if (rv != 0) {
777       LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
778       continue;
779     }
780 
781 #  ifdef SOCK_NONBLOCK
782     fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
783                 rp->ai_protocol);
784     if (fd == -1) {
785       auto error = errno;
786       LOG(WARN) << "socket() syscall failed: "
787                 << xsi_strerror(error, errbuf.data(), errbuf.size());
788       continue;
789     }
790 #  else  // !SOCK_NONBLOCK
791     fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
792     if (fd == -1) {
793       auto error = errno;
794       LOG(WARN) << "socket() syscall failed: "
795                 << xsi_strerror(error, errbuf.data(), errbuf.size());
796       continue;
797     }
798     util::make_socket_nonblocking(fd);
799     util::make_socket_closeonexec(fd);
800 #  endif // !SOCK_NONBLOCK
801 
802     int val = 1;
803     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
804                    static_cast<socklen_t>(sizeof(val))) == -1) {
805       auto error = errno;
806       LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
807                 << xsi_strerror(error, errbuf.data(), errbuf.size());
808       close(fd);
809       continue;
810     }
811 
812     if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
813                    static_cast<socklen_t>(sizeof(val))) == -1) {
814       auto error = errno;
815       LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
816                 << xsi_strerror(error, errbuf.data(), errbuf.size());
817       close(fd);
818       continue;
819     }
820 
821     if (faddr.family == AF_INET6) {
822 #  ifdef IPV6_V6ONLY
823       if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
824                      static_cast<socklen_t>(sizeof(val))) == -1) {
825         auto error = errno;
826         LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
827                   << xsi_strerror(error, errbuf.data(), errbuf.size());
828         close(fd);
829         continue;
830       }
831 #  endif // IPV6_V6ONLY
832 
833       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
834                      static_cast<socklen_t>(sizeof(val))) == -1) {
835         auto error = errno;
836         LOG(WARN)
837             << "Failed to set IPV6_RECVPKTINFO option to listener socket: "
838             << xsi_strerror(error, errbuf.data(), errbuf.size());
839         close(fd);
840         continue;
841       }
842 
843       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVTCLASS, &val,
844                      static_cast<socklen_t>(sizeof(val))) == -1) {
845         auto error = errno;
846         LOG(WARN) << "Failed to set IPV6_RECVTCLASS option to listener socket: "
847                   << xsi_strerror(error, errbuf.data(), errbuf.size());
848         close(fd);
849         continue;
850       }
851 
852 #  if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO)
853       int mtu_disc = IPV6_PMTUDISC_DO;
854       if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &mtu_disc,
855                      static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
856         auto error = errno;
857         LOG(WARN)
858             << "Failed to set IPV6_MTU_DISCOVER option to listener socket: "
859             << xsi_strerror(error, errbuf.data(), errbuf.size());
860         close(fd);
861         continue;
862       }
863 #  endif // defined(IPV6_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
864     } else {
865       if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
866                      static_cast<socklen_t>(sizeof(val))) == -1) {
867         auto error = errno;
868         LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
869                   << xsi_strerror(error, errbuf.data(), errbuf.size());
870         close(fd);
871         continue;
872       }
873 
874       if (setsockopt(fd, IPPROTO_IP, IP_RECVTOS, &val,
875                      static_cast<socklen_t>(sizeof(val))) == -1) {
876         auto error = errno;
877         LOG(WARN) << "Failed to set IP_RECVTOS option to listener socket: "
878                   << xsi_strerror(error, errbuf.data(), errbuf.size());
879         close(fd);
880         continue;
881       }
882 
883 #  if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
884       int mtu_disc = IP_PMTUDISC_DO;
885       if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &mtu_disc,
886                      static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
887         auto error = errno;
888         LOG(WARN) << "Failed to set IP_MTU_DISCOVER option to listener socket: "
889                   << xsi_strerror(error, errbuf.data(), errbuf.size());
890         close(fd);
891         continue;
892       }
893 #  endif // defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
894     }
895 
896 #  ifdef UDP_GRO
897     if (setsockopt(fd, IPPROTO_UDP, UDP_GRO, &val, sizeof(val)) == -1) {
898       auto error = errno;
899       LOG(WARN) << "Failed to set UDP_GRO option to listener socket: "
900                 << xsi_strerror(error, errbuf.data(), errbuf.size());
901       close(fd);
902       continue;
903     }
904 #  endif // UDP_GRO
905 
906     if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
907       auto error = errno;
908       LOG(WARN) << "bind() syscall failed: "
909                 << xsi_strerror(error, errbuf.data(), errbuf.size());
910       close(fd);
911       continue;
912     }
913 
914 #  ifdef HAVE_LIBBPF
915     auto config = get_config();
916 
917     auto &quic_bpf_refs = conn_handler_->get_quic_bpf_refs();
918 
919     if (should_attach_bpf()) {
920       auto &bpfconf = config->quic.bpf;
921 
922       auto obj = bpf_object__open_file(bpfconf.prog_file.c_str(), nullptr);
923       if (!obj) {
924         auto error = errno;
925         LOG(FATAL) << "Failed to open bpf object file: "
926                    << xsi_strerror(error, errbuf.data(), errbuf.size());
927         close(fd);
928         return -1;
929       }
930 
931       rv = bpf_object__load(obj);
932       if (rv != 0) {
933         auto error = errno;
934         LOG(FATAL) << "Failed to load bpf object file: "
935                    << xsi_strerror(error, errbuf.data(), errbuf.size());
936         close(fd);
937         return -1;
938       }
939 
940       auto prog = bpf_object__find_program_by_name(obj, "select_reuseport");
941       if (!prog) {
942         auto error = errno;
943         LOG(FATAL) << "Failed to find sk_reuseport program: "
944                    << xsi_strerror(error, errbuf.data(), errbuf.size());
945         close(fd);
946         return -1;
947       }
948 
949       auto &ref = quic_bpf_refs[faddr.index];
950 
951       ref.obj = obj;
952 
953       ref.reuseport_array =
954           bpf_object__find_map_by_name(obj, "reuseport_array");
955       if (!ref.reuseport_array) {
956         auto error = errno;
957         LOG(FATAL) << "Failed to get reuseport_array: "
958                    << xsi_strerror(error, errbuf.data(), errbuf.size());
959         close(fd);
960         return -1;
961       }
962 
963       ref.cid_prefix_map = bpf_object__find_map_by_name(obj, "cid_prefix_map");
964       if (!ref.cid_prefix_map) {
965         auto error = errno;
966         LOG(FATAL) << "Failed to get cid_prefix_map: "
967                    << xsi_strerror(error, errbuf.data(), errbuf.size());
968         close(fd);
969         return -1;
970       }
971 
972       auto sk_info = bpf_object__find_map_by_name(obj, "sk_info");
973       if (!sk_info) {
974         auto error = errno;
975         LOG(FATAL) << "Failed to get sk_info: "
976                    << xsi_strerror(error, errbuf.data(), errbuf.size());
977         close(fd);
978         return -1;
979       }
980 
981       constexpr uint32_t zero = 0;
982       uint64_t num_socks = config->num_worker;
983 
984       rv = bpf_map__update_elem(sk_info, &zero, sizeof(zero), &num_socks,
985                                 sizeof(num_socks), BPF_ANY);
986       if (rv != 0) {
987         auto error = errno;
988         LOG(FATAL) << "Failed to update sk_info: "
989                    << xsi_strerror(error, errbuf.data(), errbuf.size());
990         close(fd);
991         return -1;
992       }
993 
994       constexpr uint32_t key_high_idx = 1;
995       constexpr uint32_t key_low_idx = 2;
996 
997       auto &qkms = conn_handler_->get_quic_keying_materials();
998       auto &qkm = qkms->keying_materials.front();
999 
1000       rv = bpf_map__update_elem(sk_info, &key_high_idx, sizeof(key_high_idx),
1001                                 qkm.cid_encryption_key.data(),
1002                                 qkm.cid_encryption_key.size() / 2, BPF_ANY);
1003       if (rv != 0) {
1004         auto error = errno;
1005         LOG(FATAL) << "Failed to update key_high_idx sk_info: "
1006                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1007         close(fd);
1008         return -1;
1009       }
1010 
1011       rv = bpf_map__update_elem(sk_info, &key_low_idx, sizeof(key_low_idx),
1012                                 qkm.cid_encryption_key.data() +
1013                                     qkm.cid_encryption_key.size() / 2,
1014                                 qkm.cid_encryption_key.size() / 2, BPF_ANY);
1015       if (rv != 0) {
1016         auto error = errno;
1017         LOG(FATAL) << "Failed to update key_low_idx sk_info: "
1018                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1019         close(fd);
1020         return -1;
1021       }
1022 
1023       auto prog_fd = bpf_program__fd(prog);
1024 
1025       if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &prog_fd,
1026                      static_cast<socklen_t>(sizeof(prog_fd))) == -1) {
1027         LOG(FATAL) << "Failed to attach bpf program: "
1028                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
1029         close(fd);
1030         return -1;
1031       }
1032     }
1033 
1034     if (should_update_bpf_map()) {
1035       const auto &ref = quic_bpf_refs[faddr.index];
1036       auto sk_index = compute_sk_index();
1037 
1038       rv = bpf_map__update_elem(ref.reuseport_array, &sk_index,
1039                                 sizeof(sk_index), &fd, sizeof(fd), BPF_NOEXIST);
1040       if (rv != 0) {
1041         auto error = errno;
1042         LOG(FATAL) << "Failed to update reuseport_array: "
1043                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1044         close(fd);
1045         return -1;
1046       }
1047 
1048       rv = bpf_map__update_elem(ref.cid_prefix_map, cid_prefix_.data(),
1049                                 cid_prefix_.size(), &sk_index, sizeof(sk_index),
1050                                 BPF_NOEXIST);
1051       if (rv != 0) {
1052         auto error = errno;
1053         LOG(FATAL) << "Failed to update cid_prefix_map: "
1054                    << xsi_strerror(error, errbuf.data(), errbuf.size());
1055         close(fd);
1056         return -1;
1057       }
1058     }
1059 #  endif // HAVE_LIBBPF
1060 
1061     break;
1062   }
1063 
1064   if (!rp) {
1065     LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
1066                << " socket failed";
1067 
1068     return -1;
1069   }
1070 
1071   faddr.fd = fd;
1072   faddr.hostport = util::make_http_hostport(mod_config()->balloc,
1073                                             StringRef{host.data()}, faddr.port);
1074 
1075   LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";
1076 
1077   return 0;
1078 }
1079 
get_cid_prefix() const1080 const uint8_t *Worker::get_cid_prefix() const { return cid_prefix_.data(); }
1081 
find_quic_upstream_addr(const Address & local_addr)1082 const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) {
1083   std::array<char, NI_MAXHOST> host;
1084 
1085   auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(),
1086                         host.size(), nullptr, 0, NI_NUMERICHOST);
1087   if (rv != 0) {
1088     LOG(ERROR) << "getnameinfo: " << gai_strerror(rv);
1089 
1090     return nullptr;
1091   }
1092 
1093   uint16_t port;
1094 
1095   switch (local_addr.su.sa.sa_family) {
1096   case AF_INET:
1097     port = htons(local_addr.su.in.sin_port);
1098 
1099     break;
1100   case AF_INET6:
1101     port = htons(local_addr.su.in6.sin6_port);
1102 
1103     break;
1104   default:
1105     assert(0);
1106     abort();
1107   }
1108 
1109   std::array<char, util::max_hostport> hostport_buf;
1110 
1111   auto hostport = util::make_http_hostport(std::begin(hostport_buf),
1112                                            StringRef{host.data()}, port);
1113   const UpstreamAddr *fallback_faddr = nullptr;
1114 
1115   for (auto &faddr : quic_upstream_addrs_) {
1116     if (faddr.hostport == hostport) {
1117       return &faddr;
1118     }
1119 
1120     if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) {
1121       continue;
1122     }
1123 
1124     if (faddr.port == 443 || faddr.port == 80) {
1125       switch (faddr.family) {
1126       case AF_INET:
1127         if (util::streq(faddr.hostport, StringRef::from_lit("0.0.0.0"))) {
1128           fallback_faddr = &faddr;
1129         }
1130 
1131         break;
1132       case AF_INET6:
1133         if (util::streq(faddr.hostport, StringRef::from_lit("[::]"))) {
1134           fallback_faddr = &faddr;
1135         }
1136 
1137         break;
1138       default:
1139         assert(0);
1140       }
1141     } else {
1142       switch (faddr.family) {
1143       case AF_INET:
1144         if (util::starts_with(faddr.hostport,
1145                               StringRef::from_lit("0.0.0.0:"))) {
1146           fallback_faddr = &faddr;
1147         }
1148 
1149         break;
1150       case AF_INET6:
1151         if (util::starts_with(faddr.hostport, StringRef::from_lit("[::]:"))) {
1152           fallback_faddr = &faddr;
1153         }
1154 
1155         break;
1156       default:
1157         assert(0);
1158       }
1159     }
1160   }
1161 
1162   return fallback_faddr;
1163 }
1164 #endif // ENABLE_HTTP3
1165 
1166 namespace {
match_downstream_addr_group_host(const RouterConfig & routerconf,const StringRef & host,const StringRef & path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)1167 size_t match_downstream_addr_group_host(
1168     const RouterConfig &routerconf, const StringRef &host,
1169     const StringRef &path,
1170     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1171     size_t catch_all, BlockAllocator &balloc) {
1172 
1173   const auto &router = routerconf.router;
1174   const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
1175   const auto &wildcard_patterns = routerconf.wildcard_patterns;
1176 
1177   if (LOG_ENABLED(INFO)) {
1178     LOG(INFO) << "Perform mapping selection, using host=" << host
1179               << ", path=" << path;
1180   }
1181 
1182   auto group = router.match(host, path);
1183   if (group != -1) {
1184     if (LOG_ENABLED(INFO)) {
1185       LOG(INFO) << "Found pattern with query " << host << path
1186                 << ", matched pattern=" << groups[group]->pattern;
1187     }
1188     return group;
1189   }
1190 
1191   if (!wildcard_patterns.empty() && !host.empty()) {
1192     auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
1193     auto ep =
1194         std::copy(std::begin(host) + 1, std::end(host), rev_host_src.base);
1195     std::reverse(rev_host_src.base, ep);
1196     auto rev_host = StringRef{rev_host_src.base, ep};
1197 
1198     ssize_t best_group = -1;
1199     const RNode *last_node = nullptr;
1200 
1201     for (;;) {
1202       size_t nread = 0;
1203       auto wcidx =
1204           rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
1205       if (wcidx == -1) {
1206         break;
1207       }
1208 
1209       rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};
1210 
1211       auto &wc = wildcard_patterns[wcidx];
1212       auto group = wc.router.match(StringRef{}, path);
1213       if (group != -1) {
1214         // We sorted wildcard_patterns in a way that first match is the
1215         // longest host pattern.
1216         if (LOG_ENABLED(INFO)) {
1217           LOG(INFO) << "Found wildcard pattern with query " << host << path
1218                     << ", matched pattern=" << groups[group]->pattern;
1219         }
1220 
1221         best_group = group;
1222       }
1223     }
1224 
1225     if (best_group != -1) {
1226       return best_group;
1227     }
1228   }
1229 
1230   group = router.match(StringRef::from_lit(""), path);
1231   if (group != -1) {
1232     if (LOG_ENABLED(INFO)) {
1233       LOG(INFO) << "Found pattern with query " << path
1234                 << ", matched pattern=" << groups[group]->pattern;
1235     }
1236     return group;
1237   }
1238 
1239   if (LOG_ENABLED(INFO)) {
1240     LOG(INFO) << "None match.  Use catch-all pattern";
1241   }
1242   return catch_all;
1243 }
1244 } // namespace
1245 
match_downstream_addr_group(const RouterConfig & routerconf,const StringRef & hostport,const StringRef & raw_path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)1246 size_t match_downstream_addr_group(
1247     const RouterConfig &routerconf, const StringRef &hostport,
1248     const StringRef &raw_path,
1249     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1250     size_t catch_all, BlockAllocator &balloc) {
1251   if (std::find(std::begin(hostport), std::end(hostport), '/') !=
1252       std::end(hostport)) {
1253     // We use '/' specially, and if '/' is included in host, it breaks
1254     // our code.  Select catch-all case.
1255     return catch_all;
1256   }
1257 
1258   auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
1259   auto query = std::find(std::begin(raw_path), fragment, '?');
1260   auto path = StringRef{std::begin(raw_path), query};
1261 
1262   if (path.empty() || path[0] != '/') {
1263     path = StringRef::from_lit("/");
1264   }
1265 
1266   if (hostport.empty()) {
1267     return match_downstream_addr_group_host(routerconf, hostport, path, groups,
1268                                             catch_all, balloc);
1269   }
1270 
1271   StringRef host;
1272   if (hostport[0] == '[') {
1273     // assume this is IPv6 numeric address
1274     auto p = std::find(std::begin(hostport), std::end(hostport), ']');
1275     if (p == std::end(hostport)) {
1276       return catch_all;
1277     }
1278     if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
1279       return catch_all;
1280     }
1281     host = StringRef{std::begin(hostport), p + 1};
1282   } else {
1283     auto p = std::find(std::begin(hostport), std::end(hostport), ':');
1284     if (p == std::begin(hostport)) {
1285       return catch_all;
1286     }
1287     host = StringRef{std::begin(hostport), p};
1288   }
1289 
1290   if (std::find_if(std::begin(host), std::end(host), [](char c) {
1291         return 'A' <= c || c <= 'Z';
1292       }) != std::end(host)) {
1293     auto low_host = make_byte_ref(balloc, host.size() + 1);
1294     auto ep = std::copy(std::begin(host), std::end(host), low_host.base);
1295     *ep = '\0';
1296     util::inp_strlower(low_host.base, ep);
1297     host = StringRef{low_host.base, ep};
1298   }
1299   return match_downstream_addr_group_host(routerconf, host, path, groups,
1300                                           catch_all, balloc);
1301 }
1302 
downstream_failure(DownstreamAddr * addr,const Address * raddr)1303 void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
1304   const auto &connect_blocker = addr->connect_blocker;
1305 
1306   if (connect_blocker->in_offline()) {
1307     return;
1308   }
1309 
1310   connect_blocker->on_failure();
1311 
1312   if (addr->fall == 0) {
1313     return;
1314   }
1315 
1316   auto fail_count = connect_blocker->get_fail_count();
1317 
1318   if (fail_count >= addr->fall) {
1319     if (raddr) {
1320       LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
1321                 << " " << fail_count
1322                 << " times in a row; considered as offline";
1323     } else {
1324       LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
1325                 << " " << fail_count
1326                 << " times in a row; considered as offline";
1327     }
1328 
1329     connect_blocker->offline();
1330 
1331     if (addr->rise) {
1332       addr->live_check->schedule();
1333     }
1334   }
1335 }
1336 
1337 #ifdef ENABLE_HTTP3
create_cid_prefix(uint8_t * cid_prefix,const uint8_t * server_id)1338 int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id) {
1339   auto p = std::copy_n(server_id, SHRPX_QUIC_SERVER_IDLEN, cid_prefix);
1340 
1341   if (RAND_bytes(p, SHRPX_QUIC_CID_PREFIXLEN - SHRPX_QUIC_SERVER_IDLEN) != 1) {
1342     return -1;
1343   }
1344 
1345   return 0;
1346 }
1347 #endif // ENABLE_HTTP3
1348 
1349 } // namespace shrpx
1350