1 /* 2 * nghttp2 - HTTP/2 C Library 3 * 4 * Copyright (c) 2012 Tatsuhiro Tsujikawa 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining 7 * a copy of this software and associated documentation files (the 8 * "Software"), to deal in the Software without restriction, including 9 * without limitation the rights to use, copy, modify, merge, publish, 10 * distribute, sublicense, and/or sell copies of the Software, and to 11 * permit persons to whom the Software is furnished to do so, subject to 12 * the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be 15 * included in all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 24 */ 25 #ifndef SHRPX_WORKER_H 26 #define SHRPX_WORKER_H 27 28 #include "shrpx.h" 29 30 #include <mutex> 31 #include <vector> 32 #include <random> 33 #include <unordered_map> 34 #include <deque> 35 #include <thread> 36 #include <queue> 37 #ifndef NOTHREADS 38 # include <future> 39 #endif // NOTHREADS 40 41 #include <openssl/ssl.h> 42 #include <openssl/err.h> 43 44 #include <ev.h> 45 46 #include "shrpx_config.h" 47 #include "shrpx_downstream_connection_pool.h" 48 #include "memchunk.h" 49 #include "shrpx_tls.h" 50 #include "shrpx_live_check.h" 51 #include "shrpx_connect_blocker.h" 52 #include "shrpx_dns_tracker.h" 53 #ifdef ENABLE_HTTP3 54 # include "shrpx_quic_connection_handler.h" 55 # include "shrpx_quic.h" 56 #endif // ENABLE_HTTP3 57 #include "allocator.h" 58 59 using namespace nghttp2; 60 61 namespace shrpx { 62 63 class Http2Session; 64 class ConnectBlocker; 65 class MemcachedDispatcher; 66 struct UpstreamAddr; 67 class ConnectionHandler; 68 #ifdef ENABLE_HTTP3 69 class QUICListener; 70 #endif // ENABLE_HTTP3 71 72 #ifdef HAVE_MRUBY 73 namespace mruby { 74 75 class MRubyContext; 76 77 } // namespace mruby 78 #endif // HAVE_MRUBY 79 80 namespace tls { 81 class CertLookupTree; 82 } // namespace tls 83 84 struct WeightGroup; 85 86 struct DownstreamAddr { 87 Address addr; 88 // backend address. If |host_unix| is true, this is UNIX domain 89 // socket path. 90 StringRef host; 91 StringRef hostport; 92 // backend port. 0 if |host_unix| is true. 93 uint16_t port; 94 // true if |host| contains UNIX domain socket path. 95 bool host_unix; 96 97 // sni field to send remote server if TLS is enabled. 98 StringRef sni; 99 100 std::unique_ptr<ConnectBlocker> connect_blocker; 101 std::unique_ptr<LiveCheck> live_check; 102 // Connection pool for this particular address if session affinity 103 // is enabled 104 std::unique_ptr<DownstreamConnectionPool> dconn_pool; 105 size_t fall; 106 size_t rise; 107 // Client side TLS session cache 108 tls::TLSSessionCache tls_session_cache; 109 // List of Http2Session which is not fully utilized (i.e., the 110 // server advertised maximum concurrency is not reached). We will 111 // coalesce as much stream as possible in one Http2Session to fully 112 // utilize TCP connection. 113 DList<Http2Session> http2_extra_freelist; 114 WeightGroup *wg; 115 // total number of streams created in HTTP/2 connections for this 116 // address. 117 size_t num_dconn; 118 // the sequence number of this address to randomize the order access 119 // threads. 120 size_t seq; 121 // Application protocol used in this backend 122 Proto proto; 123 // cycle is used to prioritize this address. Lower value takes 124 // higher priority. 125 uint32_t cycle; 126 // penalty which is applied to the next cycle calculation. 127 uint32_t pending_penalty; 128 // Weight of this address inside a weight group. Its range is [1, 129 // 256], inclusive. 130 uint32_t weight; 131 // name of group which this address belongs to. 132 StringRef group; 133 // Weight of the weight group which this address belongs to. Its 134 // range is [1, 256], inclusive. 135 uint32_t group_weight; 136 // affinity hash for this address. It is assigned when strict 137 // stickiness is enabled. 138 uint32_t affinity_hash; 139 // true if TLS is used in this backend 140 bool tls; 141 // true if dynamic DNS is enabled 142 bool dns; 143 // true if :scheme pseudo header field should be upgraded to secure 144 // variant (e.g., "https") when forwarding request to a backend 145 // connected by TLS connection. 146 bool upgrade_scheme; 147 // true if this address is queued. 148 bool queued; 149 }; 150 151 constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256; 152 153 struct DownstreamAddrEntry { 154 DownstreamAddr *addr; 155 size_t seq; 156 uint32_t cycle; 157 }; 158 159 struct DownstreamAddrEntryGreater { operatorDownstreamAddrEntryGreater160 bool operator()(const DownstreamAddrEntry &lhs, 161 const DownstreamAddrEntry &rhs) const { 162 auto d = lhs.cycle - rhs.cycle; 163 if (d == 0) { 164 return rhs.seq < lhs.seq; 165 } 166 return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1; 167 } 168 }; 169 170 struct WeightGroup { 171 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>, 172 DownstreamAddrEntryGreater> 173 pq; 174 size_t seq; 175 uint32_t weight; 176 uint32_t cycle; 177 uint32_t pending_penalty; 178 // true if this object is queued. 179 bool queued; 180 }; 181 182 struct WeightGroupEntry { 183 WeightGroup *wg; 184 size_t seq; 185 uint32_t cycle; 186 }; 187 188 struct WeightGroupEntryGreater { operatorWeightGroupEntryGreater189 bool operator()(const WeightGroupEntry &lhs, 190 const WeightGroupEntry &rhs) const { 191 auto d = lhs.cycle - rhs.cycle; 192 if (d == 0) { 193 return rhs.seq < lhs.seq; 194 } 195 return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1; 196 } 197 }; 198 199 struct SharedDownstreamAddr { SharedDownstreamAddrSharedDownstreamAddr200 SharedDownstreamAddr() 201 : balloc(1024, 1024), 202 affinity{SessionAffinity::NONE}, 203 redirect_if_not_tls{false}, 204 dnf{false}, 205 timeout{} {} 206 207 SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; 208 SharedDownstreamAddr(SharedDownstreamAddr &&) = delete; 209 SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete; 210 SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete; 211 212 BlockAllocator balloc; 213 std::vector<DownstreamAddr> addrs; 214 std::vector<WeightGroup> wgs; 215 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>, 216 WeightGroupEntryGreater> 217 pq; 218 // Bunch of session affinity hash. Only used if affinity == 219 // SessionAffinity::IP. 220 std::vector<AffinityHash> affinity_hash; 221 // Maps affinity hash of each DownstreamAddr to its index in addrs. 222 // It is only assigned when strict stickiness is enabled. 223 std::unordered_map<uint32_t, size_t> affinity_hash_map; 224 #ifdef HAVE_MRUBY 225 std::shared_ptr<mruby::MRubyContext> mruby_ctx; 226 #endif // HAVE_MRUBY 227 // Configuration for session affinity 228 AffinityConfig affinity; 229 // Session affinity 230 // true if this group requires that client connection must be TLS, 231 // and the request must be redirected to https URI. 232 bool redirect_if_not_tls; 233 // true if a request should not be forwarded to a backend. 234 bool dnf; 235 // Timeouts for backend connection. 236 struct { 237 ev_tstamp read; 238 ev_tstamp write; 239 } timeout; 240 }; 241 242 struct DownstreamAddrGroup { 243 DownstreamAddrGroup(); 244 ~DownstreamAddrGroup(); 245 246 DownstreamAddrGroup(const DownstreamAddrGroup &) = delete; 247 DownstreamAddrGroup(DownstreamAddrGroup &&) = delete; 248 DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete; 249 DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete; 250 251 ImmutableString pattern; 252 std::shared_ptr<SharedDownstreamAddr> shared_addr; 253 // true if this group is no longer used for new request. If this is 254 // true, the connection made using one of address in shared_addr 255 // must not be pooled. 256 bool retired; 257 }; 258 259 struct WorkerStat { 260 size_t num_connections; 261 size_t num_close_waits; 262 }; 263 264 #ifdef ENABLE_HTTP3 265 struct QUICPacket { QUICPacketQUICPacket266 QUICPacket(size_t upstream_addr_index, const Address &remote_addr, 267 const Address &local_addr, const ngtcp2_pkt_info &pi, 268 const uint8_t *data, size_t datalen) 269 : upstream_addr_index{upstream_addr_index}, 270 remote_addr{remote_addr}, 271 local_addr{local_addr}, 272 pi{pi}, 273 data{data, data + datalen} {} QUICPacketQUICPacket274 QUICPacket() : upstream_addr_index{}, remote_addr{}, local_addr{}, pi{} {} 275 size_t upstream_addr_index; 276 Address remote_addr; 277 Address local_addr; 278 ngtcp2_pkt_info pi; 279 std::vector<uint8_t> data; 280 }; 281 #endif // ENABLE_HTTP3 282 283 enum class WorkerEventType { 284 NEW_CONNECTION = 0x01, 285 REOPEN_LOG = 0x02, 286 GRACEFUL_SHUTDOWN = 0x03, 287 REPLACE_DOWNSTREAM = 0x04, 288 #ifdef ENABLE_HTTP3 289 QUIC_PKT_FORWARD = 0x05, 290 #endif // ENABLE_HTTP3 291 }; 292 293 struct WorkerEvent { 294 WorkerEventType type; 295 struct { 296 sockaddr_union client_addr; 297 size_t client_addrlen; 298 int client_fd; 299 const UpstreamAddr *faddr; 300 }; 301 std::shared_ptr<TicketKeys> ticket_keys; 302 std::shared_ptr<DownstreamConfig> downstreamconf; 303 #ifdef ENABLE_HTTP3 304 std::unique_ptr<QUICPacket> quic_pkt; 305 #endif // ENABLE_HTTP3 306 }; 307 308 class Worker { 309 public: 310 Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, 311 SSL_CTX *tls_session_cache_memcached_ssl_ctx, 312 tls::CertLookupTree *cert_tree, 313 #ifdef ENABLE_HTTP3 314 SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, 315 const uint8_t *cid_prefix, size_t cid_prefixlen, 316 # ifdef HAVE_LIBBPF 317 size_t index, 318 # endif // HAVE_LIBBPF 319 #endif // ENABLE_HTTP3 320 const std::shared_ptr<TicketKeys> &ticket_keys, 321 ConnectionHandler *conn_handler, 322 std::shared_ptr<DownstreamConfig> downstreamconf); 323 ~Worker(); 324 void run_async(); 325 void wait(); 326 void process_events(); 327 void send(WorkerEvent event); 328 329 tls::CertLookupTree *get_cert_lookup_tree() const; 330 #ifdef ENABLE_HTTP3 331 tls::CertLookupTree *get_quic_cert_lookup_tree() const; 332 #endif // ENABLE_HTTP3 333 334 // These 2 functions make a lock m_ to get/set ticket keys 335 // atomically. 336 std::shared_ptr<TicketKeys> get_ticket_keys(); 337 void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys); 338 339 WorkerStat *get_worker_stat(); 340 struct ev_loop *get_loop() const; 341 SSL_CTX *get_sv_ssl_ctx() const; 342 SSL_CTX *get_cl_ssl_ctx() const; 343 #ifdef ENABLE_HTTP3 344 SSL_CTX *get_quic_sv_ssl_ctx() const; 345 #endif // ENABLE_HTTP3 346 347 void set_graceful_shutdown(bool f); 348 bool get_graceful_shutdown() const; 349 350 MemchunkPool *get_mcpool(); 351 void schedule_clear_mcpool(); 352 353 MemcachedDispatcher *get_session_cache_memcached_dispatcher(); 354 355 std::mt19937 &get_randgen(); 356 357 #ifdef HAVE_MRUBY 358 int create_mruby_context(); 359 360 mruby::MRubyContext *get_mruby_context() const; 361 #endif // HAVE_MRUBY 362 363 std::vector<std::shared_ptr<DownstreamAddrGroup>> & 364 get_downstream_addr_groups(); 365 366 ConnectBlocker *get_connect_blocker() const; 367 368 const DownstreamConfig *get_downstream_config() const; 369 370 void 371 replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf); 372 373 ConnectionHandler *get_connection_handler() const; 374 375 #ifdef ENABLE_HTTP3 376 QUICConnectionHandler *get_quic_connection_handler(); 377 378 int setup_quic_server_socket(); 379 380 const uint8_t *get_cid_prefix() const; 381 382 # ifdef HAVE_LIBBPF 383 bool should_attach_bpf() const; 384 385 bool should_update_bpf_map() const; 386 387 uint32_t compute_sk_index() const; 388 # endif // HAVE_LIBBPF 389 390 int create_quic_server_socket(UpstreamAddr &addr); 391 392 // Returns a pointer to UpstreamAddr which matches |local_addr|. 393 const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr); 394 #endif // ENABLE_HTTP3 395 396 DNSTracker *get_dns_tracker(); 397 398 private: 399 #ifndef NOTHREADS 400 std::future<void> fut_; 401 #endif // NOTHREADS 402 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) 403 // Unique index of this worker. 404 size_t index_; 405 #endif // ENABLE_HTTP3 && HAVE_LIBBPF 406 std::mutex m_; 407 std::deque<WorkerEvent> q_; 408 std::mt19937 randgen_; 409 ev_async w_; 410 ev_timer mcpool_clear_timer_; 411 ev_timer proc_wev_timer_; 412 MemchunkPool mcpool_; 413 WorkerStat worker_stat_; 414 DNSTracker dns_tracker_; 415 416 #ifdef ENABLE_HTTP3 417 std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix_; 418 std::vector<UpstreamAddr> quic_upstream_addrs_; 419 std::vector<std::unique_ptr<QUICListener>> quic_listeners_; 420 #endif // ENABLE_HTTP3 421 422 std::shared_ptr<DownstreamConfig> downstreamconf_; 423 std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_; 424 #ifdef HAVE_MRUBY 425 std::unique_ptr<mruby::MRubyContext> mruby_ctx_; 426 #endif // HAVE_MRUBY 427 struct ev_loop *loop_; 428 429 // Following fields are shared across threads if 430 // get_config()->tls_ctx_per_worker == true. 431 SSL_CTX *sv_ssl_ctx_; 432 SSL_CTX *cl_ssl_ctx_; 433 tls::CertLookupTree *cert_tree_; 434 ConnectionHandler *conn_handler_; 435 #ifdef ENABLE_HTTP3 436 SSL_CTX *quic_sv_ssl_ctx_; 437 tls::CertLookupTree *quic_cert_tree_; 438 439 QUICConnectionHandler quic_conn_handler_; 440 #endif // ENABLE_HTTP3 441 442 #ifndef HAVE_ATOMIC_STD_SHARED_PTR 443 std::mutex ticket_keys_m_; 444 #endif // !HAVE_ATOMIC_STD_SHARED_PTR 445 std::shared_ptr<TicketKeys> ticket_keys_; 446 std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_; 447 // Worker level blocker for downstream connection. For example, 448 // this is used when file descriptor is exhausted. 449 std::unique_ptr<ConnectBlocker> connect_blocker_; 450 451 bool graceful_shutdown_; 452 }; 453 454 // Selects group based on request's |hostport| and |path|. |hostport| 455 // is the value taken from :authority or host header field, and may 456 // contain port. The |path| may contain query part. We require the 457 // catch-all pattern in place, so this function always selects one 458 // group. The catch-all group index is given in |catch_all|. All 459 // patterns are given in |groups|. 460 size_t match_downstream_addr_group( 461 const RouterConfig &routerconfig, const StringRef &hostport, 462 const StringRef &path, 463 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups, 464 size_t catch_all, BlockAllocator &balloc); 465 466 // Calls this function if connecting to backend failed. |raddr| is 467 // the actual address used to connect to backend, and it could be 468 // nullptr. This function may schedule live check. 469 void downstream_failure(DownstreamAddr *addr, const Address *raddr); 470 471 #ifdef ENABLE_HTTP3 472 // Creates unpredictable SHRPX_QUIC_CID_PREFIXLEN bytes sequence which 473 // is used as a prefix of QUIC Connection ID. This function returns 474 // -1 on failure. |server_id| must be 2 bytes long. 475 int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id); 476 #endif // ENABLE_HTTP3 477 478 } // namespace shrpx 479 480 #endif // SHRPX_WORKER_H 481