1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_worker.h"
26
27 #ifdef HAVE_UNISTD_H
28 # include <unistd.h>
29 #endif // HAVE_UNISTD_H
30
31 #include <memory>
32
33 #include "shrpx_tls.h"
34 #include "shrpx_log.h"
35 #include "shrpx_client_handler.h"
36 #include "shrpx_http2_session.h"
37 #include "shrpx_log_config.h"
38 #include "shrpx_memcached_dispatcher.h"
39 #ifdef HAVE_MRUBY
40 # include "shrpx_mruby.h"
41 #endif // HAVE_MRUBY
42 #include "util.h"
43 #include "template.h"
44
45 namespace shrpx {
46
47 namespace {
eventcb(struct ev_loop * loop,ev_async * w,int revents)48 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
49 auto worker = static_cast<Worker *>(w->data);
50 worker->process_events();
51 }
52 } // namespace
53
54 namespace {
mcpool_clear_cb(struct ev_loop * loop,ev_timer * w,int revents)55 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
56 auto worker = static_cast<Worker *>(w->data);
57 if (worker->get_worker_stat()->num_connections != 0) {
58 return;
59 }
60 auto mcpool = worker->get_mcpool();
61 if (mcpool->freelistsize == mcpool->poolsize) {
62 worker->get_mcpool()->clear();
63 }
64 }
65 } // namespace
66
67 namespace {
proc_wev_cb(struct ev_loop * loop,ev_timer * w,int revents)68 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
69 auto worker = static_cast<Worker *>(w->data);
70 worker->process_events();
71 }
72 } // namespace
73
DownstreamAddrGroup()74 DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
75
~DownstreamAddrGroup()76 DownstreamAddrGroup::~DownstreamAddrGroup() {}
77
78 // DownstreamKey is used to index SharedDownstreamAddr in order to
79 // find the same configuration.
80 using DownstreamKey =
81 std::tuple<std::vector<std::tuple<StringRef, StringRef, StringRef, size_t,
82 size_t, Proto, uint32_t, uint32_t,
83 uint32_t, bool, bool, bool, bool>>,
84 bool, SessionAffinity, StringRef, StringRef,
85 SessionAffinityCookieSecure, int64_t, int64_t, StringRef>;
86
87 namespace {
88 DownstreamKey
create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> & shared_addr,const StringRef & mruby_file)89 create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
90 const StringRef &mruby_file) {
91 DownstreamKey dkey;
92
93 auto &addrs = std::get<0>(dkey);
94 addrs.resize(shared_addr->addrs.size());
95 auto p = std::begin(addrs);
96 for (auto &a : shared_addr->addrs) {
97 std::get<0>(*p) = a.host;
98 std::get<1>(*p) = a.sni;
99 std::get<2>(*p) = a.group;
100 std::get<3>(*p) = a.fall;
101 std::get<4>(*p) = a.rise;
102 std::get<5>(*p) = a.proto;
103 std::get<6>(*p) = a.port;
104 std::get<7>(*p) = a.weight;
105 std::get<8>(*p) = a.group_weight;
106 std::get<9>(*p) = a.host_unix;
107 std::get<10>(*p) = a.tls;
108 std::get<11>(*p) = a.dns;
109 std::get<12>(*p) = a.upgrade_scheme;
110 ++p;
111 }
112 std::sort(std::begin(addrs), std::end(addrs));
113
114 std::get<1>(dkey) = shared_addr->redirect_if_not_tls;
115
116 auto &affinity = shared_addr->affinity;
117 std::get<2>(dkey) = affinity.type;
118 std::get<3>(dkey) = affinity.cookie.name;
119 std::get<4>(dkey) = affinity.cookie.path;
120 std::get<5>(dkey) = affinity.cookie.secure;
121 auto &timeout = shared_addr->timeout;
122 std::get<6>(dkey) = timeout.read;
123 std::get<7>(dkey) = timeout.write;
124 std::get<8>(dkey) = mruby_file;
125
126 return dkey;
127 }
128 } // namespace
129
Worker(struct ev_loop * loop,SSL_CTX * sv_ssl_ctx,SSL_CTX * cl_ssl_ctx,SSL_CTX * tls_session_cache_memcached_ssl_ctx,tls::CertLookupTree * cert_tree,const std::shared_ptr<TicketKeys> & ticket_keys,ConnectionHandler * conn_handler,std::shared_ptr<DownstreamConfig> downstreamconf)130 Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
131 SSL_CTX *tls_session_cache_memcached_ssl_ctx,
132 tls::CertLookupTree *cert_tree,
133 const std::shared_ptr<TicketKeys> &ticket_keys,
134 ConnectionHandler *conn_handler,
135 std::shared_ptr<DownstreamConfig> downstreamconf)
136 : randgen_(util::make_mt19937()),
137 worker_stat_{},
138 dns_tracker_(loop),
139 loop_(loop),
140 sv_ssl_ctx_(sv_ssl_ctx),
141 cl_ssl_ctx_(cl_ssl_ctx),
142 cert_tree_(cert_tree),
143 conn_handler_(conn_handler),
144 ticket_keys_(ticket_keys),
145 connect_blocker_(
146 std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
147 graceful_shutdown_(false) {
148 ev_async_init(&w_, eventcb);
149 w_.data = this;
150 ev_async_start(loop_, &w_);
151
152 ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
153 mcpool_clear_timer_.data = this;
154
155 ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
156 proc_wev_timer_.data = this;
157
158 auto &session_cacheconf = get_config()->tls.session_cache;
159
160 if (!session_cacheconf.memcached.host.empty()) {
161 session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
162 &session_cacheconf.memcached.addr, loop,
163 tls_session_cache_memcached_ssl_ctx,
164 StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
165 }
166
167 replace_downstream_config(std::move(downstreamconf));
168 }
169
170 namespace {
ensure_enqueue_addr(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & wgpq,WeightGroup * wg,DownstreamAddr * addr)171 void ensure_enqueue_addr(
172 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
173 WeightGroupEntryGreater> &wgpq,
174 WeightGroup *wg, DownstreamAddr *addr) {
175 uint32_t cycle;
176 if (!wg->pq.empty()) {
177 auto &top = wg->pq.top();
178 cycle = top.cycle;
179 } else {
180 cycle = 0;
181 }
182
183 addr->cycle = cycle;
184 addr->pending_penalty = 0;
185 wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
186 addr->queued = true;
187
188 if (!wg->queued) {
189 if (!wgpq.empty()) {
190 auto &top = wgpq.top();
191 cycle = top.cycle;
192 } else {
193 cycle = 0;
194 }
195
196 wg->cycle = cycle;
197 wg->pending_penalty = 0;
198 wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
199 wg->queued = true;
200 }
201 }
202 } // namespace
203
replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf)204 void Worker::replace_downstream_config(
205 std::shared_ptr<DownstreamConfig> downstreamconf) {
206 for (auto &g : downstream_addr_groups_) {
207 g->retired = true;
208
209 auto &shared_addr = g->shared_addr;
210 for (auto &addr : shared_addr->addrs) {
211 addr.dconn_pool->remove_all();
212 }
213 }
214
215 downstreamconf_ = downstreamconf;
216
217 // Making a copy is much faster with multiple thread on
218 // backendconfig API call.
219 auto groups = downstreamconf->addr_groups;
220
221 downstream_addr_groups_ =
222 std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());
223
224 std::map<DownstreamKey, size_t> addr_groups_indexer;
225 #ifdef HAVE_MRUBY
226 // TODO It is a bit less efficient because
227 // mruby::create_mruby_context returns std::unique_ptr and we cannot
228 // use std::make_shared.
229 std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
230 #endif // HAVE_MRUBY
231
232 for (size_t i = 0; i < groups.size(); ++i) {
233 auto &src = groups[i];
234 auto &dst = downstream_addr_groups_[i];
235
236 dst = std::make_shared<DownstreamAddrGroup>();
237 dst->pattern =
238 ImmutableString{std::begin(src.pattern), std::end(src.pattern)};
239
240 auto shared_addr = std::make_shared<SharedDownstreamAddr>();
241
242 shared_addr->addrs.resize(src.addrs.size());
243 shared_addr->affinity.type = src.affinity.type;
244 if (src.affinity.type == SessionAffinity::COOKIE) {
245 shared_addr->affinity.cookie.name =
246 make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
247 if (!src.affinity.cookie.path.empty()) {
248 shared_addr->affinity.cookie.path =
249 make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
250 }
251 shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
252 }
253 shared_addr->affinity_hash = src.affinity_hash;
254 shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
255 shared_addr->timeout.read = src.timeout.read;
256 shared_addr->timeout.write = src.timeout.write;
257
258 for (size_t j = 0; j < src.addrs.size(); ++j) {
259 auto &src_addr = src.addrs[j];
260 auto &dst_addr = shared_addr->addrs[j];
261
262 dst_addr.addr = src_addr.addr;
263 dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
264 dst_addr.hostport =
265 make_string_ref(shared_addr->balloc, src_addr.hostport);
266 dst_addr.port = src_addr.port;
267 dst_addr.host_unix = src_addr.host_unix;
268 dst_addr.weight = src_addr.weight;
269 dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
270 dst_addr.group_weight = src_addr.group_weight;
271 dst_addr.proto = src_addr.proto;
272 dst_addr.tls = src_addr.tls;
273 dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
274 dst_addr.fall = src_addr.fall;
275 dst_addr.rise = src_addr.rise;
276 dst_addr.dns = src_addr.dns;
277 dst_addr.upgrade_scheme = src_addr.upgrade_scheme;
278
279 auto shared_addr_ptr = shared_addr.get();
280
281 dst_addr.connect_blocker = std::make_unique<ConnectBlocker>(
282 randgen_, loop_, nullptr, [shared_addr_ptr, &dst_addr]() {
283 if (!dst_addr.queued) {
284 if (!dst_addr.wg) {
285 return;
286 }
287 ensure_enqueue_addr(shared_addr_ptr->pq, dst_addr.wg, &dst_addr);
288 }
289 });
290
291 dst_addr.live_check = std::make_unique<LiveCheck>(
292 loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);
293 }
294
295 #ifdef HAVE_MRUBY
296 auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
297 if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
298 shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
299 assert(shared_addr->mruby_ctx);
300 shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
301 } else {
302 shared_addr->mruby_ctx = (*mruby_ctx_it).second;
303 }
304 #endif // HAVE_MRUBY
305
306 // share the connection if patterns have the same set of backend
307 // addresses.
308
309 auto dkey = create_downstream_key(shared_addr, src.mruby_file);
310 auto it = addr_groups_indexer.find(dkey);
311
312 if (it == std::end(addr_groups_indexer)) {
313 std::shuffle(std::begin(shared_addr->addrs), std::end(shared_addr->addrs),
314 randgen_);
315
316 size_t seq = 0;
317 for (auto &addr : shared_addr->addrs) {
318 addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
319 addr.seq = seq++;
320 }
321
322 if (shared_addr->affinity.type == SessionAffinity::NONE) {
323 std::map<StringRef, WeightGroup *> wgs;
324 size_t num_wgs = 0;
325 for (auto &addr : shared_addr->addrs) {
326 if (wgs.find(addr.group) == std::end(wgs)) {
327 ++num_wgs;
328 wgs.emplace(addr.group, nullptr);
329 }
330 }
331
332 shared_addr->wgs = std::vector<WeightGroup>(num_wgs);
333
334 for (auto &addr : shared_addr->addrs) {
335 auto &wg = wgs[addr.group];
336 if (wg == nullptr) {
337 wg = &shared_addr->wgs[--num_wgs];
338 wg->seq = num_wgs;
339 }
340
341 wg->weight = addr.group_weight;
342 wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
343 addr.queued = true;
344 addr.wg = wg;
345 }
346
347 assert(num_wgs == 0);
348
349 for (auto &kv : wgs) {
350 shared_addr->pq.push(
351 WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
352 kv.second->queued = true;
353 }
354 }
355
356 dst->shared_addr = shared_addr;
357
358 addr_groups_indexer.emplace(std::move(dkey), i);
359 } else {
360 auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
361 if (LOG_ENABLED(INFO)) {
362 LOG(INFO) << dst->pattern << " shares the same backend group with "
363 << g->pattern;
364 }
365 dst->shared_addr = g->shared_addr;
366 }
367 }
368 }
369
~Worker()370 Worker::~Worker() {
371 ev_async_stop(loop_, &w_);
372 ev_timer_stop(loop_, &mcpool_clear_timer_);
373 ev_timer_stop(loop_, &proc_wev_timer_);
374 }
375
schedule_clear_mcpool()376 void Worker::schedule_clear_mcpool() {
377 // libev manual says: "If the watcher is already active nothing will
378 // happen." Since we don't change any timeout here, we don't have
379 // to worry about querying ev_is_active.
380 ev_timer_start(loop_, &mcpool_clear_timer_);
381 }
382
wait()383 void Worker::wait() {
384 #ifndef NOTHREADS
385 fut_.get();
386 #endif // !NOTHREADS
387 }
388
run_async()389 void Worker::run_async() {
390 #ifndef NOTHREADS
391 fut_ = std::async(std::launch::async, [this] {
392 (void)reopen_log_files(get_config()->logging);
393 ev_run(loop_);
394 delete_log_config();
395 });
396 #endif // !NOTHREADS
397 }
398
send(const WorkerEvent & event)399 void Worker::send(const WorkerEvent &event) {
400 {
401 std::lock_guard<std::mutex> g(m_);
402
403 q_.push_back(event);
404 }
405
406 ev_async_send(loop_, &w_);
407 }
408
process_events()409 void Worker::process_events() {
410 WorkerEvent wev;
411 {
412 std::lock_guard<std::mutex> g(m_);
413
414 // Process event one at a time. This is important for
415 // WorkerEventType::NEW_CONNECTION event since accepting large
416 // number of new connections at once may delay time to 1st byte
417 // for existing connections.
418
419 if (q_.empty()) {
420 ev_timer_stop(loop_, &proc_wev_timer_);
421 return;
422 }
423
424 wev = q_.front();
425 q_.pop_front();
426 }
427
428 ev_timer_start(loop_, &proc_wev_timer_);
429
430 auto config = get_config();
431
432 auto worker_connections = config->conn.upstream.worker_connections;
433
434 switch (wev.type) {
435 case WorkerEventType::NEW_CONNECTION: {
436 if (LOG_ENABLED(INFO)) {
437 WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
438 << ", addrlen=" << wev.client_addrlen;
439 }
440
441 if (worker_stat_.num_connections >= worker_connections) {
442
443 if (LOG_ENABLED(INFO)) {
444 WLOG(INFO, this) << "Too many connections >= " << worker_connections;
445 }
446
447 close(wev.client_fd);
448
449 break;
450 }
451
452 auto client_handler =
453 tls::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
454 wev.client_addrlen, wev.faddr);
455 if (!client_handler) {
456 if (LOG_ENABLED(INFO)) {
457 WLOG(ERROR, this) << "ClientHandler creation failed";
458 }
459 close(wev.client_fd);
460 break;
461 }
462
463 if (LOG_ENABLED(INFO)) {
464 WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
465 }
466
467 break;
468 }
469 case WorkerEventType::REOPEN_LOG:
470 WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
471 << ")";
472
473 reopen_log_files(config->logging);
474
475 break;
476 case WorkerEventType::GRACEFUL_SHUTDOWN:
477 WLOG(NOTICE, this) << "Graceful shutdown commencing";
478
479 graceful_shutdown_ = true;
480
481 if (worker_stat_.num_connections == 0) {
482 ev_break(loop_);
483
484 return;
485 }
486
487 break;
488 case WorkerEventType::REPLACE_DOWNSTREAM:
489 WLOG(NOTICE, this) << "Replace downstream";
490
491 replace_downstream_config(wev.downstreamconf);
492
493 break;
494 default:
495 if (LOG_ENABLED(INFO)) {
496 WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
497 }
498 }
499 }
500
get_cert_lookup_tree() const501 tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }
502
get_ticket_keys()503 std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
504 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
505 return std::atomic_load_explicit(&ticket_keys_, std::memory_order_acquire);
506 #else // !HAVE_ATOMIC_STD_SHARED_PTR
507 std::lock_guard<std::mutex> g(ticket_keys_m_);
508 return ticket_keys_;
509 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
510 }
511
set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys)512 void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
513 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
514 // This is single writer
515 std::atomic_store_explicit(&ticket_keys_, std::move(ticket_keys),
516 std::memory_order_release);
517 #else // !HAVE_ATOMIC_STD_SHARED_PTR
518 std::lock_guard<std::mutex> g(ticket_keys_m_);
519 ticket_keys_ = std::move(ticket_keys);
520 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
521 }
522
get_worker_stat()523 WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
524
get_loop() const525 struct ev_loop *Worker::get_loop() const {
526 return loop_;
527 }
528
get_sv_ssl_ctx() const529 SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
530
get_cl_ssl_ctx() const531 SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }
532
set_graceful_shutdown(bool f)533 void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
534
get_graceful_shutdown() const535 bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
536
get_mcpool()537 MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
538
get_session_cache_memcached_dispatcher()539 MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
540 return session_cache_memcached_dispatcher_.get();
541 }
542
get_randgen()543 std::mt19937 &Worker::get_randgen() { return randgen_; }
544
545 #ifdef HAVE_MRUBY
create_mruby_context()546 int Worker::create_mruby_context() {
547 mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
548 if (!mruby_ctx_) {
549 return -1;
550 }
551
552 return 0;
553 }
554
get_mruby_context() const555 mruby::MRubyContext *Worker::get_mruby_context() const {
556 return mruby_ctx_.get();
557 }
558 #endif // HAVE_MRUBY
559
560 std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups()561 Worker::get_downstream_addr_groups() {
562 return downstream_addr_groups_;
563 }
564
get_connect_blocker() const565 ConnectBlocker *Worker::get_connect_blocker() const {
566 return connect_blocker_.get();
567 }
568
get_downstream_config() const569 const DownstreamConfig *Worker::get_downstream_config() const {
570 return downstreamconf_.get();
571 }
572
get_connection_handler() const573 ConnectionHandler *Worker::get_connection_handler() const {
574 return conn_handler_;
575 }
576
get_dns_tracker()577 DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
578
579 namespace {
match_downstream_addr_group_host(const RouterConfig & routerconf,const StringRef & host,const StringRef & path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)580 size_t match_downstream_addr_group_host(
581 const RouterConfig &routerconf, const StringRef &host,
582 const StringRef &path,
583 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
584 size_t catch_all, BlockAllocator &balloc) {
585
586 const auto &router = routerconf.router;
587 const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
588 const auto &wildcard_patterns = routerconf.wildcard_patterns;
589
590 if (LOG_ENABLED(INFO)) {
591 LOG(INFO) << "Perform mapping selection, using host=" << host
592 << ", path=" << path;
593 }
594
595 auto group = router.match(host, path);
596 if (group != -1) {
597 if (LOG_ENABLED(INFO)) {
598 LOG(INFO) << "Found pattern with query " << host << path
599 << ", matched pattern=" << groups[group]->pattern;
600 }
601 return group;
602 }
603
604 if (!wildcard_patterns.empty() && !host.empty()) {
605 auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
606 auto ep =
607 std::copy(std::begin(host) + 1, std::end(host), rev_host_src.base);
608 std::reverse(rev_host_src.base, ep);
609 auto rev_host = StringRef{rev_host_src.base, ep};
610
611 ssize_t best_group = -1;
612 const RNode *last_node = nullptr;
613
614 for (;;) {
615 size_t nread = 0;
616 auto wcidx =
617 rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
618 if (wcidx == -1) {
619 break;
620 }
621
622 rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};
623
624 auto &wc = wildcard_patterns[wcidx];
625 auto group = wc.router.match(StringRef{}, path);
626 if (group != -1) {
627 // We sorted wildcard_patterns in a way that first match is the
628 // longest host pattern.
629 if (LOG_ENABLED(INFO)) {
630 LOG(INFO) << "Found wildcard pattern with query " << host << path
631 << ", matched pattern=" << groups[group]->pattern;
632 }
633
634 best_group = group;
635 }
636 }
637
638 if (best_group != -1) {
639 return best_group;
640 }
641 }
642
643 group = router.match(StringRef::from_lit(""), path);
644 if (group != -1) {
645 if (LOG_ENABLED(INFO)) {
646 LOG(INFO) << "Found pattern with query " << path
647 << ", matched pattern=" << groups[group]->pattern;
648 }
649 return group;
650 }
651
652 if (LOG_ENABLED(INFO)) {
653 LOG(INFO) << "None match. Use catch-all pattern";
654 }
655 return catch_all;
656 }
657 } // namespace
658
match_downstream_addr_group(const RouterConfig & routerconf,const StringRef & hostport,const StringRef & raw_path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)659 size_t match_downstream_addr_group(
660 const RouterConfig &routerconf, const StringRef &hostport,
661 const StringRef &raw_path,
662 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
663 size_t catch_all, BlockAllocator &balloc) {
664 if (std::find(std::begin(hostport), std::end(hostport), '/') !=
665 std::end(hostport)) {
666 // We use '/' specially, and if '/' is included in host, it breaks
667 // our code. Select catch-all case.
668 return catch_all;
669 }
670
671 auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
672 auto query = std::find(std::begin(raw_path), fragment, '?');
673 auto path = StringRef{std::begin(raw_path), query};
674
675 if (path.empty() || path[0] != '/') {
676 path = StringRef::from_lit("/");
677 }
678
679 if (hostport.empty()) {
680 return match_downstream_addr_group_host(routerconf, hostport, path, groups,
681 catch_all, balloc);
682 }
683
684 StringRef host;
685 if (hostport[0] == '[') {
686 // assume this is IPv6 numeric address
687 auto p = std::find(std::begin(hostport), std::end(hostport), ']');
688 if (p == std::end(hostport)) {
689 return catch_all;
690 }
691 if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
692 return catch_all;
693 }
694 host = StringRef{std::begin(hostport), p + 1};
695 } else {
696 auto p = std::find(std::begin(hostport), std::end(hostport), ':');
697 if (p == std::begin(hostport)) {
698 return catch_all;
699 }
700 host = StringRef{std::begin(hostport), p};
701 }
702
703 if (std::find_if(std::begin(host), std::end(host), [](char c) {
704 return 'A' <= c || c <= 'Z';
705 }) != std::end(host)) {
706 auto low_host = make_byte_ref(balloc, host.size() + 1);
707 auto ep = std::copy(std::begin(host), std::end(host), low_host.base);
708 *ep = '\0';
709 util::inp_strlower(low_host.base, ep);
710 host = StringRef{low_host.base, ep};
711 }
712 return match_downstream_addr_group_host(routerconf, host, path, groups,
713 catch_all, balloc);
714 }
715
downstream_failure(DownstreamAddr * addr,const Address * raddr)716 void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
717 const auto &connect_blocker = addr->connect_blocker;
718
719 if (connect_blocker->in_offline()) {
720 return;
721 }
722
723 connect_blocker->on_failure();
724
725 if (addr->fall == 0) {
726 return;
727 }
728
729 auto fail_count = connect_blocker->get_fail_count();
730
731 if (fail_count >= addr->fall) {
732 if (raddr) {
733 LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
734 << " " << fail_count
735 << " times in a row; considered as offline";
736 } else {
737 LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
738 << " " << fail_count
739 << " times in a row; considered as offline";
740 }
741
742 connect_blocker->offline();
743
744 if (addr->rise) {
745 addr->live_check->schedule();
746 }
747 }
748 }
749
750 } // namespace shrpx
751