1 /* 2 * nghttp2 - HTTP/2 C Library 3 * 4 * Copyright (c) 2012 Tatsuhiro Tsujikawa 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining 7 * a copy of this software and associated documentation files (the 8 * "Software"), to deal in the Software without restriction, including 9 * without limitation the rights to use, copy, modify, merge, publish, 10 * distribute, sublicense, and/or sell copies of the Software, and to 11 * permit persons to whom the Software is furnished to do so, subject to 12 * the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be 15 * included in all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 24 */ 25 #ifndef SHRPX_WORKER_H 26 #define SHRPX_WORKER_H 27 28 #include "shrpx.h" 29 30 #include <mutex> 31 #include <vector> 32 #include <random> 33 #include <unordered_map> 34 #include <deque> 35 #include <thread> 36 #include <queue> 37 #ifndef NOTHREADS 38 # include <future> 39 #endif // NOTHREADS 40 41 #include <openssl/ssl.h> 42 #include <openssl/err.h> 43 44 #include <ev.h> 45 46 #include "shrpx_config.h" 47 #include "shrpx_downstream_connection_pool.h" 48 #include "memchunk.h" 49 #include "shrpx_tls.h" 50 #include "shrpx_live_check.h" 51 #include "shrpx_connect_blocker.h" 52 #include "shrpx_dns_tracker.h" 53 #include "allocator.h" 54 55 using namespace nghttp2; 56 57 namespace shrpx { 58 59 class Http2Session; 60 class ConnectBlocker; 61 class MemcachedDispatcher; 62 struct UpstreamAddr; 63 class ConnectionHandler; 64 65 #ifdef HAVE_MRUBY 66 namespace mruby { 67 68 class MRubyContext; 69 70 } // namespace mruby 71 #endif // HAVE_MRUBY 72 73 namespace tls { 74 class CertLookupTree; 75 } // namespace tls 76 77 struct WeightGroup; 78 79 struct DownstreamAddr { 80 Address addr; 81 // backend address. If |host_unix| is true, this is UNIX domain 82 // socket path. 83 StringRef host; 84 StringRef hostport; 85 // backend port. 0 if |host_unix| is true. 86 uint16_t port; 87 // true if |host| contains UNIX domain socket path. 88 bool host_unix; 89 90 // sni field to send remote server if TLS is enabled. 91 StringRef sni; 92 93 std::unique_ptr<ConnectBlocker> connect_blocker; 94 std::unique_ptr<LiveCheck> live_check; 95 // Connection pool for this particular address if session affinity 96 // is enabled 97 std::unique_ptr<DownstreamConnectionPool> dconn_pool; 98 size_t fall; 99 size_t rise; 100 // Client side TLS session cache 101 tls::TLSSessionCache tls_session_cache; 102 // List of Http2Session which is not fully utilized (i.e., the 103 // server advertised maximum concurrency is not reached). We will 104 // coalesce as much stream as possible in one Http2Session to fully 105 // utilize TCP connection. 106 DList<Http2Session> http2_extra_freelist; 107 WeightGroup *wg; 108 // total number of streams created in HTTP/2 connections for this 109 // address. 110 size_t num_dconn; 111 // the sequence number of this address to randomize the order access 112 // threads. 113 size_t seq; 114 // Application protocol used in this backend 115 Proto proto; 116 // cycle is used to prioritize this address. Lower value takes 117 // higher priority. 118 uint32_t cycle; 119 // penalty which is applied to the next cycle calculation. 120 uint32_t pending_penalty; 121 // Weight of this address inside a weight group. Its range is [1, 122 // 256], inclusive. 123 uint32_t weight; 124 // name of group which this address belongs to. 125 StringRef group; 126 // Weight of the weight group which this address belongs to. Its 127 // range is [1, 256], inclusive. 128 uint32_t group_weight; 129 // true if TLS is used in this backend 130 bool tls; 131 // true if dynamic DNS is enabled 132 bool dns; 133 // true if :scheme pseudo header field should be upgraded to secure 134 // variant (e.g., "https") when forwarding request to a backend 135 // connected by TLS connection. 136 bool upgrade_scheme; 137 // true if this address is queued. 138 bool queued; 139 }; 140 141 constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256; 142 143 struct DownstreamAddrEntry { 144 DownstreamAddr *addr; 145 size_t seq; 146 uint32_t cycle; 147 }; 148 149 struct DownstreamAddrEntryGreater { operatorDownstreamAddrEntryGreater150 bool operator()(const DownstreamAddrEntry &lhs, 151 const DownstreamAddrEntry &rhs) const { 152 auto d = lhs.cycle - rhs.cycle; 153 if (d == 0) { 154 return rhs.seq < lhs.seq; 155 } 156 return d <= MAX_DOWNSTREAM_ADDR_WEIGHT; 157 } 158 }; 159 160 struct WeightGroup { 161 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>, 162 DownstreamAddrEntryGreater> 163 pq; 164 size_t seq; 165 uint32_t weight; 166 uint32_t cycle; 167 uint32_t pending_penalty; 168 // true if this object is queued. 169 bool queued; 170 }; 171 172 struct WeightGroupEntry { 173 WeightGroup *wg; 174 size_t seq; 175 uint32_t cycle; 176 }; 177 178 struct WeightGroupEntryGreater { operatorWeightGroupEntryGreater179 bool operator()(const WeightGroupEntry &lhs, 180 const WeightGroupEntry &rhs) const { 181 auto d = lhs.cycle - rhs.cycle; 182 if (d == 0) { 183 return rhs.seq < lhs.seq; 184 } 185 return d <= MAX_DOWNSTREAM_ADDR_WEIGHT; 186 } 187 }; 188 189 struct SharedDownstreamAddr { SharedDownstreamAddrSharedDownstreamAddr190 SharedDownstreamAddr() 191 : balloc(1024, 1024), 192 affinity{SessionAffinity::NONE}, 193 redirect_if_not_tls{false}, 194 timeout{} {} 195 196 SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; 197 SharedDownstreamAddr(SharedDownstreamAddr &&) = delete; 198 SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete; 199 SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete; 200 201 BlockAllocator balloc; 202 std::vector<DownstreamAddr> addrs; 203 std::vector<WeightGroup> wgs; 204 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>, 205 WeightGroupEntryGreater> 206 pq; 207 // Bunch of session affinity hash. Only used if affinity == 208 // SessionAffinity::IP. 209 std::vector<AffinityHash> affinity_hash; 210 #ifdef HAVE_MRUBY 211 std::shared_ptr<mruby::MRubyContext> mruby_ctx; 212 #endif // HAVE_MRUBY 213 // Configuration for session affinity 214 AffinityConfig affinity; 215 // Session affinity 216 // true if this group requires that client connection must be TLS, 217 // and the request must be redirected to https URI. 218 bool redirect_if_not_tls; 219 // Timeouts for backend connection. 220 struct { 221 ev_tstamp read; 222 ev_tstamp write; 223 } timeout; 224 }; 225 226 struct DownstreamAddrGroup { 227 DownstreamAddrGroup(); 228 ~DownstreamAddrGroup(); 229 230 DownstreamAddrGroup(const DownstreamAddrGroup &) = delete; 231 DownstreamAddrGroup(DownstreamAddrGroup &&) = delete; 232 DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete; 233 DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete; 234 235 ImmutableString pattern; 236 std::shared_ptr<SharedDownstreamAddr> shared_addr; 237 // true if this group is no longer used for new request. If this is 238 // true, the connection made using one of address in shared_addr 239 // must not be pooled. 240 bool retired; 241 }; 242 243 struct WorkerStat { 244 size_t num_connections; 245 }; 246 247 enum class WorkerEventType { 248 NEW_CONNECTION = 0x01, 249 REOPEN_LOG = 0x02, 250 GRACEFUL_SHUTDOWN = 0x03, 251 REPLACE_DOWNSTREAM = 0x04, 252 }; 253 254 struct WorkerEvent { 255 WorkerEventType type; 256 struct { 257 sockaddr_union client_addr; 258 size_t client_addrlen; 259 int client_fd; 260 const UpstreamAddr *faddr; 261 }; 262 std::shared_ptr<TicketKeys> ticket_keys; 263 std::shared_ptr<DownstreamConfig> downstreamconf; 264 }; 265 266 class Worker { 267 public: 268 Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, 269 SSL_CTX *tls_session_cache_memcached_ssl_ctx, 270 tls::CertLookupTree *cert_tree, 271 const std::shared_ptr<TicketKeys> &ticket_keys, 272 ConnectionHandler *conn_handler, 273 std::shared_ptr<DownstreamConfig> downstreamconf); 274 ~Worker(); 275 void run_async(); 276 void wait(); 277 void process_events(); 278 void send(const WorkerEvent &event); 279 280 tls::CertLookupTree *get_cert_lookup_tree() const; 281 282 // These 2 functions make a lock m_ to get/set ticket keys 283 // atomically. 284 std::shared_ptr<TicketKeys> get_ticket_keys(); 285 void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys); 286 287 WorkerStat *get_worker_stat(); 288 struct ev_loop *get_loop() const; 289 SSL_CTX *get_sv_ssl_ctx() const; 290 SSL_CTX *get_cl_ssl_ctx() const; 291 292 void set_graceful_shutdown(bool f); 293 bool get_graceful_shutdown() const; 294 295 MemchunkPool *get_mcpool(); 296 void schedule_clear_mcpool(); 297 298 MemcachedDispatcher *get_session_cache_memcached_dispatcher(); 299 300 std::mt19937 &get_randgen(); 301 302 #ifdef HAVE_MRUBY 303 int create_mruby_context(); 304 305 mruby::MRubyContext *get_mruby_context() const; 306 #endif // HAVE_MRUBY 307 308 std::vector<std::shared_ptr<DownstreamAddrGroup>> & 309 get_downstream_addr_groups(); 310 311 ConnectBlocker *get_connect_blocker() const; 312 313 const DownstreamConfig *get_downstream_config() const; 314 315 void 316 replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf); 317 318 ConnectionHandler *get_connection_handler() const; 319 320 DNSTracker *get_dns_tracker(); 321 322 private: 323 #ifndef NOTHREADS 324 std::future<void> fut_; 325 #endif // NOTHREADS 326 std::mutex m_; 327 std::deque<WorkerEvent> q_; 328 std::mt19937 randgen_; 329 ev_async w_; 330 ev_timer mcpool_clear_timer_; 331 ev_timer proc_wev_timer_; 332 MemchunkPool mcpool_; 333 WorkerStat worker_stat_; 334 DNSTracker dns_tracker_; 335 336 std::shared_ptr<DownstreamConfig> downstreamconf_; 337 std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_; 338 #ifdef HAVE_MRUBY 339 std::unique_ptr<mruby::MRubyContext> mruby_ctx_; 340 #endif // HAVE_MRUBY 341 struct ev_loop *loop_; 342 343 // Following fields are shared across threads if 344 // get_config()->tls_ctx_per_worker == true. 345 SSL_CTX *sv_ssl_ctx_; 346 SSL_CTX *cl_ssl_ctx_; 347 tls::CertLookupTree *cert_tree_; 348 ConnectionHandler *conn_handler_; 349 350 #ifndef HAVE_ATOMIC_STD_SHARED_PTR 351 std::mutex ticket_keys_m_; 352 #endif // !HAVE_ATOMIC_STD_SHARED_PTR 353 std::shared_ptr<TicketKeys> ticket_keys_; 354 std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_; 355 // Worker level blocker for downstream connection. For example, 356 // this is used when file decriptor is exhausted. 357 std::unique_ptr<ConnectBlocker> connect_blocker_; 358 359 bool graceful_shutdown_; 360 }; 361 362 // Selects group based on request's |hostport| and |path|. |hostport| 363 // is the value taken from :authority or host header field, and may 364 // contain port. The |path| may contain query part. We require the 365 // catch-all pattern in place, so this function always selects one 366 // group. The catch-all group index is given in |catch_all|. All 367 // patterns are given in |groups|. 368 size_t match_downstream_addr_group( 369 const RouterConfig &routerconfig, const StringRef &hostport, 370 const StringRef &path, 371 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups, 372 size_t catch_all, BlockAllocator &balloc); 373 374 // Calls this function if connecting to backend failed. |raddr| is 375 // the actual address used to connect to backend, and it could be 376 // nullptr. This function may schedule live check. 377 void downstream_failure(DownstreamAddr *addr, const Address *raddr); 378 379 } // namespace shrpx 380 381 #endif // SHRPX_WORKER_H 382