• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #ifndef SHRPX_WORKER_H
26 #define SHRPX_WORKER_H
27 
28 #include "shrpx.h"
29 
30 #include <mutex>
31 #include <vector>
32 #include <random>
33 #include <unordered_map>
34 #include <deque>
35 #include <thread>
36 #include <queue>
37 #ifndef NOTHREADS
38 #  include <future>
39 #endif // NOTHREADS
40 
41 #include "ssl_compat.h"
42 
43 #ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
44 #  include <wolfssl/options.h>
45 #  include <wolfssl/openssl/ssl.h>
46 #  include <wolfssl/openssl/err.h>
47 #else // !NGHTTP2_OPENSSL_IS_WOLFSSL
48 #  include <openssl/ssl.h>
49 #  include <openssl/err.h>
50 #endif // !NGHTTP2_OPENSSL_IS_WOLFSSL
51 
52 #include <ev.h>
53 
54 #include "shrpx_config.h"
55 #include "shrpx_downstream_connection_pool.h"
56 #include "memchunk.h"
57 #include "shrpx_tls.h"
58 #include "shrpx_live_check.h"
59 #include "shrpx_connect_blocker.h"
60 #include "shrpx_dns_tracker.h"
61 #ifdef ENABLE_HTTP3
62 #  include "shrpx_quic_connection_handler.h"
63 #  include "shrpx_quic.h"
64 #endif // ENABLE_HTTP3
65 #include "allocator.h"
66 
67 using namespace nghttp2;
68 
69 namespace shrpx {
70 
// Forward declarations of types that this header names before (or
// without) seeing their full definitions.
class ConnectBlocker;
class ConnectionHandler;
class Http2Session;
class MemcachedDispatcher;
struct UpstreamAddr;
struct WeightGroup;
#ifdef ENABLE_HTTP3
class QUICListener;
#endif // ENABLE_HTTP3

namespace tls {
class CertLookupTree;
} // namespace tls

#ifdef HAVE_MRUBY
namespace mruby {
class MRubyContext;
} // namespace mruby
#endif // HAVE_MRUBY
93 
94 struct DownstreamAddr {
95   Address addr;
96   // backend address.  If |host_unix| is true, this is UNIX domain
97   // socket path.
98   StringRef host;
99   StringRef hostport;
100   // backend port.  0 if |host_unix| is true.
101   uint16_t port;
102   // true if |host| contains UNIX domain socket path.
103   bool host_unix;
104 
105   // sni field to send remote server if TLS is enabled.
106   StringRef sni;
107 
108   std::unique_ptr<ConnectBlocker> connect_blocker;
109   std::unique_ptr<LiveCheck> live_check;
110   // Connection pool for this particular address if session affinity
111   // is enabled
112   std::unique_ptr<DownstreamConnectionPool> dconn_pool;
113   size_t fall;
114   size_t rise;
115   // Client side TLS session cache
116   tls::TLSSessionCache tls_session_cache;
117   // List of Http2Session which is not fully utilized (i.e., the
118   // server advertised maximum concurrency is not reached).  We will
119   // coalesce as much stream as possible in one Http2Session to fully
120   // utilize TCP connection.
121   DList<Http2Session> http2_extra_freelist;
122   WeightGroup *wg;
123   // total number of streams created in HTTP/2 connections for this
124   // address.
125   size_t num_dconn;
126   // the sequence number of this address to randomize the order access
127   // threads.
128   size_t seq;
129   // Application protocol used in this backend
130   Proto proto;
131   // cycle is used to prioritize this address.  Lower value takes
132   // higher priority.
133   uint32_t cycle;
134   // penalty which is applied to the next cycle calculation.
135   uint32_t pending_penalty;
136   // Weight of this address inside a weight group.  Its range is [1,
137   // 256], inclusive.
138   uint32_t weight;
139   // name of group which this address belongs to.
140   StringRef group;
141   // Weight of the weight group which this address belongs to.  Its
142   // range is [1, 256], inclusive.
143   uint32_t group_weight;
144   // affinity hash for this address.  It is assigned when strict
145   // stickiness is enabled.
146   uint32_t affinity_hash;
147   // true if TLS is used in this backend
148   bool tls;
149   // true if dynamic DNS is enabled
150   bool dns;
151   // true if :scheme pseudo header field should be upgraded to secure
152   // variant (e.g., "https") when forwarding request to a backend
153   // connected by TLS connection.
154   bool upgrade_scheme;
155   // true if this address is queued.
156   bool queued;
157 };
158 
// Maximum weight of a backend address and of a weight group (both are
// documented as ranging over [1, 256]).
inline constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;

// Only used through a pointer below; redeclaring an already-defined
// class is harmless.
struct DownstreamAddr;

// Scheduling entry for one backend address inside WeightGroup::pq.
struct DownstreamAddrEntry {
  DownstreamAddr *addr;
  size_t seq;
  uint32_t cycle;
};

// Shared "greater than" ordering for scheduling entries that expose
// |cycle| (uint32_t) and |seq|.  It is used by both priority-queue
// comparators below, so std::priority_queue yields the entry with the
// smallest cycle first.  Ties on |cycle| are broken in favor of the
// smaller |seq|.
//
// |cycle| may wrap around, so instead of a plain "<" the modular
// distance lhs.cycle - rhs.cycle is used.  A distance in
// [1, 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1] means |lhs| is scheduled later.
// NOTE(review): this presumably relies on the cycles of entries in one
// queue never drifting further apart than that window; confirm against
// the scheduling code.
template <typename Entry>
constexpr bool cycle_entry_greater(const Entry &lhs, const Entry &rhs) {
  // Unsigned subtraction wraps modulo 2^32, which is exactly what the
  // windowed comparison needs.
  const uint32_t d = lhs.cycle - rhs.cycle;
  if (d == 0) {
    return rhs.seq < lhs.seq;
  }
  return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
}

struct DownstreamAddrEntryGreater {
  bool operator()(const DownstreamAddrEntry &lhs,
                  const DownstreamAddrEntry &rhs) const {
    return cycle_entry_greater(lhs, rhs);
  }
};

// A weight group: its addresses are queued in |pq| (smallest cycle on
// top).  Scalar members default to zero so a default-initialized
// WeightGroup is well-defined.
struct WeightGroup {
  std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
                      DownstreamAddrEntryGreater>
    pq;
  size_t seq{};
  uint32_t weight{};
  uint32_t cycle{};
  uint32_t pending_penalty{};
  // true if this object is queued.
  bool queued{};
};

// Scheduling entry for one weight group inside a group-level queue.
struct WeightGroupEntry {
  WeightGroup *wg;
  size_t seq;
  uint32_t cycle;
};

struct WeightGroupEntryGreater {
  bool operator()(const WeightGroupEntry &lhs,
                  const WeightGroupEntry &rhs) const {
    return cycle_entry_greater(lhs, rhs);
  }
};
206 
207 struct SharedDownstreamAddr {
SharedDownstreamAddrSharedDownstreamAddr208   SharedDownstreamAddr()
209     : balloc(1024, 1024),
210       affinity{SessionAffinity::NONE},
211       redirect_if_not_tls{false},
212       dnf{false},
213       timeout{} {}
214 
215   SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
216   SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
217   SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
218   SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
219 
220   BlockAllocator balloc;
221   std::vector<DownstreamAddr> addrs;
222   std::vector<WeightGroup> wgs;
223   std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
224                       WeightGroupEntryGreater>
225     pq;
226   // Bunch of session affinity hash.  Only used if affinity ==
227   // SessionAffinity::IP.
228   std::vector<AffinityHash> affinity_hash;
229   // Maps affinity hash of each DownstreamAddr to its index in addrs.
230   // It is only assigned when strict stickiness is enabled.
231   std::unordered_map<uint32_t, size_t> affinity_hash_map;
232 #ifdef HAVE_MRUBY
233   std::shared_ptr<mruby::MRubyContext> mruby_ctx;
234 #endif // HAVE_MRUBY
235   // Configuration for session affinity
236   AffinityConfig affinity;
237   // Session affinity
238   // true if this group requires that client connection must be TLS,
239   // and the request must be redirected to https URI.
240   bool redirect_if_not_tls;
241   // true if a request should not be forwarded to a backend.
242   bool dnf;
243   // Timeouts for backend connection.
244   struct {
245     ev_tstamp read;
246     ev_tstamp write;
247   } timeout;
248 };
249 
250 struct DownstreamAddrGroup {
251   DownstreamAddrGroup();
252   ~DownstreamAddrGroup();
253 
254   DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
255   DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
256   DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
257   DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;
258 
259   ImmutableString pattern;
260   std::shared_ptr<SharedDownstreamAddr> shared_addr;
261   // true if this group is no longer used for new request.  If this is
262   // true, the connection made using one of address in shared_addr
263   // must not be pooled.
264   bool retired;
265 };
266 
// Per-worker connection counters.  Both counters default to zero so
// that a default-initialized WorkerStat is well-defined; the type stays
// an aggregate.
struct WorkerStat {
  // NOTE(review): presumably the number of client connections currently
  // handled by the worker.
  size_t num_connections{};
  // NOTE(review): presumably the number of connections waiting to be
  // closed.
  size_t num_close_waits{};
};
271 
#ifdef ENABLE_HTTP3
// A QUIC packet together with its addressing metadata; used as the
// payload of WorkerEvent::quic_pkt.  The packet bytes are copied into
// |data|, so the object does not reference the caller's buffer.
struct QUICPacket {
  // Every member is value-initialized through its default member
  // initializer.
  QUICPacket() = default;
  QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
             const Address &local_addr, const ngtcp2_pkt_info &pi,
             std::span<const uint8_t> data)
    : upstream_addr_index{upstream_addr_index},
      remote_addr{remote_addr},
      local_addr{local_addr},
      pi{pi},
      data(data.begin(), data.end()) {}

  // NOTE(review): presumably an index into the worker's QUIC upstream
  // addresses; confirm at the call sites.
  size_t upstream_addr_index{};
  Address remote_addr{};
  Address local_addr{};
  ngtcp2_pkt_info pi{};
  std::vector<uint8_t> data;
};
#endif // ENABLE_HTTP3
290 
// Kind of a WorkerEvent.  The numeric values are explicit and must stay
// stable.
enum class WorkerEventType {
  NEW_CONNECTION = 1,
  REOPEN_LOG = 2,
  GRACEFUL_SHUTDOWN = 3,
  REPLACE_DOWNSTREAM = 4,
#ifdef ENABLE_HTTP3
  QUIC_PKT_FORWARD = 5,
#endif // ENABLE_HTTP3
};
300 
301 struct WorkerEvent {
302   WorkerEventType type;
303   struct {
304     sockaddr_union client_addr;
305     size_t client_addrlen;
306     int client_fd;
307     const UpstreamAddr *faddr;
308   };
309   std::shared_ptr<TicketKeys> ticket_keys;
310   std::shared_ptr<DownstreamConfig> downstreamconf;
311 #ifdef ENABLE_HTTP3
312   std::unique_ptr<QUICPacket> quic_pkt;
313 #endif // ENABLE_HTTP3
314 };
315 
316 class Worker {
317 public:
318   Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
319          SSL_CTX *tls_session_cache_memcached_ssl_ctx,
320          tls::CertLookupTree *cert_tree,
321 #ifdef ENABLE_HTTP3
322          SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
323          WorkerID wid,
324 #  ifdef HAVE_LIBBPF
325          size_t index,
326 #  endif // HAVE_LIBBPF
327 #endif   // ENABLE_HTTP3
328          const std::shared_ptr<TicketKeys> &ticket_keys,
329          ConnectionHandler *conn_handler,
330          std::shared_ptr<DownstreamConfig> downstreamconf);
331   ~Worker();
332   void run_async();
333   void wait();
334   void process_events();
335   void send(WorkerEvent event);
336 
337   tls::CertLookupTree *get_cert_lookup_tree() const;
338 #ifdef ENABLE_HTTP3
339   tls::CertLookupTree *get_quic_cert_lookup_tree() const;
340 #endif // ENABLE_HTTP3
341 
342   // These 2 functions make a lock m_ to get/set ticket keys
343   // atomically.
344   std::shared_ptr<TicketKeys> get_ticket_keys();
345   void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
346 
347   WorkerStat *get_worker_stat();
348   struct ev_loop *get_loop() const;
349   SSL_CTX *get_sv_ssl_ctx() const;
350   SSL_CTX *get_cl_ssl_ctx() const;
351 #ifdef ENABLE_HTTP3
352   SSL_CTX *get_quic_sv_ssl_ctx() const;
353 #endif // ENABLE_HTTP3
354 
355   void set_graceful_shutdown(bool f);
356   bool get_graceful_shutdown() const;
357 
358   MemchunkPool *get_mcpool();
359   void schedule_clear_mcpool();
360 
361   MemcachedDispatcher *get_session_cache_memcached_dispatcher();
362 
363   std::mt19937 &get_randgen();
364 
365 #ifdef HAVE_MRUBY
366   int create_mruby_context();
367 
368   mruby::MRubyContext *get_mruby_context() const;
369 #endif // HAVE_MRUBY
370 
371   std::vector<std::shared_ptr<DownstreamAddrGroup>> &
372   get_downstream_addr_groups();
373 
374   ConnectBlocker *get_connect_blocker() const;
375 
376   const DownstreamConfig *get_downstream_config() const;
377 
378   void
379   replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
380 
381   ConnectionHandler *get_connection_handler() const;
382 
383 #ifdef ENABLE_HTTP3
384   QUICConnectionHandler *get_quic_connection_handler();
385 
386   int setup_quic_server_socket();
387 
388   const WorkerID &get_worker_id() const;
389 
390 #  ifdef HAVE_LIBBPF
391   bool should_attach_bpf() const;
392 
393   bool should_update_bpf_map() const;
394 
395   uint32_t compute_sk_index() const;
396 #  endif // HAVE_LIBBPF
397 
398   int create_quic_server_socket(UpstreamAddr &addr);
399 
400   // Returns a pointer to UpstreamAddr which matches |local_addr|.
401   const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr);
402 #endif // ENABLE_HTTP3
403 
404   DNSTracker *get_dns_tracker();
405 
406 private:
407 #ifndef NOTHREADS
408   std::future<void> fut_;
409 #endif // NOTHREADS
410 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
411   // Unique index of this worker.
412   size_t index_;
413 #endif // ENABLE_HTTP3 && HAVE_LIBBPF
414   std::mutex m_;
415   std::deque<WorkerEvent> q_;
416   std::mt19937 randgen_;
417   ev_async w_;
418   ev_timer mcpool_clear_timer_;
419   ev_timer proc_wev_timer_;
420   MemchunkPool mcpool_;
421   WorkerStat worker_stat_;
422   DNSTracker dns_tracker_;
423 
424 #ifdef ENABLE_HTTP3
425   WorkerID worker_id_;
426   std::vector<UpstreamAddr> quic_upstream_addrs_;
427   std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
428 #endif // ENABLE_HTTP3
429 
430   std::shared_ptr<DownstreamConfig> downstreamconf_;
431   std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
432 #ifdef HAVE_MRUBY
433   std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
434 #endif // HAVE_MRUBY
435   struct ev_loop *loop_;
436 
437   // Following fields are shared across threads if
438   // get_config()->tls_ctx_per_worker == true.
439   SSL_CTX *sv_ssl_ctx_;
440   SSL_CTX *cl_ssl_ctx_;
441   tls::CertLookupTree *cert_tree_;
442   ConnectionHandler *conn_handler_;
443 #ifdef ENABLE_HTTP3
444   SSL_CTX *quic_sv_ssl_ctx_;
445   tls::CertLookupTree *quic_cert_tree_;
446 
447   QUICConnectionHandler quic_conn_handler_;
448 #endif // ENABLE_HTTP3
449 
450 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
451   std::atomic<std::shared_ptr<TicketKeys>> ticket_keys_;
452 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
453   std::mutex ticket_keys_m_;
454   std::shared_ptr<TicketKeys> ticket_keys_;
455 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
456   std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
457   // Worker level blocker for downstream connection.  For example,
458   // this is used when file descriptor is exhausted.
459   std::unique_ptr<ConnectBlocker> connect_blocker_;
460 
461   bool graceful_shutdown_;
462 };
463 
464 // Selects group based on request's |hostport| and |path|.  |hostport|
465 // is the value taken from :authority or host header field, and may
466 // contain port.  The |path| may contain query part.  We require the
467 // catch-all pattern in place, so this function always selects one
468 // group.  The catch-all group index is given in |catch_all|.  All
469 // patterns are given in |groups|.
470 size_t match_downstream_addr_group(
471   const RouterConfig &routerconfig, const StringRef &hostport,
472   const StringRef &path,
473   const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
474   size_t catch_all, BlockAllocator &balloc);
475 
476 // Calls this function if connecting to backend failed.  |raddr| is
477 // the actual address used to connect to backend, and it could be
478 // nullptr.  This function may schedule live check.
479 void downstream_failure(DownstreamAddr *addr, const Address *raddr);
480 
481 } // namespace shrpx
482 
483 #endif // SHRPX_WORKER_H
484