1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_worker.h"
26
27 #ifdef HAVE_UNISTD_H
28 # include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 #include <netinet/udp.h>
31
32 #include <cstdio>
33 #include <memory>
34
35 #include "ssl_compat.h"
36
37 #ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
38 # include <wolfssl/options.h>
39 # include <wolfssl/openssl/rand.h>
40 #else // !NGHTTP2_OPENSSL_IS_WOLFSSL
41 # include <openssl/rand.h>
42 #endif // !NGHTTP2_OPENSSL_IS_WOLFSSL
43
44 #ifdef HAVE_LIBBPF
45 # include <bpf/bpf.h>
46 # include <bpf/libbpf.h>
47 #endif // HAVE_LIBBPF
48
49 #include "shrpx_tls.h"
50 #include "shrpx_log.h"
51 #include "shrpx_client_handler.h"
52 #include "shrpx_http2_session.h"
53 #include "shrpx_log_config.h"
54 #include "shrpx_memcached_dispatcher.h"
55 #ifdef HAVE_MRUBY
56 # include "shrpx_mruby.h"
57 #endif // HAVE_MRUBY
58 #ifdef ENABLE_HTTP3
59 # include "shrpx_quic_listener.h"
60 #endif // ENABLE_HTTP3
61 #include "shrpx_connection_handler.h"
62 #include "util.h"
63 #include "template.h"
64 #include "xsi_strerror.h"
65
66 namespace shrpx {
67
68 namespace {
eventcb(struct ev_loop * loop,ev_async * w,int revents)69 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
70 auto worker = static_cast<Worker *>(w->data);
71 worker->process_events();
72 }
73 } // namespace
74
75 namespace {
mcpool_clear_cb(struct ev_loop * loop,ev_timer * w,int revents)76 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
77 auto worker = static_cast<Worker *>(w->data);
78 if (worker->get_worker_stat()->num_connections != 0) {
79 return;
80 }
81 auto mcpool = worker->get_mcpool();
82 if (mcpool->freelistsize == mcpool->poolsize) {
83 worker->get_mcpool()->clear();
84 }
85 }
86 } // namespace
87
88 namespace {
proc_wev_cb(struct ev_loop * loop,ev_timer * w,int revents)89 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
90 auto worker = static_cast<Worker *>(w->data);
91 worker->process_events();
92 }
93 } // namespace
94
DownstreamAddrGroup()95 DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
96
~DownstreamAddrGroup()97 DownstreamAddrGroup::~DownstreamAddrGroup() {}
98
// DownstreamKey is used to index SharedDownstreamAddr in order to
// find the same configuration.
//
// Tuple layout (indices must stay in sync with create_downstream_key()):
//    0: per-backend tuples, sorted, each holding
//       (host, sni, group, fall, rise, proto, port, weight, group_weight,
//        host_unix, tls, dns, upgrade_scheme)
//    1: redirect_if_not_tls
//    2: session affinity type
//    3: affinity cookie name
//    4: affinity cookie path
//    5: affinity cookie secure setting
//    6: affinity cookie stickiness
//    7: read timeout
//    8: write timeout
//    9: mruby file
//   10: dnf
//
// NOTE(review): slots 7 and 8 are int64_t.  If SharedDownstreamAddr's
// timeout.read/timeout.write are floating point (e.g. ev_tstamp), the
// fractional part is truncated here, so groups differing only in
// sub-second timeouts would be treated as identical and merged -- confirm
// against shrpx_worker.h.
using DownstreamKey = std::tuple<
    std::vector<std::tuple<StringRef, StringRef, StringRef, size_t, size_t, Proto,
                           uint32_t, uint32_t, uint32_t, bool, bool, bool, bool>>,
    bool, SessionAffinity, StringRef, StringRef, SessionAffinityCookieSecure,
    SessionAffinityCookieStickiness, int64_t, int64_t, StringRef, bool>;
106
107 namespace {
108 DownstreamKey
create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> & shared_addr,const StringRef & mruby_file)109 create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
110 const StringRef &mruby_file) {
111 DownstreamKey dkey;
112
113 auto &addrs = std::get<0>(dkey);
114 addrs.resize(shared_addr->addrs.size());
115 auto p = std::begin(addrs);
116 for (auto &a : shared_addr->addrs) {
117 std::get<0>(*p) = a.host;
118 std::get<1>(*p) = a.sni;
119 std::get<2>(*p) = a.group;
120 std::get<3>(*p) = a.fall;
121 std::get<4>(*p) = a.rise;
122 std::get<5>(*p) = a.proto;
123 std::get<6>(*p) = a.port;
124 std::get<7>(*p) = a.weight;
125 std::get<8>(*p) = a.group_weight;
126 std::get<9>(*p) = a.host_unix;
127 std::get<10>(*p) = a.tls;
128 std::get<11>(*p) = a.dns;
129 std::get<12>(*p) = a.upgrade_scheme;
130 ++p;
131 }
132 std::sort(std::begin(addrs), std::end(addrs));
133
134 std::get<1>(dkey) = shared_addr->redirect_if_not_tls;
135
136 auto &affinity = shared_addr->affinity;
137 std::get<2>(dkey) = affinity.type;
138 std::get<3>(dkey) = affinity.cookie.name;
139 std::get<4>(dkey) = affinity.cookie.path;
140 std::get<5>(dkey) = affinity.cookie.secure;
141 std::get<6>(dkey) = affinity.cookie.stickiness;
142 auto &timeout = shared_addr->timeout;
143 std::get<7>(dkey) = timeout.read;
144 std::get<8>(dkey) = timeout.write;
145 std::get<9>(dkey) = mruby_file;
146 std::get<10>(dkey) = shared_addr->dnf;
147
148 return dkey;
149 }
150 } // namespace
151
// Constructs a worker bound to the event loop `loop`.
//
// Parameters (all pointers are stored as-is; none is owned by the Worker):
//   loop          - event loop on which all of this worker's watchers run.
//   sv_ssl_ctx    - stored as sv_ssl_ctx_ (returned by get_sv_ssl_ctx()).
//   cl_ssl_ctx    - stored as cl_ssl_ctx_; also handed to every LiveCheck
//                   built in replace_downstream_config().
//   tls_session_cache_memcached_ssl_ctx
//                 - only passed to the MemcachedDispatcher created below.
//   cert_tree     - stored as cert_tree_.
//   quic_sv_ssl_ctx, quic_cert_tree, wid (ENABLE_HTTP3 only)
//                 - QUIC counterparts, stored as members.
//   index (ENABLE_HTTP3 && HAVE_LIBBPF only)
//                 - worker index used by the BPF helpers.
//   ticket_keys   - initial TLS session ticket keys.
//   conn_handler  - stored as conn_handler_.
//   downstreamconf- initial backend configuration, installed via
//                   replace_downstream_config().
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
               SSL_CTX *tls_session_cache_memcached_ssl_ctx,
               tls::CertLookupTree *cert_tree,
#ifdef ENABLE_HTTP3
               SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
               WorkerID wid,
# ifdef HAVE_LIBBPF
               size_t index,
# endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
               const std::shared_ptr<TicketKeys> &ticket_keys,
               ConnectionHandler *conn_handler,
               std::shared_ptr<DownstreamConfig> downstreamconf)
    :
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
      index_{index},
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
      randgen_(util::make_mt19937()),
      worker_stat_{},
      // Uses the `loop` parameter (not loop_), so it does not depend on
      // member initialization order.
      dns_tracker_(loop, get_config()->conn.downstream->family),
#ifdef ENABLE_HTTP3
      worker_id_{std::move(wid)},
      // The worker keeps its own copy of the QUIC listener addresses.
      quic_upstream_addrs_{get_config()->conn.quic_listener.addrs},
#endif // ENABLE_HTTP3
      loop_(loop),
      sv_ssl_ctx_(sv_ssl_ctx),
      cl_ssl_ctx_(cl_ssl_ctx),
      cert_tree_(cert_tree),
      conn_handler_(conn_handler),
#ifdef ENABLE_HTTP3
      quic_sv_ssl_ctx_{quic_sv_ssl_ctx},
      quic_cert_tree_{quic_cert_tree},
      quic_conn_handler_{this},
#endif // ENABLE_HTTP3
      ticket_keys_(ticket_keys),
      // NOTE(review): reads randgen_ and loop_, so those members must be
      // declared before connect_blocker_ in the class definition --
      // presumably they are; verify against shrpx_worker.h before
      // reordering members.
      connect_blocker_(
          std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
      graceful_shutdown_(false) {
  // Cross-thread wakeup: Worker::send() signals w_, which runs eventcb.
  ev_async_init(&w_, eventcb);
  w_.data = this;
  ev_async_start(loop_, &w_);

  // Both timers use a zero timeout and are only initialized here; they are
  // started later (schedule_clear_mcpool() / process_events()).
  ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
  mcpool_clear_timer_.data = this;

  ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
  proc_wev_timer_.data = this;

  auto &session_cacheconf = get_config()->tls.session_cache;

  // A memcached-backed TLS session cache dispatcher exists only when a
  // memcached host is configured; otherwise it stays null.
  if (!session_cacheconf.memcached.host.empty()) {
    session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
        &session_cacheconf.memcached.addr, loop,
        tls_session_cache_memcached_ssl_ctx,
        StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
  }

  // Build the initial backend groups.
  replace_downstream_config(std::move(downstreamconf));
}
211
212 namespace {
ensure_enqueue_addr(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & wgpq,WeightGroup * wg,DownstreamAddr * addr)213 void ensure_enqueue_addr(
214 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
215 WeightGroupEntryGreater> &wgpq,
216 WeightGroup *wg, DownstreamAddr *addr) {
217 uint32_t cycle;
218 if (!wg->pq.empty()) {
219 auto &top = wg->pq.top();
220 cycle = top.cycle;
221 } else {
222 cycle = 0;
223 }
224
225 addr->cycle = cycle;
226 addr->pending_penalty = 0;
227 wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
228 addr->queued = true;
229
230 if (!wg->queued) {
231 if (!wgpq.empty()) {
232 auto &top = wgpq.top();
233 cycle = top.cycle;
234 } else {
235 cycle = 0;
236 }
237
238 wg->cycle = cycle;
239 wg->pending_penalty = 0;
240 wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
241 wg->queued = true;
242 }
243 }
244 } // namespace
245
// Installs a new backend configuration on this worker.
//
// Every group from the previous configuration is marked `retired`, and the
// pooled downstream connections of all its backend addresses are removed.
// A fresh DownstreamAddrGroup is then built for each entry of
// downstreamconf->addr_groups.  Groups whose backend setup yields an equal
// DownstreamKey (see create_downstream_key()) share one
// SharedDownstreamAddr, and with it the connection pools, connect blockers
// and live checks created below.
void Worker::replace_downstream_config(
    std::shared_ptr<DownstreamConfig> downstreamconf) {
  // Retire the old groups.  They are only flagged here; presumably holders
  // of the old shared_ptrs check `retired` -- confirm against callers.
  for (auto &g : downstream_addr_groups_) {
    g->retired = true;

    auto &shared_addr = g->shared_addr;
    for (auto &addr : shared_addr->addrs) {
      addr.dconn_pool->remove_all();
    }
  }

  downstreamconf_ = downstreamconf;

  // Making a copy is much faster with multiple thread on
  // backendconfig API call.
  auto groups = downstreamconf->addr_groups;

  downstream_addr_groups_ =
      std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());

  // Key of each SharedDownstreamAddr built so far -> index of the group in
  // downstream_addr_groups_ that owns it.
  std::map<DownstreamKey, size_t> addr_groups_indexer;
#ifdef HAVE_MRUBY
  // TODO It is a bit less efficient because
  // mruby::create_mruby_context returns std::unique_ptr and we cannot
  // use std::make_shared.
  // One mruby context per distinct mruby file, shared by all groups that
  // name that file.
  std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
#endif // HAVE_MRUBY

  for (size_t i = 0; i < groups.size(); ++i) {
    auto &src = groups[i];
    auto &dst = downstream_addr_groups_[i];

    dst = std::make_shared<DownstreamAddrGroup>();
    dst->pattern =
        ImmutableString{std::begin(src.pattern), std::end(src.pattern)};

    auto shared_addr = std::make_shared<SharedDownstreamAddr>();

    // Copy group-level settings.  Strings go through
    // make_string_ref(shared_addr->balloc, ...), presumably copying them
    // into shared_addr's allocator so they live as long as shared_addr.
    shared_addr->addrs.resize(src.addrs.size());
    shared_addr->affinity.type = src.affinity.type;
    if (src.affinity.type == SessionAffinity::COOKIE) {
      shared_addr->affinity.cookie.name =
          make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
      if (!src.affinity.cookie.path.empty()) {
        shared_addr->affinity.cookie.path =
            make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
      }
      shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
      shared_addr->affinity.cookie.stickiness = src.affinity.cookie.stickiness;
    }
    shared_addr->affinity_hash = src.affinity_hash;
    shared_addr->affinity_hash_map = src.affinity_hash_map;
    shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
    shared_addr->dnf = src.dnf;
    shared_addr->timeout.read = src.timeout.read;
    shared_addr->timeout.write = src.timeout.write;

    // Copy per-backend settings, index for index.
    for (size_t j = 0; j < src.addrs.size(); ++j) {
      auto &src_addr = src.addrs[j];
      auto &dst_addr = shared_addr->addrs[j];

      dst_addr.addr = src_addr.addr;
      dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
      dst_addr.hostport =
          make_string_ref(shared_addr->balloc, src_addr.hostport);
      dst_addr.port = src_addr.port;
      dst_addr.host_unix = src_addr.host_unix;
      dst_addr.weight = src_addr.weight;
      dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
      dst_addr.group_weight = src_addr.group_weight;
      dst_addr.affinity_hash = src_addr.affinity_hash;
      dst_addr.proto = src_addr.proto;
      dst_addr.tls = src_addr.tls;
      dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
      dst_addr.fall = src_addr.fall;
      dst_addr.rise = src_addr.rise;
      dst_addr.dns = src_addr.dns;
      dst_addr.upgrade_scheme = src_addr.upgrade_scheme;
    }

#ifdef HAVE_MRUBY
    // Reuse an existing context for this mruby file, or create one.
    auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
    if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
      shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
      assert(shared_addr->mruby_ctx);
      shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
    } else {
      shared_addr->mruby_ctx = (*mruby_ctx_it).second;
    }
#endif // HAVE_MRUBY

    // share the connection if patterns have the same set of backend
    // addresses.

    auto dkey = create_downstream_key(shared_addr, src.mruby_file);
    auto it = addr_groups_indexer.find(dkey);

    if (it == std::end(addr_groups_indexer)) {
      // First group with this configuration: finish building shared_addr.
      auto shared_addr_ptr = shared_addr.get();

      for (auto &addr : shared_addr->addrs) {
        // The lambda captures &addr and a raw pointer to shared_addr; both
        // stay valid because addrs is not resized after this point and
        // shared_addr is kept alive by the group(s) that reference it.
        // When invoked (presumably once the backend may be connected to
        // again), it re-queues an address that has a weight group but is
        // not currently queued.
        addr.connect_blocker = std::make_unique<ConnectBlocker>(
            randgen_, loop_, nullptr, [shared_addr_ptr, &addr]() {
              if (!addr.queued) {
                if (!addr.wg) {
                  return;
                }
                ensure_enqueue_addr(shared_addr_ptr->pq, addr.wg, &addr);
              }
            });

        addr.live_check = std::make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this,
                                                      &addr, randgen_);
      }

      size_t seq = 0;
      for (auto &addr : shared_addr->addrs) {
        addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
        addr.seq = seq++;
      }

      // Randomize the seq values.  The swap functor exchanges only `seq`,
      // so elements stay where they are and the &addr captures above remain
      // correct.
      util::shuffle(std::begin(shared_addr->addrs),
                    std::end(shared_addr->addrs), randgen_,
                    [](auto i, auto j) { std::swap((*i).seq, (*j).seq); });

      // Weight-group scheduling queues are only built when there is no
      // session affinity.
      if (shared_addr->affinity.type == SessionAffinity::NONE) {
        // Count the distinct group names.
        std::map<StringRef, WeightGroup *> wgs;
        size_t num_wgs = 0;
        for (auto &addr : shared_addr->addrs) {
          if (wgs.find(addr.group) == std::end(wgs)) {
            ++num_wgs;
            wgs.emplace(addr.group, nullptr);
          }
        }

        shared_addr->wgs = std::vector<WeightGroup>(num_wgs);

        // WeightGroups are handed out from the back of shared_addr->wgs, so
        // the first group name encountered gets the highest seq.  wg->weight
        // is overwritten for every member address, so the last address'
        // group_weight wins (presumably all members of a group agree).
        for (auto &addr : shared_addr->addrs) {
          auto &wg = wgs[addr.group];
          if (wg == nullptr) {
            wg = &shared_addr->wgs[--num_wgs];
            wg->seq = num_wgs;
          }

          wg->weight = addr.group_weight;
          wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
          addr.queued = true;
          addr.wg = wg;
        }

        assert(num_wgs == 0);

        for (auto &kv : wgs) {
          shared_addr->pq.push(
              WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
          kv.second->queued = true;
        }
      }

      dst->shared_addr = std::move(shared_addr);

      addr_groups_indexer.emplace(std::move(dkey), i);
    } else {
      // Same configuration as an earlier group: share its backend state.
      // The freshly built shared_addr is discarded at the end of this
      // iteration.
      auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
      if (LOG_ENABLED(INFO)) {
        LOG(INFO) << dst->pattern << " shares the same backend group with "
                  << g->pattern;
      }
      dst->shared_addr = g->shared_addr;
    }
  }
}
418
~Worker()419 Worker::~Worker() {
420 ev_async_stop(loop_, &w_);
421 ev_timer_stop(loop_, &mcpool_clear_timer_);
422 ev_timer_stop(loop_, &proc_wev_timer_);
423 }
424
schedule_clear_mcpool()425 void Worker::schedule_clear_mcpool() {
426 // libev manual says: "If the watcher is already active nothing will
427 // happen." Since we don't change any timeout here, we don't have
428 // to worry about querying ev_is_active.
429 ev_timer_start(loop_, &mcpool_clear_timer_);
430 }
431
wait()432 void Worker::wait() {
433 #ifndef NOTHREADS
434 fut_.get();
435 #endif // !NOTHREADS
436 }
437
run_async()438 void Worker::run_async() {
439 #ifndef NOTHREADS
440 fut_ = std::async(std::launch::async, [this] {
441 (void)reopen_log_files(get_config()->logging);
442 ev_run(loop_);
443 delete_log_config();
444
445 # ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
446 wc_ecc_fp_free();
447 # endif // NGHTTP2_OPENSSL_IS_WOLFSSL
448 });
449 #endif // !NOTHREADS
450 }
451
send(WorkerEvent event)452 void Worker::send(WorkerEvent event) {
453 {
454 std::lock_guard<std::mutex> g(m_);
455
456 q_.emplace_back(std::move(event));
457 }
458
459 ev_async_send(loop_, &w_);
460 }
461
process_events()462 void Worker::process_events() {
463 WorkerEvent wev;
464 {
465 std::lock_guard<std::mutex> g(m_);
466
467 // Process event one at a time. This is important for
468 // WorkerEventType::NEW_CONNECTION event since accepting large
469 // number of new connections at once may delay time to 1st byte
470 // for existing connections.
471
472 if (q_.empty()) {
473 ev_timer_stop(loop_, &proc_wev_timer_);
474 return;
475 }
476
477 wev = std::move(q_.front());
478 q_.pop_front();
479 }
480
481 ev_timer_start(loop_, &proc_wev_timer_);
482
483 auto config = get_config();
484
485 auto worker_connections = config->conn.upstream.worker_connections;
486
487 switch (wev.type) {
488 case WorkerEventType::NEW_CONNECTION: {
489 if (LOG_ENABLED(INFO)) {
490 WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
491 << ", addrlen=" << wev.client_addrlen;
492 }
493
494 if (worker_stat_.num_connections >= worker_connections) {
495 if (LOG_ENABLED(INFO)) {
496 WLOG(INFO, this) << "Too many connections >= " << worker_connections;
497 }
498
499 close(wev.client_fd);
500
501 break;
502 }
503
504 auto client_handler = tls::accept_connection(
505 this, wev.client_fd, &wev.client_addr.sa, wev.client_addrlen, wev.faddr);
506 if (!client_handler) {
507 if (LOG_ENABLED(INFO)) {
508 WLOG(ERROR, this) << "ClientHandler creation failed";
509 }
510 close(wev.client_fd);
511 break;
512 }
513
514 if (LOG_ENABLED(INFO)) {
515 WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created";
516 }
517
518 break;
519 }
520 case WorkerEventType::REOPEN_LOG:
521 WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
522 << ")";
523
524 reopen_log_files(config->logging);
525
526 break;
527 case WorkerEventType::GRACEFUL_SHUTDOWN:
528 WLOG(NOTICE, this) << "Graceful shutdown commencing";
529
530 graceful_shutdown_ = true;
531
532 if (worker_stat_.num_connections == 0 &&
533 worker_stat_.num_close_waits == 0) {
534 ev_break(loop_);
535
536 return;
537 }
538
539 break;
540 case WorkerEventType::REPLACE_DOWNSTREAM:
541 WLOG(NOTICE, this) << "Replace downstream";
542
543 replace_downstream_config(wev.downstreamconf);
544
545 break;
546 #ifdef ENABLE_HTTP3
547 case WorkerEventType::QUIC_PKT_FORWARD: {
548 const UpstreamAddr *faddr;
549
550 if (wev.quic_pkt->upstream_addr_index == static_cast<size_t>(-1)) {
551 faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr);
552 if (faddr == nullptr) {
553 LOG(ERROR) << "No suitable upstream address found";
554
555 break;
556 }
557 } else if (quic_upstream_addrs_.size() <=
558 wev.quic_pkt->upstream_addr_index) {
559 LOG(ERROR) << "upstream_addr_index is too large";
560
561 break;
562 } else {
563 faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index];
564 }
565
566 quic_conn_handler_.handle_packet(faddr, wev.quic_pkt->remote_addr,
567 wev.quic_pkt->local_addr, wev.quic_pkt->pi,
568 wev.quic_pkt->data);
569
570 break;
571 }
572 #endif // ENABLE_HTTP3
573 default:
574 if (LOG_ENABLED(INFO)) {
575 WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
576 }
577 }
578 }
579
get_cert_lookup_tree() const580 tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }
581
#ifdef ENABLE_HTTP3
// Returns the QUIC certificate lookup tree passed to the constructor
// (non-owning).
tls::CertLookupTree *Worker::get_quic_cert_lookup_tree() const {
  return this->quic_cert_tree_;
}
#endif // ENABLE_HTTP3
587
get_ticket_keys()588 std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
589 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
590 return ticket_keys_.load(std::memory_order_acquire);
591 #else // !HAVE_ATOMIC_STD_SHARED_PTR
592 std::lock_guard<std::mutex> g(ticket_keys_m_);
593 return ticket_keys_;
594 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
595 }
596
set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys)597 void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
598 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
599 // This is single writer
600 ticket_keys_.store(std::move(ticket_keys), std::memory_order_release);
601 #else // !HAVE_ATOMIC_STD_SHARED_PTR
602 std::lock_guard<std::mutex> g(ticket_keys_m_);
603 ticket_keys_ = std::move(ticket_keys);
604 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
605 }
606
get_worker_stat()607 WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
608
get_loop() const609 struct ev_loop *Worker::get_loop() const { return loop_; }
610
get_sv_ssl_ctx() const611 SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
612
get_cl_ssl_ctx() const613 SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }
614
#ifdef ENABLE_HTTP3
// Returns the SSL_CTX passed as quic_sv_ssl_ctx to the constructor
// (non-owning).
SSL_CTX *Worker::get_quic_sv_ssl_ctx() const {
  return this->quic_sv_ssl_ctx_;
}
#endif // ENABLE_HTTP3
618
set_graceful_shutdown(bool f)619 void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
620
get_graceful_shutdown() const621 bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
622
get_mcpool()623 MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
624
get_session_cache_memcached_dispatcher()625 MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
626 return session_cache_memcached_dispatcher_.get();
627 }
628
get_randgen()629 std::mt19937 &Worker::get_randgen() { return randgen_; }
630
631 #ifdef HAVE_MRUBY
create_mruby_context()632 int Worker::create_mruby_context() {
633 mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
634 if (!mruby_ctx_) {
635 return -1;
636 }
637
638 return 0;
639 }
640
get_mruby_context() const641 mruby::MRubyContext *Worker::get_mruby_context() const {
642 return mruby_ctx_.get();
643 }
644 #endif // HAVE_MRUBY
645
646 std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups()647 Worker::get_downstream_addr_groups() {
648 return downstream_addr_groups_;
649 }
650
get_connect_blocker() const651 ConnectBlocker *Worker::get_connect_blocker() const {
652 return connect_blocker_.get();
653 }
654
get_downstream_config() const655 const DownstreamConfig *Worker::get_downstream_config() const {
656 return downstreamconf_.get();
657 }
658
get_connection_handler() const659 ConnectionHandler *Worker::get_connection_handler() const {
660 return conn_handler_;
661 }
662
#ifdef ENABLE_HTTP3
// Returns a pointer to this worker's QUIC connection handler member.
QUICConnectionHandler *Worker::get_quic_connection_handler() {
  return &this->quic_conn_handler_;
}
#endif // ENABLE_HTTP3
668
get_dns_tracker()669 DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
670
671 #ifdef ENABLE_HTTP3
672 # ifdef HAVE_LIBBPF
should_attach_bpf() const673 bool Worker::should_attach_bpf() const {
674 auto config = get_config();
675 auto &quicconf = config->quic;
676 auto &apiconf = config->api;
677
678 if (quicconf.bpf.disabled) {
679 return false;
680 }
681
682 if (!config->single_thread && apiconf.enabled) {
683 return index_ == 1;
684 }
685
686 return index_ == 0;
687 }
688
should_update_bpf_map() const689 bool Worker::should_update_bpf_map() const {
690 auto config = get_config();
691 auto &quicconf = config->quic;
692
693 return !quicconf.bpf.disabled;
694 }
695
compute_sk_index() const696 uint32_t Worker::compute_sk_index() const {
697 auto config = get_config();
698 auto &apiconf = config->api;
699
700 if (!config->single_thread && apiconf.enabled) {
701 return index_ - 1;
702 }
703
704 return index_;
705 }
706 # endif // HAVE_LIBBPF
707
setup_quic_server_socket()708 int Worker::setup_quic_server_socket() {
709 size_t n = 0;
710
711 for (auto &addr : quic_upstream_addrs_) {
712 assert(!addr.host_unix);
713 if (create_quic_server_socket(addr) != 0) {
714 return -1;
715 }
716
717 // Make sure that each endpoint has a unique address.
718 for (size_t i = 0; i < n; ++i) {
719 const auto &a = quic_upstream_addrs_[i];
720
721 if (addr.hostport == a.hostport) {
722 LOG(FATAL)
723 << "QUIC frontend endpoint must be unique: a duplicate found for "
724 << addr.hostport;
725
726 return -1;
727 }
728 }
729
730 ++n;
731
732 quic_listeners_.emplace_back(std::make_unique<QUICListener>(&addr, this));
733 }
734
735 return 0;
736 }
737
738 # ifdef HAVE_LIBBPF
namespace {
// https://github.com/kokke/tiny-AES-c
//
// License is Public Domain.
// Commit hash: 12e7744b4919e9d55de75b7ab566326a1c8e7a67
//
// Only the AES key schedule is reproduced here.  NOTE(review): the macros
// below are not scoped by this namespace; presumably they are #undef'd
// further down the file -- not visible here.

// The number of columns comprising a state in AES. This is a constant
// in AES. Value=4
# define Nb 4

// Nk = 4 words (128-bit key) with Nr = 10 rounds are the AES-128 parameters.
# define Nk 4 // The number of 32 bit words in a key.
# define Nr 10 // The number of rounds in AES Cipher.

// The lookup-tables are marked const so they can be placed in
// read-only storage instead of RAM. The numbers below can be computed
// dynamically trading ROM for RAM - This can be useful in (embedded)
// bootloader applications, where ROM is often limited.
//
// AES S-box: byte substitution table used by SubWord() in KeyExpansion.
const uint8_t sbox[256] = {
    // 0 1 2 3 4 5 6 7 8 9 A B C D E F
    0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5, 0x30, 0x01, 0x67, 0x2b, 0xfe,
    0xd7, 0xab, 0x76, 0xca, 0x82, 0xc9, 0x7d, 0xfa, 0x59, 0x47, 0xf0, 0xad, 0xd4,
    0xa2, 0xaf, 0x9c, 0xa4, 0x72, 0xc0, 0xb7, 0xfd, 0x93, 0x26, 0x36, 0x3f, 0xf7,
    0xcc, 0x34, 0xa5, 0xe5, 0xf1, 0x71, 0xd8, 0x31, 0x15, 0x04, 0xc7, 0x23, 0xc3,
    0x18, 0x96, 0x05, 0x9a, 0x07, 0x12, 0x80, 0xe2, 0xeb, 0x27, 0xb2, 0x75, 0x09,
    0x83, 0x2c, 0x1a, 0x1b, 0x6e, 0x5a, 0xa0, 0x52, 0x3b, 0xd6, 0xb3, 0x29, 0xe3,
    0x2f, 0x84, 0x53, 0xd1, 0x00, 0xed, 0x20, 0xfc, 0xb1, 0x5b, 0x6a, 0xcb, 0xbe,
    0x39, 0x4a, 0x4c, 0x58, 0xcf, 0xd0, 0xef, 0xaa, 0xfb, 0x43, 0x4d, 0x33, 0x85,
    0x45, 0xf9, 0x02, 0x7f, 0x50, 0x3c, 0x9f, 0xa8, 0x51, 0xa3, 0x40, 0x8f, 0x92,
    0x9d, 0x38, 0xf5, 0xbc, 0xb6, 0xda, 0x21, 0x10, 0xff, 0xf3, 0xd2, 0xcd, 0x0c,
    0x13, 0xec, 0x5f, 0x97, 0x44, 0x17, 0xc4, 0xa7, 0x7e, 0x3d, 0x64, 0x5d, 0x19,
    0x73, 0x60, 0x81, 0x4f, 0xdc, 0x22, 0x2a, 0x90, 0x88, 0x46, 0xee, 0xb8, 0x14,
    0xde, 0x5e, 0x0b, 0xdb, 0xe0, 0x32, 0x3a, 0x0a, 0x49, 0x06, 0x24, 0x5c, 0xc2,
    0xd3, 0xac, 0x62, 0x91, 0x95, 0xe4, 0x79, 0xe7, 0xc8, 0x37, 0x6d, 0x8d, 0xd5,
    0x4e, 0xa9, 0x6c, 0x56, 0xf4, 0xea, 0x65, 0x7a, 0xae, 0x08, 0xba, 0x78, 0x25,
    0x2e, 0x1c, 0xa6, 0xb4, 0xc6, 0xe8, 0xdd, 0x74, 0x1f, 0x4b, 0xbd, 0x8b, 0x8a,
    0x70, 0x3e, 0xb5, 0x66, 0x48, 0x03, 0xf6, 0x0e, 0x61, 0x35, 0x57, 0xb9, 0x86,
    0xc1, 0x1d, 0x9e, 0xe1, 0xf8, 0x98, 0x11, 0x69, 0xd9, 0x8e, 0x94, 0x9b, 0x1e,
    0x87, 0xe9, 0xce, 0x55, 0x28, 0xdf, 0x8c, 0xa1, 0x89, 0x0d, 0xbf, 0xe6, 0x42,
    0x68, 0x41, 0x99, 0x2d, 0x0f, 0xb0, 0x54, 0xbb, 0x16};

# define getSBoxValue(num) (sbox[(num)])

// The round constant word array, Rcon[i], contains the values given
// by x to the power (i-1) being powers of x (x is denoted as {02}) in
// the field GF(2^8)
//
// KeyExpansion only reads Rcon[1]..Rcon[10]; Rcon[0] is a placeholder.
const uint8_t Rcon[11] = {0x8d, 0x01, 0x02, 0x04, 0x08, 0x10,
                          0x20, 0x40, 0x80, 0x1b, 0x36};

// This function produces Nb(Nr+1) round keys for the AES round
// transformations.
//
// Reads Nk*4 = 16 bytes from Key and writes Nb*(Nr+1) = 44 32-bit words
// (176 bytes) to RoundKey, so RoundKey must have room for 176 bytes.  The
// first 16 bytes of RoundKey are a plain copy of Key.
void KeyExpansion(uint8_t *RoundKey, const uint8_t *Key) {
  unsigned i, j, k;
  uint8_t tempa[4]; // Used for the column/row operations

  // The first round key is the key itself.
  for (i = 0; i < Nk; ++i) {
    RoundKey[(i * 4) + 0] = Key[(i * 4) + 0];
    RoundKey[(i * 4) + 1] = Key[(i * 4) + 1];
    RoundKey[(i * 4) + 2] = Key[(i * 4) + 2];
    RoundKey[(i * 4) + 3] = Key[(i * 4) + 3];
  }

  // All other round keys are found from the previous round keys.
  for (i = Nk; i < Nb * (Nr + 1); ++i) {
    // tempa = previous word w[i-1].
    {
      k = (i - 1) * 4;
      tempa[0] = RoundKey[k + 0];
      tempa[1] = RoundKey[k + 1];
      tempa[2] = RoundKey[k + 2];
      tempa[3] = RoundKey[k + 3];
    }

    // At the start of every Nk-word block apply
    // SubWord(RotWord(w[i-1])) ^ Rcon[i/Nk].
    if (i % Nk == 0) {
      // This function shifts the 4 bytes in a word to the left once.
      // [a0,a1,a2,a3] becomes [a1,a2,a3,a0]

      // Function RotWord()
      {
        const uint8_t u8tmp = tempa[0];
        tempa[0] = tempa[1];
        tempa[1] = tempa[2];
        tempa[2] = tempa[3];
        tempa[3] = u8tmp;
      }

      // SubWord() is a function that takes a four-byte input word and
      // applies the S-box to each of the four bytes to produce an
      // output word.

      // Function Subword()
      {
        tempa[0] = getSBoxValue(tempa[0]);
        tempa[1] = getSBoxValue(tempa[1]);
        tempa[2] = getSBoxValue(tempa[2]);
        tempa[3] = getSBoxValue(tempa[3]);
      }

      // i / Nk ranges over 1..10 here, matching Rcon's used entries.
      tempa[0] = tempa[0] ^ Rcon[i / Nk];
    }
    // w[i] = w[i-Nk] ^ tempa.
    j = i * 4;
    k = (i - Nk) * 4;
    RoundKey[j + 0] = RoundKey[k + 0] ^ tempa[0];
    RoundKey[j + 1] = RoundKey[k + 1] ^ tempa[1];
    RoundKey[j + 2] = RoundKey[k + 2] ^ tempa[2];
    RoundKey[j + 3] = RoundKey[k + 3] ^ tempa[3];
  }
}
} // namespace
847 # endif // HAVE_LIBBPF
848
create_quic_server_socket(UpstreamAddr & faddr)849 int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
850 std::array<char, STRERROR_BUFSIZE> errbuf;
851 int fd = -1;
852 int rv;
853
854 auto service = util::utos(faddr.port);
855 addrinfo hints{};
856 hints.ai_family = faddr.family;
857 hints.ai_socktype = SOCK_DGRAM;
858 hints.ai_flags = AI_PASSIVE;
859 # ifdef AI_ADDRCONFIG
860 hints.ai_flags |= AI_ADDRCONFIG;
861 # endif // AI_ADDRCONFIG
862
863 auto node = faddr.host == "*"_sr ? nullptr : faddr.host.data();
864
865 addrinfo *res, *rp;
866 rv = getaddrinfo(node, service.c_str(), &hints, &res);
867 # ifdef AI_ADDRCONFIG
868 if (rv != 0) {
869 // Retry without AI_ADDRCONFIG
870 hints.ai_flags &= ~AI_ADDRCONFIG;
871 rv = getaddrinfo(node, service.c_str(), &hints, &res);
872 }
873 # endif // AI_ADDRCONFIG
874 if (rv != 0) {
875 LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
876 << " address for " << faddr.host << ", port " << faddr.port
877 << ": " << gai_strerror(rv);
878 return -1;
879 }
880
881 auto res_d = defer(freeaddrinfo, res);
882
883 std::array<char, NI_MAXHOST> host;
884
885 for (rp = res; rp; rp = rp->ai_next) {
886 rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
887 nullptr, 0, NI_NUMERICHOST);
888 if (rv != 0) {
889 LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
890 continue;
891 }
892
893 # ifdef SOCK_NONBLOCK
894 fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
895 rp->ai_protocol);
896 if (fd == -1) {
897 auto error = errno;
898 LOG(WARN) << "socket() syscall failed: "
899 << xsi_strerror(error, errbuf.data(), errbuf.size());
900 continue;
901 }
902 # else // !SOCK_NONBLOCK
903 fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
904 if (fd == -1) {
905 auto error = errno;
906 LOG(WARN) << "socket() syscall failed: "
907 << xsi_strerror(error, errbuf.data(), errbuf.size());
908 continue;
909 }
910 util::make_socket_nonblocking(fd);
911 util::make_socket_closeonexec(fd);
912 # endif // !SOCK_NONBLOCK
913
914 int val = 1;
915 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
916 static_cast<socklen_t>(sizeof(val))) == -1) {
917 auto error = errno;
918 LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
919 << xsi_strerror(error, errbuf.data(), errbuf.size());
920 close(fd);
921 continue;
922 }
923
924 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
925 static_cast<socklen_t>(sizeof(val))) == -1) {
926 auto error = errno;
927 LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
928 << xsi_strerror(error, errbuf.data(), errbuf.size());
929 close(fd);
930 continue;
931 }
932
933 if (faddr.family == AF_INET6) {
934 # ifdef IPV6_V6ONLY
935 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
936 static_cast<socklen_t>(sizeof(val))) == -1) {
937 auto error = errno;
938 LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
939 << xsi_strerror(error, errbuf.data(), errbuf.size());
940 close(fd);
941 continue;
942 }
943 # endif // IPV6_V6ONLY
944
945 if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
946 static_cast<socklen_t>(sizeof(val))) == -1) {
947 auto error = errno;
948 LOG(WARN)
949 << "Failed to set IPV6_RECVPKTINFO option to listener socket: "
950 << xsi_strerror(error, errbuf.data(), errbuf.size());
951 close(fd);
952 continue;
953 }
954
955 if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVTCLASS, &val,
956 static_cast<socklen_t>(sizeof(val))) == -1) {
957 auto error = errno;
958 LOG(WARN) << "Failed to set IPV6_RECVTCLASS option to listener socket: "
959 << xsi_strerror(error, errbuf.data(), errbuf.size());
960 close(fd);
961 continue;
962 }
963
964 # if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO)
965 int mtu_disc = IPV6_PMTUDISC_DO;
966 if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &mtu_disc,
967 static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
968 auto error = errno;
969 LOG(WARN)
970 << "Failed to set IPV6_MTU_DISCOVER option to listener socket: "
971 << xsi_strerror(error, errbuf.data(), errbuf.size());
972 close(fd);
973 continue;
974 }
975 # endif // defined(IPV6_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
976 } else {
977 if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
978 static_cast<socklen_t>(sizeof(val))) == -1) {
979 auto error = errno;
980 LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
981 << xsi_strerror(error, errbuf.data(), errbuf.size());
982 close(fd);
983 continue;
984 }
985
986 if (setsockopt(fd, IPPROTO_IP, IP_RECVTOS, &val,
987 static_cast<socklen_t>(sizeof(val))) == -1) {
988 auto error = errno;
989 LOG(WARN) << "Failed to set IP_RECVTOS option to listener socket: "
990 << xsi_strerror(error, errbuf.data(), errbuf.size());
991 close(fd);
992 continue;
993 }
994
995 # if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
996 int mtu_disc = IP_PMTUDISC_DO;
997 if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &mtu_disc,
998 static_cast<socklen_t>(sizeof(mtu_disc))) == -1) {
999 auto error = errno;
1000 LOG(WARN) << "Failed to set IP_MTU_DISCOVER option to listener socket: "
1001 << xsi_strerror(error, errbuf.data(), errbuf.size());
1002 close(fd);
1003 continue;
1004 }
1005 # endif // defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO)
1006 }
1007
1008 # ifdef UDP_GRO
1009 if (setsockopt(fd, IPPROTO_UDP, UDP_GRO, &val, sizeof(val)) == -1) {
1010 auto error = errno;
1011 LOG(WARN) << "Failed to set UDP_GRO option to listener socket: "
1012 << xsi_strerror(error, errbuf.data(), errbuf.size());
1013 close(fd);
1014 continue;
1015 }
1016 # endif // UDP_GRO
1017
1018 if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
1019 auto error = errno;
1020 LOG(WARN) << "bind() syscall failed: "
1021 << xsi_strerror(error, errbuf.data(), errbuf.size());
1022 close(fd);
1023 continue;
1024 }
1025
1026 # ifdef HAVE_LIBBPF
1027 auto config = get_config();
1028
1029 auto &quic_bpf_refs = conn_handler_->get_quic_bpf_refs();
1030
1031 if (should_attach_bpf()) {
1032 auto &bpfconf = config->quic.bpf;
1033
1034 auto obj = bpf_object__open_file(bpfconf.prog_file.data(), nullptr);
1035 if (!obj) {
1036 auto error = errno;
1037 LOG(FATAL) << "Failed to open bpf object file: "
1038 << xsi_strerror(error, errbuf.data(), errbuf.size());
1039 close(fd);
1040 return -1;
1041 }
1042
1043 rv = bpf_object__load(obj);
1044 if (rv != 0) {
1045 auto error = errno;
1046 LOG(FATAL) << "Failed to load bpf object file: "
1047 << xsi_strerror(error, errbuf.data(), errbuf.size());
1048 close(fd);
1049 return -1;
1050 }
1051
1052 auto prog = bpf_object__find_program_by_name(obj, "select_reuseport");
1053 if (!prog) {
1054 auto error = errno;
1055 LOG(FATAL) << "Failed to find sk_reuseport program: "
1056 << xsi_strerror(error, errbuf.data(), errbuf.size());
1057 close(fd);
1058 return -1;
1059 }
1060
1061 auto &ref = quic_bpf_refs[faddr.index];
1062
1063 ref.obj = obj;
1064
1065 ref.reuseport_array =
1066 bpf_object__find_map_by_name(obj, "reuseport_array");
1067 if (!ref.reuseport_array) {
1068 auto error = errno;
1069 LOG(FATAL) << "Failed to get reuseport_array: "
1070 << xsi_strerror(error, errbuf.data(), errbuf.size());
1071 close(fd);
1072 return -1;
1073 }
1074
1075 ref.worker_id_map = bpf_object__find_map_by_name(obj, "worker_id_map");
1076 if (!ref.worker_id_map) {
1077 auto error = errno;
1078 LOG(FATAL) << "Failed to get worker_id_map: "
1079 << xsi_strerror(error, errbuf.data(), errbuf.size());
1080 close(fd);
1081 return -1;
1082 }
1083
1084 auto sk_info = bpf_object__find_map_by_name(obj, "sk_info");
1085 if (!sk_info) {
1086 auto error = errno;
1087 LOG(FATAL) << "Failed to get sk_info: "
1088 << xsi_strerror(error, errbuf.data(), errbuf.size());
1089 close(fd);
1090 return -1;
1091 }
1092
1093 constexpr uint32_t zero = 0;
1094 uint64_t num_socks = config->num_worker;
1095
1096 rv = bpf_map__update_elem(sk_info, &zero, sizeof(zero), &num_socks,
1097 sizeof(num_socks), BPF_ANY);
1098 if (rv != 0) {
1099 auto error = errno;
1100 LOG(FATAL) << "Failed to update sk_info: "
1101 << xsi_strerror(error, errbuf.data(), errbuf.size());
1102 close(fd);
1103 return -1;
1104 }
1105
1106 auto &qkms = conn_handler_->get_quic_keying_materials();
1107 auto &qkm = qkms->keying_materials.front();
1108
1109 auto aes_key = bpf_object__find_map_by_name(obj, "aes_key");
1110 if (!aes_key) {
1111 auto error = errno;
1112 LOG(FATAL) << "Failed to get aes_key: "
1113 << xsi_strerror(error, errbuf.data(), errbuf.size());
1114 close(fd);
1115 return -1;
1116 }
1117
1118 constexpr size_t expanded_aes_keylen = 176;
1119 std::array<uint8_t, expanded_aes_keylen> aes_exp_key;
1120
1121 KeyExpansion(aes_exp_key.data(), qkm.cid_encryption_key.data());
1122
1123 rv =
1124 bpf_map__update_elem(aes_key, &zero, sizeof(zero), aes_exp_key.data(),
1125 aes_exp_key.size(), BPF_ANY);
1126 if (rv != 0) {
1127 auto error = errno;
1128 LOG(FATAL) << "Failed to update aes_key: "
1129 << xsi_strerror(error, errbuf.data(), errbuf.size());
1130 close(fd);
1131 return -1;
1132 }
1133
1134 auto prog_fd = bpf_program__fd(prog);
1135
1136 if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &prog_fd,
1137 static_cast<socklen_t>(sizeof(prog_fd))) == -1) {
1138 LOG(FATAL) << "Failed to attach bpf program: "
1139 << xsi_strerror(errno, errbuf.data(), errbuf.size());
1140 close(fd);
1141 return -1;
1142 }
1143 }
1144
1145 if (should_update_bpf_map()) {
1146 const auto &ref = quic_bpf_refs[faddr.index];
1147 auto sk_index = compute_sk_index();
1148
1149 rv = bpf_map__update_elem(ref.reuseport_array, &sk_index,
1150 sizeof(sk_index), &fd, sizeof(fd), BPF_NOEXIST);
1151 if (rv != 0) {
1152 auto error = errno;
1153 LOG(FATAL) << "Failed to update reuseport_array: "
1154 << xsi_strerror(error, errbuf.data(), errbuf.size());
1155 close(fd);
1156 return -1;
1157 }
1158
1159 rv =
1160 bpf_map__update_elem(ref.worker_id_map, &worker_id_, sizeof(worker_id_),
1161 &sk_index, sizeof(sk_index), BPF_NOEXIST);
1162 if (rv != 0) {
1163 auto error = errno;
1164 LOG(FATAL) << "Failed to update worker_id_map: "
1165 << xsi_strerror(error, errbuf.data(), errbuf.size());
1166 close(fd);
1167 return -1;
1168 }
1169 }
1170 # endif // HAVE_LIBBPF
1171
1172 break;
1173 }
1174
1175 if (!rp) {
1176 LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
1177 << " socket failed";
1178
1179 return -1;
1180 }
1181
1182 faddr.fd = fd;
1183 faddr.hostport = util::make_http_hostport(mod_config()->balloc,
1184 StringRef{host.data()}, faddr.port);
1185
1186 LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";
1187
1188 return 0;
1189 }
1190
get_worker_id() const1191 const WorkerID &Worker::get_worker_id() const { return worker_id_; }
1192
find_quic_upstream_addr(const Address & local_addr)1193 const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) {
1194 std::array<char, NI_MAXHOST> host;
1195
1196 auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(),
1197 host.size(), nullptr, 0, NI_NUMERICHOST);
1198 if (rv != 0) {
1199 LOG(ERROR) << "getnameinfo: " << gai_strerror(rv);
1200
1201 return nullptr;
1202 }
1203
1204 uint16_t port;
1205
1206 switch (local_addr.su.sa.sa_family) {
1207 case AF_INET:
1208 port = htons(local_addr.su.in.sin_port);
1209
1210 break;
1211 case AF_INET6:
1212 port = htons(local_addr.su.in6.sin6_port);
1213
1214 break;
1215 default:
1216 assert(0);
1217 abort();
1218 }
1219
1220 std::array<char, util::max_hostport> hostport_buf;
1221
1222 auto hostport = util::make_http_hostport(std::begin(hostport_buf),
1223 StringRef{host.data()}, port);
1224 const UpstreamAddr *fallback_faddr = nullptr;
1225
1226 for (auto &faddr : quic_upstream_addrs_) {
1227 if (faddr.hostport == hostport) {
1228 return &faddr;
1229 }
1230
1231 if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) {
1232 continue;
1233 }
1234
1235 if (faddr.port == 443 || faddr.port == 80) {
1236 switch (faddr.family) {
1237 case AF_INET:
1238 if (faddr.hostport == "0.0.0.0"_sr) {
1239 fallback_faddr = &faddr;
1240 }
1241
1242 break;
1243 case AF_INET6:
1244 if (faddr.hostport == "[::]"_sr) {
1245 fallback_faddr = &faddr;
1246 }
1247
1248 break;
1249 default:
1250 assert(0);
1251 }
1252 } else {
1253 switch (faddr.family) {
1254 case AF_INET:
1255 if (util::starts_with(faddr.hostport, "0.0.0.0:"_sr)) {
1256 fallback_faddr = &faddr;
1257 }
1258
1259 break;
1260 case AF_INET6:
1261 if (util::starts_with(faddr.hostport, "[::]:"_sr)) {
1262 fallback_faddr = &faddr;
1263 }
1264
1265 break;
1266 default:
1267 assert(0);
1268 }
1269 }
1270 }
1271
1272 return fallback_faddr;
1273 }
1274 #endif // ENABLE_HTTP3
1275
1276 namespace {
match_downstream_addr_group_host(const RouterConfig & routerconf,const StringRef & host,const StringRef & path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)1277 size_t match_downstream_addr_group_host(
1278 const RouterConfig &routerconf, const StringRef &host, const StringRef &path,
1279 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1280 size_t catch_all, BlockAllocator &balloc) {
1281 const auto &router = routerconf.router;
1282 const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
1283 const auto &wildcard_patterns = routerconf.wildcard_patterns;
1284
1285 if (LOG_ENABLED(INFO)) {
1286 LOG(INFO) << "Perform mapping selection, using host=" << host
1287 << ", path=" << path;
1288 }
1289
1290 auto group = router.match(host, path);
1291 if (group != -1) {
1292 if (LOG_ENABLED(INFO)) {
1293 LOG(INFO) << "Found pattern with query " << host << path
1294 << ", matched pattern=" << groups[group]->pattern;
1295 }
1296 return group;
1297 }
1298
1299 if (!wildcard_patterns.empty() && !host.empty()) {
1300 auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
1301 auto ep =
1302 std::copy(std::begin(host) + 1, std::end(host), std::begin(rev_host_src));
1303 std::reverse(std::begin(rev_host_src), ep);
1304 auto rev_host = StringRef{std::span{std::begin(rev_host_src), ep}};
1305
1306 ssize_t best_group = -1;
1307 const RNode *last_node = nullptr;
1308
1309 for (;;) {
1310 size_t nread = 0;
1311 auto wcidx =
1312 rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
1313 if (wcidx == -1) {
1314 break;
1315 }
1316
1317 rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};
1318
1319 auto &wc = wildcard_patterns[wcidx];
1320 auto group = wc.router.match(StringRef{}, path);
1321 if (group != -1) {
1322 // We sorted wildcard_patterns in a way that first match is the
1323 // longest host pattern.
1324 if (LOG_ENABLED(INFO)) {
1325 LOG(INFO) << "Found wildcard pattern with query " << host << path
1326 << ", matched pattern=" << groups[group]->pattern;
1327 }
1328
1329 best_group = group;
1330 }
1331 }
1332
1333 if (best_group != -1) {
1334 return best_group;
1335 }
1336 }
1337
1338 group = router.match(""_sr, path);
1339 if (group != -1) {
1340 if (LOG_ENABLED(INFO)) {
1341 LOG(INFO) << "Found pattern with query " << path
1342 << ", matched pattern=" << groups[group]->pattern;
1343 }
1344 return group;
1345 }
1346
1347 if (LOG_ENABLED(INFO)) {
1348 LOG(INFO) << "None match. Use catch-all pattern";
1349 }
1350 return catch_all;
1351 }
1352 } // namespace
1353
match_downstream_addr_group(const RouterConfig & routerconf,const StringRef & hostport,const StringRef & raw_path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)1354 size_t match_downstream_addr_group(
1355 const RouterConfig &routerconf, const StringRef &hostport,
1356 const StringRef &raw_path,
1357 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1358 size_t catch_all, BlockAllocator &balloc) {
1359 if (std::find(std::begin(hostport), std::end(hostport), '/') !=
1360 std::end(hostport)) {
1361 // We use '/' specially, and if '/' is included in host, it breaks
1362 // our code. Select catch-all case.
1363 return catch_all;
1364 }
1365
1366 auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
1367 auto query = std::find(std::begin(raw_path), fragment, '?');
1368 auto path = StringRef{std::begin(raw_path), query};
1369
1370 if (path.empty() || path[0] != '/') {
1371 path = "/"_sr;
1372 }
1373
1374 if (hostport.empty()) {
1375 return match_downstream_addr_group_host(routerconf, hostport, path, groups,
1376 catch_all, balloc);
1377 }
1378
1379 StringRef host;
1380 if (hostport[0] == '[') {
1381 // assume this is IPv6 numeric address
1382 auto p = std::find(std::begin(hostport), std::end(hostport), ']');
1383 if (p == std::end(hostport)) {
1384 return catch_all;
1385 }
1386 if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
1387 return catch_all;
1388 }
1389 host = StringRef{std::begin(hostport), p + 1};
1390 } else {
1391 auto p = std::find(std::begin(hostport), std::end(hostport), ':');
1392 if (p == std::begin(hostport)) {
1393 return catch_all;
1394 }
1395 host = StringRef{std::begin(hostport), p};
1396 }
1397
1398 if (std::find_if(std::begin(host), std::end(host), [](char c) {
1399 return 'A' <= c || c <= 'Z';
1400 }) != std::end(host)) {
1401 auto low_host = make_byte_ref(balloc, host.size() + 1);
1402 auto ep = std::copy(std::begin(host), std::end(host), std::begin(low_host));
1403 *ep = '\0';
1404 util::inp_strlower(std::begin(low_host), ep);
1405 host = StringRef{std::span{std::begin(low_host), ep}};
1406 }
1407 return match_downstream_addr_group_host(routerconf, host, path, groups,
1408 catch_all, balloc);
1409 }
1410
downstream_failure(DownstreamAddr * addr,const Address * raddr)1411 void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
1412 const auto &connect_blocker = addr->connect_blocker;
1413
1414 if (connect_blocker->in_offline()) {
1415 return;
1416 }
1417
1418 connect_blocker->on_failure();
1419
1420 if (addr->fall == 0) {
1421 return;
1422 }
1423
1424 auto fail_count = connect_blocker->get_fail_count();
1425
1426 if (fail_count >= addr->fall) {
1427 if (raddr) {
1428 LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
1429 << " " << fail_count
1430 << " times in a row; considered as offline";
1431 } else {
1432 LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
1433 << " " << fail_count
1434 << " times in a row; considered as offline";
1435 }
1436
1437 connect_blocker->offline();
1438
1439 if (addr->rise) {
1440 addr->live_check->schedule();
1441 }
1442 }
1443 }
1444
1445 } // namespace shrpx
1446