1 /* 2 * nghttp2 - HTTP/2 C Library 3 * 4 * Copyright (c) 2012 Tatsuhiro Tsujikawa 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining 7 * a copy of this software and associated documentation files (the 8 * "Software"), to deal in the Software without restriction, including 9 * without limitation the rights to use, copy, modify, merge, publish, 10 * distribute, sublicense, and/or sell copies of the Software, and to 11 * permit persons to whom the Software is furnished to do so, subject to 12 * the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be 15 * included in all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 24 */ 25 #ifndef SHRPX_WORKER_H 26 #define SHRPX_WORKER_H 27 28 #include "shrpx.h" 29 30 #include <mutex> 31 #include <vector> 32 #include <random> 33 #include <unordered_map> 34 #include <deque> 35 #include <thread> 36 #include <queue> 37 #ifndef NOTHREADS 38 # include <future> 39 #endif // NOTHREADS 40 41 #include "ssl_compat.h" 42 43 #ifdef NGHTTP2_OPENSSL_IS_WOLFSSL 44 # include <wolfssl/options.h> 45 # include <wolfssl/openssl/ssl.h> 46 # include <wolfssl/openssl/err.h> 47 #else // !NGHTTP2_OPENSSL_IS_WOLFSSL 48 # include <openssl/ssl.h> 49 # include <openssl/err.h> 50 #endif // !NGHTTP2_OPENSSL_IS_WOLFSSL 51 52 #include <ev.h> 53 54 #include "shrpx_config.h" 55 #include "shrpx_downstream_connection_pool.h" 56 #include "memchunk.h" 57 #include "shrpx_tls.h" 58 #include "shrpx_live_check.h" 59 #include "shrpx_connect_blocker.h" 60 #include "shrpx_dns_tracker.h" 61 #ifdef ENABLE_HTTP3 62 # include "shrpx_quic_connection_handler.h" 63 # include "shrpx_quic.h" 64 #endif // ENABLE_HTTP3 65 #include "allocator.h" 66 67 using namespace nghttp2; 68 69 namespace shrpx { 70 71 class Http2Session; 72 class ConnectBlocker; 73 class MemcachedDispatcher; 74 struct UpstreamAddr; 75 class ConnectionHandler; 76 #ifdef ENABLE_HTTP3 77 class QUICListener; 78 #endif // ENABLE_HTTP3 79 80 #ifdef HAVE_MRUBY 81 namespace mruby { 82 83 class MRubyContext; 84 85 } // namespace mruby 86 #endif // HAVE_MRUBY 87 88 namespace tls { 89 class CertLookupTree; 90 } // namespace tls 91 92 struct WeightGroup; 93 94 struct DownstreamAddr { 95 Address addr; 96 // backend address. If |host_unix| is true, this is UNIX domain 97 // socket path. 98 StringRef host; 99 StringRef hostport; 100 // backend port. 0 if |host_unix| is true. 101 uint16_t port; 102 // true if |host| contains UNIX domain socket path. 103 bool host_unix; 104 105 // sni field to send remote server if TLS is enabled. 106 StringRef sni; 107 108 std::unique_ptr<ConnectBlocker> connect_blocker; 109 std::unique_ptr<LiveCheck> live_check; 110 // Connection pool for this particular address if session affinity 111 // is enabled 112 std::unique_ptr<DownstreamConnectionPool> dconn_pool; 113 size_t fall; 114 size_t rise; 115 // Client side TLS session cache 116 tls::TLSSessionCache tls_session_cache; 117 // List of Http2Session which is not fully utilized (i.e., the 118 // server advertised maximum concurrency is not reached). We will 119 // coalesce as much stream as possible in one Http2Session to fully 120 // utilize TCP connection. 121 DList<Http2Session> http2_extra_freelist; 122 WeightGroup *wg; 123 // total number of streams created in HTTP/2 connections for this 124 // address. 125 size_t num_dconn; 126 // the sequence number of this address to randomize the order access 127 // threads. 128 size_t seq; 129 // Application protocol used in this backend 130 Proto proto; 131 // cycle is used to prioritize this address. Lower value takes 132 // higher priority. 133 uint32_t cycle; 134 // penalty which is applied to the next cycle calculation. 135 uint32_t pending_penalty; 136 // Weight of this address inside a weight group. Its range is [1, 137 // 256], inclusive. 138 uint32_t weight; 139 // name of group which this address belongs to. 140 StringRef group; 141 // Weight of the weight group which this address belongs to. Its 142 // range is [1, 256], inclusive. 143 uint32_t group_weight; 144 // affinity hash for this address. It is assigned when strict 145 // stickiness is enabled. 146 uint32_t affinity_hash; 147 // true if TLS is used in this backend 148 bool tls; 149 // true if dynamic DNS is enabled 150 bool dns; 151 // true if :scheme pseudo header field should be upgraded to secure 152 // variant (e.g., "https") when forwarding request to a backend 153 // connected by TLS connection. 154 bool upgrade_scheme; 155 // true if this address is queued. 156 bool queued; 157 }; 158 159 constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256; 160 161 struct DownstreamAddrEntry { 162 DownstreamAddr *addr; 163 size_t seq; 164 uint32_t cycle; 165 }; 166 167 struct DownstreamAddrEntryGreater { operatorDownstreamAddrEntryGreater168 bool operator()(const DownstreamAddrEntry &lhs, 169 const DownstreamAddrEntry &rhs) const { 170 auto d = lhs.cycle - rhs.cycle; 171 if (d == 0) { 172 return rhs.seq < lhs.seq; 173 } 174 return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1; 175 } 176 }; 177 178 struct WeightGroup { 179 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>, 180 DownstreamAddrEntryGreater> 181 pq; 182 size_t seq; 183 uint32_t weight; 184 uint32_t cycle; 185 uint32_t pending_penalty; 186 // true if this object is queued. 187 bool queued; 188 }; 189 190 struct WeightGroupEntry { 191 WeightGroup *wg; 192 size_t seq; 193 uint32_t cycle; 194 }; 195 196 struct WeightGroupEntryGreater { operatorWeightGroupEntryGreater197 bool operator()(const WeightGroupEntry &lhs, 198 const WeightGroupEntry &rhs) const { 199 auto d = lhs.cycle - rhs.cycle; 200 if (d == 0) { 201 return rhs.seq < lhs.seq; 202 } 203 return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1; 204 } 205 }; 206 207 struct SharedDownstreamAddr { SharedDownstreamAddrSharedDownstreamAddr208 SharedDownstreamAddr() 209 : balloc(1024, 1024), 210 affinity{SessionAffinity::NONE}, 211 redirect_if_not_tls{false}, 212 dnf{false}, 213 timeout{} {} 214 215 SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; 216 SharedDownstreamAddr(SharedDownstreamAddr &&) = delete; 217 SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete; 218 SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete; 219 220 BlockAllocator balloc; 221 std::vector<DownstreamAddr> addrs; 222 std::vector<WeightGroup> wgs; 223 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>, 224 WeightGroupEntryGreater> 225 pq; 226 // Bunch of session affinity hash. Only used if affinity == 227 // SessionAffinity::IP. 228 std::vector<AffinityHash> affinity_hash; 229 // Maps affinity hash of each DownstreamAddr to its index in addrs. 230 // It is only assigned when strict stickiness is enabled. 231 std::unordered_map<uint32_t, size_t> affinity_hash_map; 232 #ifdef HAVE_MRUBY 233 std::shared_ptr<mruby::MRubyContext> mruby_ctx; 234 #endif // HAVE_MRUBY 235 // Configuration for session affinity 236 AffinityConfig affinity; 237 // Session affinity 238 // true if this group requires that client connection must be TLS, 239 // and the request must be redirected to https URI. 240 bool redirect_if_not_tls; 241 // true if a request should not be forwarded to a backend. 242 bool dnf; 243 // Timeouts for backend connection. 244 struct { 245 ev_tstamp read; 246 ev_tstamp write; 247 } timeout; 248 }; 249 250 struct DownstreamAddrGroup { 251 DownstreamAddrGroup(); 252 ~DownstreamAddrGroup(); 253 254 DownstreamAddrGroup(const DownstreamAddrGroup &) = delete; 255 DownstreamAddrGroup(DownstreamAddrGroup &&) = delete; 256 DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete; 257 DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete; 258 259 ImmutableString pattern; 260 std::shared_ptr<SharedDownstreamAddr> shared_addr; 261 // true if this group is no longer used for new request. If this is 262 // true, the connection made using one of address in shared_addr 263 // must not be pooled. 264 bool retired; 265 }; 266 267 struct WorkerStat { 268 size_t num_connections; 269 size_t num_close_waits; 270 }; 271 272 #ifdef ENABLE_HTTP3 273 struct QUICPacket { QUICPacketQUICPacket274 QUICPacket(size_t upstream_addr_index, const Address &remote_addr, 275 const Address &local_addr, const ngtcp2_pkt_info &pi, 276 std::span<const uint8_t> data) 277 : upstream_addr_index{upstream_addr_index}, 278 remote_addr{remote_addr}, 279 local_addr{local_addr}, 280 pi{pi}, 281 data{std::begin(data), std::end(data)} {} QUICPacketQUICPacket282 QUICPacket() : upstream_addr_index{}, remote_addr{}, local_addr{}, pi{} {} 283 size_t upstream_addr_index; 284 Address remote_addr; 285 Address local_addr; 286 ngtcp2_pkt_info pi; 287 std::vector<uint8_t> data; 288 }; 289 #endif // ENABLE_HTTP3 290 291 enum class WorkerEventType { 292 NEW_CONNECTION = 0x01, 293 REOPEN_LOG = 0x02, 294 GRACEFUL_SHUTDOWN = 0x03, 295 REPLACE_DOWNSTREAM = 0x04, 296 #ifdef ENABLE_HTTP3 297 QUIC_PKT_FORWARD = 0x05, 298 #endif // ENABLE_HTTP3 299 }; 300 301 struct WorkerEvent { 302 WorkerEventType type; 303 struct { 304 sockaddr_union client_addr; 305 size_t client_addrlen; 306 int client_fd; 307 const UpstreamAddr *faddr; 308 }; 309 std::shared_ptr<TicketKeys> ticket_keys; 310 std::shared_ptr<DownstreamConfig> downstreamconf; 311 #ifdef ENABLE_HTTP3 312 std::unique_ptr<QUICPacket> quic_pkt; 313 #endif // ENABLE_HTTP3 314 }; 315 316 class Worker { 317 public: 318 Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, 319 SSL_CTX *tls_session_cache_memcached_ssl_ctx, 320 tls::CertLookupTree *cert_tree, 321 #ifdef ENABLE_HTTP3 322 SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, 323 WorkerID wid, 324 # ifdef HAVE_LIBBPF 325 size_t index, 326 # endif // HAVE_LIBBPF 327 #endif // ENABLE_HTTP3 328 const std::shared_ptr<TicketKeys> &ticket_keys, 329 ConnectionHandler *conn_handler, 330 std::shared_ptr<DownstreamConfig> downstreamconf); 331 ~Worker(); 332 void run_async(); 333 void wait(); 334 void process_events(); 335 void send(WorkerEvent event); 336 337 tls::CertLookupTree *get_cert_lookup_tree() const; 338 #ifdef ENABLE_HTTP3 339 tls::CertLookupTree *get_quic_cert_lookup_tree() const; 340 #endif // ENABLE_HTTP3 341 342 // These 2 functions make a lock m_ to get/set ticket keys 343 // atomically. 344 std::shared_ptr<TicketKeys> get_ticket_keys(); 345 void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys); 346 347 WorkerStat *get_worker_stat(); 348 struct ev_loop *get_loop() const; 349 SSL_CTX *get_sv_ssl_ctx() const; 350 SSL_CTX *get_cl_ssl_ctx() const; 351 #ifdef ENABLE_HTTP3 352 SSL_CTX *get_quic_sv_ssl_ctx() const; 353 #endif // ENABLE_HTTP3 354 355 void set_graceful_shutdown(bool f); 356 bool get_graceful_shutdown() const; 357 358 MemchunkPool *get_mcpool(); 359 void schedule_clear_mcpool(); 360 361 MemcachedDispatcher *get_session_cache_memcached_dispatcher(); 362 363 std::mt19937 &get_randgen(); 364 365 #ifdef HAVE_MRUBY 366 int create_mruby_context(); 367 368 mruby::MRubyContext *get_mruby_context() const; 369 #endif // HAVE_MRUBY 370 371 std::vector<std::shared_ptr<DownstreamAddrGroup>> & 372 get_downstream_addr_groups(); 373 374 ConnectBlocker *get_connect_blocker() const; 375 376 const DownstreamConfig *get_downstream_config() const; 377 378 void 379 replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf); 380 381 ConnectionHandler *get_connection_handler() const; 382 383 #ifdef ENABLE_HTTP3 384 QUICConnectionHandler *get_quic_connection_handler(); 385 386 int setup_quic_server_socket(); 387 388 const WorkerID &get_worker_id() const; 389 390 # ifdef HAVE_LIBBPF 391 bool should_attach_bpf() const; 392 393 bool should_update_bpf_map() const; 394 395 uint32_t compute_sk_index() const; 396 # endif // HAVE_LIBBPF 397 398 int create_quic_server_socket(UpstreamAddr &addr); 399 400 // Returns a pointer to UpstreamAddr which matches |local_addr|. 401 const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr); 402 #endif // ENABLE_HTTP3 403 404 DNSTracker *get_dns_tracker(); 405 406 private: 407 #ifndef NOTHREADS 408 std::future<void> fut_; 409 #endif // NOTHREADS 410 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) 411 // Unique index of this worker. 412 size_t index_; 413 #endif // ENABLE_HTTP3 && HAVE_LIBBPF 414 std::mutex m_; 415 std::deque<WorkerEvent> q_; 416 std::mt19937 randgen_; 417 ev_async w_; 418 ev_timer mcpool_clear_timer_; 419 ev_timer proc_wev_timer_; 420 MemchunkPool mcpool_; 421 WorkerStat worker_stat_; 422 DNSTracker dns_tracker_; 423 424 #ifdef ENABLE_HTTP3 425 WorkerID worker_id_; 426 std::vector<UpstreamAddr> quic_upstream_addrs_; 427 std::vector<std::unique_ptr<QUICListener>> quic_listeners_; 428 #endif // ENABLE_HTTP3 429 430 std::shared_ptr<DownstreamConfig> downstreamconf_; 431 std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_; 432 #ifdef HAVE_MRUBY 433 std::unique_ptr<mruby::MRubyContext> mruby_ctx_; 434 #endif // HAVE_MRUBY 435 struct ev_loop *loop_; 436 437 // Following fields are shared across threads if 438 // get_config()->tls_ctx_per_worker == true. 439 SSL_CTX *sv_ssl_ctx_; 440 SSL_CTX *cl_ssl_ctx_; 441 tls::CertLookupTree *cert_tree_; 442 ConnectionHandler *conn_handler_; 443 #ifdef ENABLE_HTTP3 444 SSL_CTX *quic_sv_ssl_ctx_; 445 tls::CertLookupTree *quic_cert_tree_; 446 447 QUICConnectionHandler quic_conn_handler_; 448 #endif // ENABLE_HTTP3 449 450 #ifdef HAVE_ATOMIC_STD_SHARED_PTR 451 std::atomic<std::shared_ptr<TicketKeys>> ticket_keys_; 452 #else // !HAVE_ATOMIC_STD_SHARED_PTR 453 std::mutex ticket_keys_m_; 454 std::shared_ptr<TicketKeys> ticket_keys_; 455 #endif // !HAVE_ATOMIC_STD_SHARED_PTR 456 std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_; 457 // Worker level blocker for downstream connection. For example, 458 // this is used when file descriptor is exhausted. 459 std::unique_ptr<ConnectBlocker> connect_blocker_; 460 461 bool graceful_shutdown_; 462 }; 463 464 // Selects group based on request's |hostport| and |path|. |hostport| 465 // is the value taken from :authority or host header field, and may 466 // contain port. The |path| may contain query part. We require the 467 // catch-all pattern in place, so this function always selects one 468 // group. The catch-all group index is given in |catch_all|. All 469 // patterns are given in |groups|. 470 size_t match_downstream_addr_group( 471 const RouterConfig &routerconfig, const StringRef &hostport, 472 const StringRef &path, 473 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups, 474 size_t catch_all, BlockAllocator &balloc); 475 476 // Calls this function if connecting to backend failed. |raddr| is 477 // the actual address used to connect to backend, and it could be 478 // nullptr. This function may schedule live check. 479 void downstream_failure(DownstreamAddr *addr, const Address *raddr); 480 481 } // namespace shrpx 482 483 #endif // SHRPX_WORKER_H 484