/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
25 #ifndef SHRPX_WORKER_H
26 #define SHRPX_WORKER_H
27 
28 #include "shrpx.h"
29 
30 #include <mutex>
31 #include <vector>
32 #include <random>
33 #include <unordered_map>
34 #include <deque>
35 #include <thread>
36 #include <queue>
37 #ifndef NOTHREADS
38 #  include <future>
39 #endif // NOTHREADS
40 
41 #include <openssl/ssl.h>
42 #include <openssl/err.h>
43 
44 #include <ev.h>
45 
46 #include "shrpx_config.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "memchunk.h"
49 #include "shrpx_tls.h"
50 #include "shrpx_live_check.h"
51 #include "shrpx_connect_blocker.h"
52 #include "shrpx_dns_tracker.h"
53 #ifdef ENABLE_HTTP3
54 #  include "shrpx_quic_connection_handler.h"
55 #  include "shrpx_quic.h"
56 #endif // ENABLE_HTTP3
57 #include "allocator.h"
58 
59 using namespace nghttp2;
60 
61 namespace shrpx {
62 
// Forward declarations for types that are referenced by the
// declarations in this header.
class Http2Session;
class ConnectBlocker;
class MemcachedDispatcher;
struct UpstreamAddr;
class ConnectionHandler;
#ifdef ENABLE_HTTP3
class QUICListener;
#endif // ENABLE_HTTP3

#ifdef HAVE_MRUBY
namespace mruby {

class MRubyContext;

} // namespace mruby
#endif // HAVE_MRUBY

namespace tls {
class CertLookupTree;
} // namespace tls

struct WeightGroup;
85 
86 struct DownstreamAddr {
87   Address addr;
88   // backend address.  If |host_unix| is true, this is UNIX domain
89   // socket path.
90   StringRef host;
91   StringRef hostport;
92   // backend port.  0 if |host_unix| is true.
93   uint16_t port;
94   // true if |host| contains UNIX domain socket path.
95   bool host_unix;
96 
97   // sni field to send remote server if TLS is enabled.
98   StringRef sni;
99 
100   std::unique_ptr<ConnectBlocker> connect_blocker;
101   std::unique_ptr<LiveCheck> live_check;
102   // Connection pool for this particular address if session affinity
103   // is enabled
104   std::unique_ptr<DownstreamConnectionPool> dconn_pool;
105   size_t fall;
106   size_t rise;
107   // Client side TLS session cache
108   tls::TLSSessionCache tls_session_cache;
109   // List of Http2Session which is not fully utilized (i.e., the
110   // server advertised maximum concurrency is not reached).  We will
111   // coalesce as much stream as possible in one Http2Session to fully
112   // utilize TCP connection.
113   DList<Http2Session> http2_extra_freelist;
114   WeightGroup *wg;
115   // total number of streams created in HTTP/2 connections for this
116   // address.
117   size_t num_dconn;
118   // the sequence number of this address to randomize the order access
119   // threads.
120   size_t seq;
121   // Application protocol used in this backend
122   Proto proto;
123   // cycle is used to prioritize this address.  Lower value takes
124   // higher priority.
125   uint32_t cycle;
126   // penalty which is applied to the next cycle calculation.
127   uint32_t pending_penalty;
128   // Weight of this address inside a weight group.  Its range is [1,
129   // 256], inclusive.
130   uint32_t weight;
131   // name of group which this address belongs to.
132   StringRef group;
133   // Weight of the weight group which this address belongs to.  Its
134   // range is [1, 256], inclusive.
135   uint32_t group_weight;
136   // affinity hash for this address.  It is assigned when strict
137   // stickiness is enabled.
138   uint32_t affinity_hash;
139   // true if TLS is used in this backend
140   bool tls;
141   // true if dynamic DNS is enabled
142   bool dns;
143   // true if :scheme pseudo header field should be upgraded to secure
144   // variant (e.g., "https") when forwarding request to a backend
145   // connected by TLS connection.
146   bool upgrade_scheme;
147   // true if this address is queued.
148   bool queued;
149 };
150 
// Largest weight value.  It matches the upper bound of the [1, 256]
// range documented for DownstreamAddr::weight and
// DownstreamAddr::group_weight, and it bounds the cycle distance
// accepted by the *EntryGreater comparators.
constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
152 
153 struct DownstreamAddrEntry {
154   DownstreamAddr *addr;
155   size_t seq;
156   uint32_t cycle;
157 };
158 
159 struct DownstreamAddrEntryGreater {
operatorDownstreamAddrEntryGreater160   bool operator()(const DownstreamAddrEntry &lhs,
161                   const DownstreamAddrEntry &rhs) const {
162     auto d = lhs.cycle - rhs.cycle;
163     if (d == 0) {
164       return rhs.seq < lhs.seq;
165     }
166     return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
167   }
168 };
169 
170 struct WeightGroup {
171   std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
172                       DownstreamAddrEntryGreater>
173       pq;
174   size_t seq;
175   uint32_t weight;
176   uint32_t cycle;
177   uint32_t pending_penalty;
178   // true if this object is queued.
179   bool queued;
180 };
181 
182 struct WeightGroupEntry {
183   WeightGroup *wg;
184   size_t seq;
185   uint32_t cycle;
186 };
187 
188 struct WeightGroupEntryGreater {
operatorWeightGroupEntryGreater189   bool operator()(const WeightGroupEntry &lhs,
190                   const WeightGroupEntry &rhs) const {
191     auto d = lhs.cycle - rhs.cycle;
192     if (d == 0) {
193       return rhs.seq < lhs.seq;
194     }
195     return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
196   }
197 };
198 
199 struct SharedDownstreamAddr {
SharedDownstreamAddrSharedDownstreamAddr200   SharedDownstreamAddr()
201       : balloc(1024, 1024),
202         affinity{SessionAffinity::NONE},
203         redirect_if_not_tls{false},
204         dnf{false},
205         timeout{} {}
206 
207   SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
208   SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
209   SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
210   SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
211 
212   BlockAllocator balloc;
213   std::vector<DownstreamAddr> addrs;
214   std::vector<WeightGroup> wgs;
215   std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
216                       WeightGroupEntryGreater>
217       pq;
218   // Bunch of session affinity hash.  Only used if affinity ==
219   // SessionAffinity::IP.
220   std::vector<AffinityHash> affinity_hash;
221   // Maps affinity hash of each DownstreamAddr to its index in addrs.
222   // It is only assigned when strict stickiness is enabled.
223   std::unordered_map<uint32_t, size_t> affinity_hash_map;
224 #ifdef HAVE_MRUBY
225   std::shared_ptr<mruby::MRubyContext> mruby_ctx;
226 #endif // HAVE_MRUBY
227   // Configuration for session affinity
228   AffinityConfig affinity;
229   // Session affinity
230   // true if this group requires that client connection must be TLS,
231   // and the request must be redirected to https URI.
232   bool redirect_if_not_tls;
233   // true if a request should not be forwarded to a backend.
234   bool dnf;
235   // Timeouts for backend connection.
236   struct {
237     ev_tstamp read;
238     ev_tstamp write;
239   } timeout;
240 };
241 
242 struct DownstreamAddrGroup {
243   DownstreamAddrGroup();
244   ~DownstreamAddrGroup();
245 
246   DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
247   DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
248   DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
249   DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;
250 
251   ImmutableString pattern;
252   std::shared_ptr<SharedDownstreamAddr> shared_addr;
253   // true if this group is no longer used for new request.  If this is
254   // true, the connection made using one of address in shared_addr
255   // must not be pooled.
256   bool retired;
257 };
258 
// Counters kept per Worker; exposed through Worker::get_worker_stat().
struct WorkerStat {
  // Number of connections.
  size_t num_connections;
  // Number of close waits.
  size_t num_close_waits;
};
263 
#ifdef ENABLE_HTTP3
// QUICPacket carries one QUIC packet together with the addresses and
// packet info it was received with.  It is the payload of
// WorkerEvent::quic_pkt.
struct QUICPacket {
  // Copies |datalen| bytes starting at |data| into this object; the
  // caller's buffer is not referenced afterwards.
  QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
             const Address &local_addr, const ngtcp2_pkt_info &pi,
             const uint8_t *data, size_t datalen)
      : upstream_addr_index{upstream_addr_index},
        remote_addr{remote_addr},
        local_addr{local_addr},
        pi{pi},
        data{data, data + datalen} {}
  // Creates an empty packet with value-initialized fields.
  QUICPacket() : upstream_addr_index{}, remote_addr{}, local_addr{}, pi{} {}
  size_t upstream_addr_index;
  Address remote_addr;
  Address local_addr;
  ngtcp2_pkt_info pi;
  std::vector<uint8_t> data;
};
#endif // ENABLE_HTTP3
282 
// Kind of a WorkerEvent.  The numeric values are fixed explicitly.
enum class WorkerEventType {
  NEW_CONNECTION = 0x01,
  REOPEN_LOG = 0x02,
  GRACEFUL_SHUTDOWN = 0x03,
  REPLACE_DOWNSTREAM = 0x04,
#ifdef ENABLE_HTTP3
  QUIC_PKT_FORWARD = 0x05,
#endif // ENABLE_HTTP3
};
292 
293 struct WorkerEvent {
294   WorkerEventType type;
295   struct {
296     sockaddr_union client_addr;
297     size_t client_addrlen;
298     int client_fd;
299     const UpstreamAddr *faddr;
300   };
301   std::shared_ptr<TicketKeys> ticket_keys;
302   std::shared_ptr<DownstreamConfig> downstreamconf;
303 #ifdef ENABLE_HTTP3
304   std::unique_ptr<QUICPacket> quic_pkt;
305 #endif // ENABLE_HTTP3
306 };
307 
308 class Worker {
309 public:
310   Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
311          SSL_CTX *tls_session_cache_memcached_ssl_ctx,
312          tls::CertLookupTree *cert_tree,
313 #ifdef ENABLE_HTTP3
314          SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
315          const uint8_t *cid_prefix, size_t cid_prefixlen,
316 #  ifdef HAVE_LIBBPF
317          size_t index,
318 #  endif // HAVE_LIBBPF
319 #endif   // ENABLE_HTTP3
320          const std::shared_ptr<TicketKeys> &ticket_keys,
321          ConnectionHandler *conn_handler,
322          std::shared_ptr<DownstreamConfig> downstreamconf);
323   ~Worker();
324   void run_async();
325   void wait();
326   void process_events();
327   void send(WorkerEvent event);
328 
329   tls::CertLookupTree *get_cert_lookup_tree() const;
330 #ifdef ENABLE_HTTP3
331   tls::CertLookupTree *get_quic_cert_lookup_tree() const;
332 #endif // ENABLE_HTTP3
333 
334   // These 2 functions make a lock m_ to get/set ticket keys
335   // atomically.
336   std::shared_ptr<TicketKeys> get_ticket_keys();
337   void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
338 
339   WorkerStat *get_worker_stat();
340   struct ev_loop *get_loop() const;
341   SSL_CTX *get_sv_ssl_ctx() const;
342   SSL_CTX *get_cl_ssl_ctx() const;
343 #ifdef ENABLE_HTTP3
344   SSL_CTX *get_quic_sv_ssl_ctx() const;
345 #endif // ENABLE_HTTP3
346 
347   void set_graceful_shutdown(bool f);
348   bool get_graceful_shutdown() const;
349 
350   MemchunkPool *get_mcpool();
351   void schedule_clear_mcpool();
352 
353   MemcachedDispatcher *get_session_cache_memcached_dispatcher();
354 
355   std::mt19937 &get_randgen();
356 
357 #ifdef HAVE_MRUBY
358   int create_mruby_context();
359 
360   mruby::MRubyContext *get_mruby_context() const;
361 #endif // HAVE_MRUBY
362 
363   std::vector<std::shared_ptr<DownstreamAddrGroup>> &
364   get_downstream_addr_groups();
365 
366   ConnectBlocker *get_connect_blocker() const;
367 
368   const DownstreamConfig *get_downstream_config() const;
369 
370   void
371   replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
372 
373   ConnectionHandler *get_connection_handler() const;
374 
375 #ifdef ENABLE_HTTP3
376   QUICConnectionHandler *get_quic_connection_handler();
377 
378   int setup_quic_server_socket();
379 
380   const uint8_t *get_cid_prefix() const;
381 
382 #  ifdef HAVE_LIBBPF
383   bool should_attach_bpf() const;
384 
385   bool should_update_bpf_map() const;
386 
387   uint32_t compute_sk_index() const;
388 #  endif // HAVE_LIBBPF
389 
390   int create_quic_server_socket(UpstreamAddr &addr);
391 
392   // Returns a pointer to UpstreamAddr which matches |local_addr|.
393   const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr);
394 #endif // ENABLE_HTTP3
395 
396   DNSTracker *get_dns_tracker();
397 
398 private:
399 #ifndef NOTHREADS
400   std::future<void> fut_;
401 #endif // NOTHREADS
402 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
403   // Unique index of this worker.
404   size_t index_;
405 #endif // ENABLE_HTTP3 && HAVE_LIBBPF
406   std::mutex m_;
407   std::deque<WorkerEvent> q_;
408   std::mt19937 randgen_;
409   ev_async w_;
410   ev_timer mcpool_clear_timer_;
411   ev_timer proc_wev_timer_;
412   MemchunkPool mcpool_;
413   WorkerStat worker_stat_;
414   DNSTracker dns_tracker_;
415 
416 #ifdef ENABLE_HTTP3
417   std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix_;
418   std::vector<UpstreamAddr> quic_upstream_addrs_;
419   std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
420 #endif // ENABLE_HTTP3
421 
422   std::shared_ptr<DownstreamConfig> downstreamconf_;
423   std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
424 #ifdef HAVE_MRUBY
425   std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
426 #endif // HAVE_MRUBY
427   struct ev_loop *loop_;
428 
429   // Following fields are shared across threads if
430   // get_config()->tls_ctx_per_worker == true.
431   SSL_CTX *sv_ssl_ctx_;
432   SSL_CTX *cl_ssl_ctx_;
433   tls::CertLookupTree *cert_tree_;
434   ConnectionHandler *conn_handler_;
435 #ifdef ENABLE_HTTP3
436   SSL_CTX *quic_sv_ssl_ctx_;
437   tls::CertLookupTree *quic_cert_tree_;
438 
439   QUICConnectionHandler quic_conn_handler_;
440 #endif // ENABLE_HTTP3
441 
442 #ifndef HAVE_ATOMIC_STD_SHARED_PTR
443   std::mutex ticket_keys_m_;
444 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
445   std::shared_ptr<TicketKeys> ticket_keys_;
446   std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
447   // Worker level blocker for downstream connection.  For example,
448   // this is used when file descriptor is exhausted.
449   std::unique_ptr<ConnectBlocker> connect_blocker_;
450 
451   bool graceful_shutdown_;
452 };
453 
454 // Selects group based on request's |hostport| and |path|.  |hostport|
455 // is the value taken from :authority or host header field, and may
456 // contain port.  The |path| may contain query part.  We require the
457 // catch-all pattern in place, so this function always selects one
458 // group.  The catch-all group index is given in |catch_all|.  All
459 // patterns are given in |groups|.
460 size_t match_downstream_addr_group(
461     const RouterConfig &routerconfig, const StringRef &hostport,
462     const StringRef &path,
463     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
464     size_t catch_all, BlockAllocator &balloc);
465 
466 // Calls this function if connecting to backend failed.  |raddr| is
467 // the actual address used to connect to backend, and it could be
468 // nullptr.  This function may schedule live check.
469 void downstream_failure(DownstreamAddr *addr, const Address *raddr);
470 
#ifdef ENABLE_HTTP3
// Creates unpredictable SHRPX_QUIC_CID_PREFIXLEN bytes sequence which
// is used as a prefix of QUIC Connection ID, and stores it in the
// buffer pointed by |cid_prefix|.  |server_id| must be 2 bytes long.
// This function returns -1 on failure.  NOTE(review): the success
// return value is not stated here; presumably 0 -- confirm.
int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id);
#endif // ENABLE_HTTP3
477 
478 } // namespace shrpx
479 
480 #endif // SHRPX_WORKER_H
481