• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_worker.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 
31 #include <memory>
32 
33 #include "shrpx_tls.h"
34 #include "shrpx_log.h"
35 #include "shrpx_client_handler.h"
36 #include "shrpx_http2_session.h"
37 #include "shrpx_log_config.h"
38 #include "shrpx_memcached_dispatcher.h"
39 #ifdef HAVE_MRUBY
40 #  include "shrpx_mruby.h"
41 #endif // HAVE_MRUBY
42 #include "util.h"
43 #include "template.h"
44 
45 namespace shrpx {
46 
47 namespace {
eventcb(struct ev_loop * loop,ev_async * w,int revents)48 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
49   auto worker = static_cast<Worker *>(w->data);
50   worker->process_events();
51 }
52 } // namespace
53 
54 namespace {
mcpool_clear_cb(struct ev_loop * loop,ev_timer * w,int revents)55 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
56   auto worker = static_cast<Worker *>(w->data);
57   if (worker->get_worker_stat()->num_connections != 0) {
58     return;
59   }
60   auto mcpool = worker->get_mcpool();
61   if (mcpool->freelistsize == mcpool->poolsize) {
62     worker->get_mcpool()->clear();
63   }
64 }
65 } // namespace
66 
67 namespace {
proc_wev_cb(struct ev_loop * loop,ev_timer * w,int revents)68 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
69   auto worker = static_cast<Worker *>(w->data);
70   worker->process_events();
71 }
72 } // namespace
73 
// A freshly created group is active; it is flagged retired when a new
// downstream configuration replaces it (see
// Worker::replace_downstream_config).
DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
75 
// Out-of-line to keep the destructor of shared_addr members in this TU.
DownstreamAddrGroup::~DownstreamAddrGroup() {}
77 
// DownstreamKey is used to index SharedDownstreamAddr in order to
// find the same configuration.  The first element is the sorted list
// of per-address tuples (host, sni, group, fall, rise, proto, port,
// weight, group_weight, host_unix, tls, dns, upgrade_scheme); the
// remaining elements are redirect_if_not_tls, affinity type, cookie
// name/path/secure, read/write timeouts, and the mruby file.
using DownstreamKey =
    std::tuple<std::vector<std::tuple<StringRef, StringRef, StringRef, size_t,
                                      size_t, Proto, uint32_t, uint32_t,
                                      uint32_t, bool, bool, bool, bool>>,
               bool, SessionAffinity, StringRef, StringRef,
               SessionAffinityCookieSecure, int64_t, int64_t, StringRef>;
86 
namespace {
// Builds the lookup key describing the full backend configuration of
// |shared_addr| plus |mruby_file|.  Two patterns whose keys compare
// equal can share one SharedDownstreamAddr (and its connection pools).
DownstreamKey
create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
                      const StringRef &mruby_file) {
  DownstreamKey dkey;

  // Flatten every backend address into a tuple; the field order must
  // match the DownstreamKey alias above.
  auto &addrs = std::get<0>(dkey);
  addrs.resize(shared_addr->addrs.size());
  auto p = std::begin(addrs);
  for (auto &a : shared_addr->addrs) {
    std::get<0>(*p) = a.host;
    std::get<1>(*p) = a.sni;
    std::get<2>(*p) = a.group;
    std::get<3>(*p) = a.fall;
    std::get<4>(*p) = a.rise;
    std::get<5>(*p) = a.proto;
    std::get<6>(*p) = a.port;
    std::get<7>(*p) = a.weight;
    std::get<8>(*p) = a.group_weight;
    std::get<9>(*p) = a.host_unix;
    std::get<10>(*p) = a.tls;
    std::get<11>(*p) = a.dns;
    std::get<12>(*p) = a.upgrade_scheme;
    ++p;
  }
  // Sort so that the key is independent of the order addresses were
  // configured (and of the shuffle applied per worker).
  std::sort(std::begin(addrs), std::end(addrs));

  std::get<1>(dkey) = shared_addr->redirect_if_not_tls;

  auto &affinity = shared_addr->affinity;
  std::get<2>(dkey) = affinity.type;
  std::get<3>(dkey) = affinity.cookie.name;
  std::get<4>(dkey) = affinity.cookie.path;
  std::get<5>(dkey) = affinity.cookie.secure;
  auto &timeout = shared_addr->timeout;
  std::get<6>(dkey) = timeout.read;
  std::get<7>(dkey) = timeout.write;
  std::get<8>(dkey) = mruby_file;

  return dkey;
}
} // namespace
129 
// Constructs a Worker bound to |loop|.  Sets up the async watcher used
// for cross-thread event delivery, the on-demand timers for memchunk
// pool cleanup and event draining, the optional memcached-backed TLS
// session cache dispatcher, and installs the initial downstream
// configuration.
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
               SSL_CTX *tls_session_cache_memcached_ssl_ctx,
               tls::CertLookupTree *cert_tree,
               const std::shared_ptr<TicketKeys> &ticket_keys,
               ConnectionHandler *conn_handler,
               std::shared_ptr<DownstreamConfig> downstreamconf)
    : randgen_(util::make_mt19937()),
      worker_stat_{},
      dns_tracker_(loop),
      loop_(loop),
      sv_ssl_ctx_(sv_ssl_ctx),
      cl_ssl_ctx_(cl_ssl_ctx),
      cert_tree_(cert_tree),
      conn_handler_(conn_handler),
      ticket_keys_(ticket_keys),
      connect_blocker_(
          std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
      graceful_shutdown_(false) {
  // Other threads wake this worker via ev_async_send on w_.
  ev_async_init(&w_, eventcb);
  w_.data = this;
  ev_async_start(loop_, &w_);

  // These timers are initialized here but only started on demand.
  ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
  mcpool_clear_timer_.data = this;

  ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
  proc_wev_timer_.data = this;

  auto &session_cacheconf = get_config()->tls.session_cache;

  // Only create the memcached dispatcher when an external TLS session
  // cache host is configured.
  if (!session_cacheconf.memcached.host.empty()) {
    session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
        &session_cacheconf.memcached.addr, loop,
        tls_session_cache_memcached_ssl_ctx,
        StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
  }

  replace_downstream_config(std::move(downstreamconf));
}
169 
170 namespace {
ensure_enqueue_addr(std::priority_queue<WeightGroupEntry,std::vector<WeightGroupEntry>,WeightGroupEntryGreater> & wgpq,WeightGroup * wg,DownstreamAddr * addr)171 void ensure_enqueue_addr(
172     std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
173                         WeightGroupEntryGreater> &wgpq,
174     WeightGroup *wg, DownstreamAddr *addr) {
175   uint32_t cycle;
176   if (!wg->pq.empty()) {
177     auto &top = wg->pq.top();
178     cycle = top.cycle;
179   } else {
180     cycle = 0;
181   }
182 
183   addr->cycle = cycle;
184   addr->pending_penalty = 0;
185   wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
186   addr->queued = true;
187 
188   if (!wg->queued) {
189     if (!wgpq.empty()) {
190       auto &top = wgpq.top();
191       cycle = top.cycle;
192     } else {
193       cycle = 0;
194     }
195 
196     wg->cycle = cycle;
197     wg->pending_penalty = 0;
198     wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
199     wg->queued = true;
200   }
201 }
202 } // namespace
203 
// Replaces the active downstream (backend) configuration with
// |downstreamconf|.  Existing groups are marked retired and their
// pooled backend connections dropped; new DownstreamAddrGroup objects
// are then built from the configuration.  Patterns whose backend sets
// are identical (as computed by create_downstream_key) share a single
// SharedDownstreamAddr so connections can be reused across patterns.
void Worker::replace_downstream_config(
    std::shared_ptr<DownstreamConfig> downstreamconf) {
  for (auto &g : downstream_addr_groups_) {
    g->retired = true;

    auto &shared_addr = g->shared_addr;
    for (auto &addr : shared_addr->addrs) {
      addr.dconn_pool->remove_all();
    }
  }

  downstreamconf_ = downstreamconf;

  // Making a copy is much faster with multiple thread on
  // backendconfig API call.
  auto groups = downstreamconf->addr_groups;

  downstream_addr_groups_ =
      std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());

  // Maps a backend-set key to the index of the first group that used
  // it, so later groups with the same key can share its shared_addr.
  std::map<DownstreamKey, size_t> addr_groups_indexer;
#ifdef HAVE_MRUBY
  // TODO It is a bit less efficient because
  // mruby::create_mruby_context returns std::unique_ptr and we cannot
  // use std::make_shared.
  std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
#endif // HAVE_MRUBY

  for (size_t i = 0; i < groups.size(); ++i) {
    auto &src = groups[i];
    auto &dst = downstream_addr_groups_[i];

    dst = std::make_shared<DownstreamAddrGroup>();
    dst->pattern =
        ImmutableString{std::begin(src.pattern), std::end(src.pattern)};

    auto shared_addr = std::make_shared<SharedDownstreamAddr>();

    shared_addr->addrs.resize(src.addrs.size());
    shared_addr->affinity.type = src.affinity.type;
    if (src.affinity.type == SessionAffinity::COOKIE) {
      // Re-anchor cookie strings in shared_addr's allocator so their
      // lifetime matches shared_addr itself.
      shared_addr->affinity.cookie.name =
          make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
      if (!src.affinity.cookie.path.empty()) {
        shared_addr->affinity.cookie.path =
            make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
      }
      shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
    }
    shared_addr->affinity_hash = src.affinity_hash;
    shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
    shared_addr->timeout.read = src.timeout.read;
    shared_addr->timeout.write = src.timeout.write;

    // Copy every backend address, again re-anchoring string fields in
    // the shared block allocator.
    for (size_t j = 0; j < src.addrs.size(); ++j) {
      auto &src_addr = src.addrs[j];
      auto &dst_addr = shared_addr->addrs[j];

      dst_addr.addr = src_addr.addr;
      dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
      dst_addr.hostport =
          make_string_ref(shared_addr->balloc, src_addr.hostport);
      dst_addr.port = src_addr.port;
      dst_addr.host_unix = src_addr.host_unix;
      dst_addr.weight = src_addr.weight;
      dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
      dst_addr.group_weight = src_addr.group_weight;
      dst_addr.proto = src_addr.proto;
      dst_addr.tls = src_addr.tls;
      dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
      dst_addr.fall = src_addr.fall;
      dst_addr.rise = src_addr.rise;
      dst_addr.dns = src_addr.dns;
      dst_addr.upgrade_scheme = src_addr.upgrade_scheme;

      auto shared_addr_ptr = shared_addr.get();

      // When the connect blocker lifts, put the address back into the
      // scheduling queue if it dropped out.  The lambda captures
      // dst_addr by reference; shared_addr->addrs is sized above and
      // never reallocated, so the reference stays valid.
      dst_addr.connect_blocker = std::make_unique<ConnectBlocker>(
          randgen_, loop_, nullptr, [shared_addr_ptr, &dst_addr]() {
            if (!dst_addr.queued) {
              if (!dst_addr.wg) {
                return;
              }
              ensure_enqueue_addr(shared_addr_ptr->pq, dst_addr.wg, &dst_addr);
            }
          });

      dst_addr.live_check = std::make_unique<LiveCheck>(
          loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);
    }

#ifdef HAVE_MRUBY
    // Reuse one mruby context per distinct script file.
    auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
    if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
      shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
      assert(shared_addr->mruby_ctx);
      shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
    } else {
      shared_addr->mruby_ctx = (*mruby_ctx_it).second;
    }
#endif // HAVE_MRUBY

    // share the connection if patterns have the same set of backend
    // addresses.

    auto dkey = create_downstream_key(shared_addr, src.mruby_file);
    auto it = addr_groups_indexer.find(dkey);

    if (it == std::end(addr_groups_indexer)) {
      // First group with this backend set: randomize address order per
      // worker, then build the weight-group scheduling structures.
      std::shuffle(std::begin(shared_addr->addrs), std::end(shared_addr->addrs),
                   randgen_);

      size_t seq = 0;
      for (auto &addr : shared_addr->addrs) {
        addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
        addr.seq = seq++;
      }

      if (shared_addr->affinity.type == SessionAffinity::NONE) {
        // Count distinct backend groups first so wgs can be sized once
        // (pointers into it must not be invalidated by growth).
        std::map<StringRef, WeightGroup *> wgs;
        size_t num_wgs = 0;
        for (auto &addr : shared_addr->addrs) {
          if (wgs.find(addr.group) == std::end(wgs)) {
            ++num_wgs;
            wgs.emplace(addr.group, nullptr);
          }
        }

        shared_addr->wgs = std::vector<WeightGroup>(num_wgs);

        for (auto &addr : shared_addr->addrs) {
          auto &wg = wgs[addr.group];
          if (wg == nullptr) {
            // Assign slots from the back; num_wgs counts down to 0.
            wg = &shared_addr->wgs[--num_wgs];
            wg->seq = num_wgs;
          }

          wg->weight = addr.group_weight;
          wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
          addr.queued = true;
          addr.wg = wg;
        }

        assert(num_wgs == 0);

        for (auto &kv : wgs) {
          shared_addr->pq.push(
              WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
          kv.second->queued = true;
        }
      }

      dst->shared_addr = shared_addr;

      addr_groups_indexer.emplace(std::move(dkey), i);
    } else {
      // Identical backend set seen before: drop our copy and share.
      auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
      if (LOG_ENABLED(INFO)) {
        LOG(INFO) << dst->pattern << " shares the same backend group with "
                  << g->pattern;
      }
      dst->shared_addr = g->shared_addr;
    }
  }
}
369 
// Stops all libev watchers owned by this worker so no callback can
// fire on a destroyed object.
Worker::~Worker() {
  ev_async_stop(loop_, &w_);
  ev_timer_stop(loop_, &mcpool_clear_timer_);
  ev_timer_stop(loop_, &proc_wev_timer_);
}
375 
// Arms the timer that clears the memchunk pool once the worker is
// idle (see mcpool_clear_cb).
void Worker::schedule_clear_mcpool() {
  // libev manual says: "If the watcher is already active nothing will
  // happen."  Since we don't change any timeout here, we don't have
  // to worry about querying ev_is_active.
  ev_timer_start(loop_, &mcpool_clear_timer_);
}
382 
// Blocks until the thread started by run_async() finishes.  No-op in
// single-threaded (NOTHREADS) builds.
void Worker::wait() {
#ifndef NOTHREADS
  fut_.get();
#endif // !NOTHREADS
}
388 
// Runs this worker's event loop on a dedicated thread.  The thread
// reopens log files for itself first, and tears down its thread-local
// log config when the loop exits.  No-op in NOTHREADS builds.
void Worker::run_async() {
#ifndef NOTHREADS
  fut_ = std::async(std::launch::async, [this] {
    (void)reopen_log_files(get_config()->logging);
    ev_run(loop_);
    delete_log_config();
  });
#endif // !NOTHREADS
}
398 
send(const WorkerEvent & event)399 void Worker::send(const WorkerEvent &event) {
400   {
401     std::lock_guard<std::mutex> g(m_);
402 
403     q_.push_back(event);
404   }
405 
406   ev_async_send(loop_, &w_);
407 }
408 
// Pops and handles exactly one queued WorkerEvent.  If the queue is
// non-empty afterwards, proc_wev_timer_ is (re)started so the next
// event is handled on a later loop iteration; the timer is stopped
// once the queue drains.
void Worker::process_events() {
  WorkerEvent wev;
  {
    std::lock_guard<std::mutex> g(m_);

    // Process event one at a time.  This is important for
    // WorkerEventType::NEW_CONNECTION event since accepting large
    // number of new connections at once may delay time to 1st byte
    // for existing connections.

    if (q_.empty()) {
      ev_timer_stop(loop_, &proc_wev_timer_);
      return;
    }

    wev = q_.front();
    q_.pop_front();
  }

  // Schedule another pass for any events still in the queue.
  ev_timer_start(loop_, &proc_wev_timer_);

  auto config = get_config();

  auto worker_connections = config->conn.upstream.worker_connections;

  switch (wev.type) {
  case WorkerEventType::NEW_CONNECTION: {
    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
                       << ", addrlen=" << wev.client_addrlen;
    }

    // Enforce the per-worker connection limit; reject by closing the
    // accepted socket.
    if (worker_stat_.num_connections >= worker_connections) {

      if (LOG_ENABLED(INFO)) {
        WLOG(INFO, this) << "Too many connections >= " << worker_connections;
      }

      close(wev.client_fd);

      break;
    }

    auto client_handler =
        tls::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
                               wev.client_addrlen, wev.faddr);
    if (!client_handler) {
      if (LOG_ENABLED(INFO)) {
        WLOG(ERROR, this) << "ClientHandler creation failed";
      }
      close(wev.client_fd);
      break;
    }

    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
    }

    break;
  }
  case WorkerEventType::REOPEN_LOG:
    WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
                       << ")";

    reopen_log_files(config->logging);

    break;
  case WorkerEventType::GRACEFUL_SHUTDOWN:
    WLOG(NOTICE, this) << "Graceful shutdown commencing";

    graceful_shutdown_ = true;

    // With no live connections we can exit the loop immediately;
    // otherwise the last ClientHandler to go away breaks the loop.
    if (worker_stat_.num_connections == 0) {
      ev_break(loop_);

      return;
    }

    break;
  case WorkerEventType::REPLACE_DOWNSTREAM:
    WLOG(NOTICE, this) << "Replace downstream";

    replace_downstream_config(wev.downstreamconf);

    break;
  default:
    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
    }
  }
}
500 
// Trivial accessors for worker-owned state follow.

tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }

// Returns the current TLS session ticket keys.  Uses a lock-free
// atomic shared_ptr load where the toolchain supports it, otherwise a
// mutex.
std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
#ifdef HAVE_ATOMIC_STD_SHARED_PTR
  return std::atomic_load_explicit(&ticket_keys_, std::memory_order_acquire);
#else  // !HAVE_ATOMIC_STD_SHARED_PTR
  std::lock_guard<std::mutex> g(ticket_keys_m_);
  return ticket_keys_;
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
}

// Publishes new ticket keys; concurrent readers observe the swap via
// get_ticket_keys().
void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
#ifdef HAVE_ATOMIC_STD_SHARED_PTR
  // This is single writer
  std::atomic_store_explicit(&ticket_keys_, std::move(ticket_keys),
                             std::memory_order_release);
#else  // !HAVE_ATOMIC_STD_SHARED_PTR
  std::lock_guard<std::mutex> g(ticket_keys_m_);
  ticket_keys_ = std::move(ticket_keys);
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
}

WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }

struct ev_loop *Worker::get_loop() const {
  return loop_;
}

SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }

SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }

void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }

bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }

MemchunkPool *Worker::get_mcpool() { return &mcpool_; }

// May return nullptr when no memcached session cache is configured.
MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
  return session_cache_memcached_dispatcher_.get();
}

std::mt19937 &Worker::get_randgen() { return randgen_; }
544 
#ifdef HAVE_MRUBY
// Creates this worker's mruby context from the globally configured
// script file.  Returns 0 on success, -1 on failure.
int Worker::create_mruby_context() {
  mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
  if (!mruby_ctx_) {
    return -1;
  }

  return 0;
}

mruby::MRubyContext *Worker::get_mruby_context() const {
  return mruby_ctx_.get();
}
#endif // HAVE_MRUBY
559 
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
Worker::get_downstream_addr_groups() {
  return downstream_addr_groups_;
}

// Worker-wide connect blocker (not tied to any particular backend).
ConnectBlocker *Worker::get_connect_blocker() const {
  return connect_blocker_.get();
}

const DownstreamConfig *Worker::get_downstream_config() const {
  return downstreamconf_.get();
}

ConnectionHandler *Worker::get_connection_handler() const {
  return conn_handler_;
}

DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
578 
namespace {
// Selects the backend group index for a request whose host and path
// have already been normalized.  Matching order: exact host+path
// router, then wildcard host patterns (longest host match wins, using
// a reversed-host prefix trie), then host-less path patterns, and
// finally |catch_all|.
size_t match_downstream_addr_group_host(
    const RouterConfig &routerconf, const StringRef &host,
    const StringRef &path,
    const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
    size_t catch_all, BlockAllocator &balloc) {

  const auto &router = routerconf.router;
  const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
  const auto &wildcard_patterns = routerconf.wildcard_patterns;

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "Perform mapping selection, using host=" << host
              << ", path=" << path;
  }

  auto group = router.match(host, path);
  if (group != -1) {
    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Found pattern with query " << host << path
                << ", matched pattern=" << groups[group]->pattern;
    }
    return group;
  }

  if (!wildcard_patterns.empty() && !host.empty()) {
    // Wildcard patterns are stored with the host reversed; build the
    // reversed host (dropping the leading character) to walk the
    // prefix trie.
    auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
    auto ep =
        std::copy(std::begin(host) + 1, std::end(host), rev_host_src.base);
    std::reverse(rev_host_src.base, ep);
    auto rev_host = StringRef{rev_host_src.base, ep};

    ssize_t best_group = -1;
    const RNode *last_node = nullptr;

    // Repeatedly extend the matched prefix; later (longer) host
    // matches overwrite best_group.
    for (;;) {
      size_t nread = 0;
      auto wcidx =
          rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
      if (wcidx == -1) {
        break;
      }

      rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};

      auto &wc = wildcard_patterns[wcidx];
      auto group = wc.router.match(StringRef{}, path);
      if (group != -1) {
        // We sorted wildcard_patterns in a way that first match is the
        // longest host pattern.
        if (LOG_ENABLED(INFO)) {
          LOG(INFO) << "Found wildcard pattern with query " << host << path
                    << ", matched pattern=" << groups[group]->pattern;
        }

        best_group = group;
      }
    }

    if (best_group != -1) {
      return best_group;
    }
  }

  // Fall back to patterns that have no host component.
  group = router.match(StringRef::from_lit(""), path);
  if (group != -1) {
    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Found pattern with query " << path
                << ", matched pattern=" << groups[group]->pattern;
    }
    return group;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "None match.  Use catch-all pattern";
  }
  return catch_all;
}
} // namespace
658 
match_downstream_addr_group(const RouterConfig & routerconf,const StringRef & hostport,const StringRef & raw_path,const std::vector<std::shared_ptr<DownstreamAddrGroup>> & groups,size_t catch_all,BlockAllocator & balloc)659 size_t match_downstream_addr_group(
660     const RouterConfig &routerconf, const StringRef &hostport,
661     const StringRef &raw_path,
662     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
663     size_t catch_all, BlockAllocator &balloc) {
664   if (std::find(std::begin(hostport), std::end(hostport), '/') !=
665       std::end(hostport)) {
666     // We use '/' specially, and if '/' is included in host, it breaks
667     // our code.  Select catch-all case.
668     return catch_all;
669   }
670 
671   auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
672   auto query = std::find(std::begin(raw_path), fragment, '?');
673   auto path = StringRef{std::begin(raw_path), query};
674 
675   if (path.empty() || path[0] != '/') {
676     path = StringRef::from_lit("/");
677   }
678 
679   if (hostport.empty()) {
680     return match_downstream_addr_group_host(routerconf, hostport, path, groups,
681                                             catch_all, balloc);
682   }
683 
684   StringRef host;
685   if (hostport[0] == '[') {
686     // assume this is IPv6 numeric address
687     auto p = std::find(std::begin(hostport), std::end(hostport), ']');
688     if (p == std::end(hostport)) {
689       return catch_all;
690     }
691     if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
692       return catch_all;
693     }
694     host = StringRef{std::begin(hostport), p + 1};
695   } else {
696     auto p = std::find(std::begin(hostport), std::end(hostport), ':');
697     if (p == std::begin(hostport)) {
698       return catch_all;
699     }
700     host = StringRef{std::begin(hostport), p};
701   }
702 
703   if (std::find_if(std::begin(host), std::end(host), [](char c) {
704         return 'A' <= c || c <= 'Z';
705       }) != std::end(host)) {
706     auto low_host = make_byte_ref(balloc, host.size() + 1);
707     auto ep = std::copy(std::begin(host), std::end(host), low_host.base);
708     *ep = '\0';
709     util::inp_strlower(low_host.base, ep);
710     host = StringRef{low_host.base, ep};
711   }
712   return match_downstream_addr_group_host(routerconf, host, path, groups,
713                                           catch_all, balloc);
714 }
715 
// Records a connection failure against |addr|'s connect blocker.
// After |addr->fall| consecutive failures the address is taken
// offline; if |addr->rise| is configured, a live check is scheduled
// so the address can come back automatically.  |raddr| is the
// resolved address used only for logging and may be nullptr (e.g.
// when DNS resolution itself failed).
void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
  const auto &connect_blocker = addr->connect_blocker;

  // Already offline; nothing more to record.
  if (connect_blocker->in_offline()) {
    return;
  }

  connect_blocker->on_failure();

  // fall == 0 disables the automatic offline transition.
  if (addr->fall == 0) {
    return;
  }

  auto fail_count = connect_blocker->get_fail_count();

  if (fail_count >= addr->fall) {
    if (raddr) {
      LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
                << " " << fail_count
                << " times in a row; considered as offline";
    } else {
      LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
                << " " << fail_count
                << " times in a row; considered as offline";
    }

    connect_blocker->offline();

    if (addr->rise) {
      addr->live_check->schedule();
    }
  }
}
749 
750 } // namespace shrpx
751