/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
#ifndef SHRPX_WORKER_H
#define SHRPX_WORKER_H

#include "shrpx.h"

#include <mutex>
#include <vector>
#include <random>
#include <unordered_map>
#include <deque>
#include <thread>
#include <queue>
#ifndef NOTHREADS
#  include <future>
#endif // NOTHREADS

#include <openssl/ssl.h>
#include <openssl/err.h>

#include <ev.h>

#include "shrpx_config.h"
#include "shrpx_downstream_connection_pool.h"
#include "memchunk.h"
#include "shrpx_tls.h"
#include "shrpx_live_check.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_dns_tracker.h"
#include "allocator.h"

using namespace nghttp2;

namespace shrpx {

class Http2Session;
class ConnectBlocker;
class MemcachedDispatcher;
struct UpstreamAddr;
class ConnectionHandler;

#ifdef HAVE_MRUBY
namespace mruby {

class MRubyContext;

} // namespace mruby
#endif // HAVE_MRUBY

namespace tls {
class CertLookupTree;
} // namespace tls

struct WeightGroup;

struct DownstreamAddr {
  Address addr;
  // backend address.  If |host_unix| is true, this is UNIX domain
  // socket path.
  StringRef host;
  StringRef hostport;
  // backend port.  0 if |host_unix| is true.
  uint16_t port;
  // true if |host| contains UNIX domain socket path.
  bool host_unix;

  // sni field to send to the remote server if TLS is enabled.
  StringRef sni;

  std::unique_ptr<ConnectBlocker> connect_blocker;
  std::unique_ptr<LiveCheck> live_check;
  // Connection pool for this particular address if session affinity
  // is enabled.
  std::unique_ptr<DownstreamConnectionPool> dconn_pool;
  size_t fall;
  size_t rise;
  // Client side TLS session cache
  tls::TLSSessionCache tls_session_cache;
  // List of Http2Session objects that are not fully utilized (i.e.,
  // the server advertised maximum concurrency has not been reached).
  // We coalesce as many streams as possible into one Http2Session to
  // fully utilize the TCP connection.
  DList<Http2Session> http2_extra_freelist;
  WeightGroup *wg;
  // total number of streams created in HTTP/2 connections for this
  // address.
  size_t num_dconn;
  // the sequence number of this address, used to randomize the access
  // order across threads.
  size_t seq;
  // Application protocol used in this backend
  Proto proto;
  // cycle is used to prioritize this address.  A lower value takes
  // higher priority.
  uint32_t cycle;
  // penalty which is applied to the next cycle calculation.
  uint32_t pending_penalty;
  // Weight of this address inside a weight group.  Its range is [1,
  // 256], inclusive.
  uint32_t weight;
  // name of the group which this address belongs to.
  StringRef group;
  // Weight of the weight group which this address belongs to.  Its
  // range is [1, 256], inclusive.
  uint32_t group_weight;
  // true if TLS is used in this backend
  bool tls;
  // true if dynamic DNS is enabled
  bool dns;
  // true if the :scheme pseudo header field should be upgraded to the
  // secure variant (e.g., "https") when forwarding a request to a
  // backend connected over TLS.
  bool upgrade_scheme;
  // true if this address is queued.
  bool queued;
};

constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;

struct DownstreamAddrEntry {
  DownstreamAddr *addr;
  size_t seq;
  uint32_t cycle;
};

struct DownstreamAddrEntryGreater {
  bool operator()(const DownstreamAddrEntry &lhs,
                  const DownstreamAddrEntry &rhs) const {
    auto d = lhs.cycle - rhs.cycle;
    if (d == 0) {
      return rhs.seq < lhs.seq;
    }
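    // The subtraction above is unsigned, so cycle wrap-around is
    // handled here: lhs is treated as greater (served later) only when
    // its cycle is ahead of rhs's by at most MAX_DOWNSTREAM_ADDR_WEIGHT.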
    return d <= MAX_DOWNSTREAM_ADDR_WEIGHT;
  }
};

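// A WeightGroup bundles backend addresses that are scheduled together.
// SharedDownstreamAddr::pq picks the next group, and the group's own pq
// picks the next address; both queues serve the entry with the lowest
// cycle first.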
struct WeightGroup {
  std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
                      DownstreamAddrEntryGreater>
      pq;
  size_t seq;
  uint32_t weight;
  uint32_t cycle;
  uint32_t pending_penalty;
  // true if this object is queued.
  bool queued;
};

struct WeightGroupEntry {
  WeightGroup *wg;
  size_t seq;
  uint32_t cycle;
};

struct WeightGroupEntryGreater {
  bool operator()(const WeightGroupEntry &lhs,
                  const WeightGroupEntry &rhs) const {
    auto d = lhs.cycle - rhs.cycle;
    if (d == 0) {
      return rhs.seq < lhs.seq;
    }
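    // Same wrap-around-aware ordering as DownstreamAddrEntryGreater.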
    return d <= MAX_DOWNSTREAM_ADDR_WEIGHT;
  }
};

struct SharedDownstreamAddr {
  SharedDownstreamAddr()
      : balloc(1024, 1024),
        affinity{SessionAffinity::NONE},
        redirect_if_not_tls{false},
        timeout{} {}

  SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
  SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
  SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
  SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;

  BlockAllocator balloc;
  std::vector<DownstreamAddr> addrs;
  std::vector<WeightGroup> wgs;
  std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
                      WeightGroupEntryGreater>
      pq;
  // Session affinity hashes.  Only used if affinity ==
  // SessionAffinity::IP.
  std::vector<AffinityHash> affinity_hash;
#ifdef HAVE_MRUBY
  std::shared_ptr<mruby::MRubyContext> mruby_ctx;
#endif // HAVE_MRUBY
  // Configuration for session affinity
  AffinityConfig affinity;
  // true if this group requires that the client connection be TLS,
  // and that the request be redirected to an https URI.
  bool redirect_if_not_tls;
  // Timeouts for backend connection.
  struct {
    ev_tstamp read;
    ev_tstamp write;
  } timeout;
};

struct DownstreamAddrGroup {
  DownstreamAddrGroup();
  ~DownstreamAddrGroup();

  DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
  DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
  DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
  DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;

  ImmutableString pattern;
  std::shared_ptr<SharedDownstreamAddr> shared_addr;
  // true if this group is no longer used for new requests.  If this
  // is true, a connection made using one of the addresses in
  // shared_addr must not be pooled.
  bool retired;
};

struct WorkerStat {
  size_t num_connections;
};

enum class WorkerEventType {
  NEW_CONNECTION = 0x01,
  REOPEN_LOG = 0x02,
  GRACEFUL_SHUTDOWN = 0x03,
  REPLACE_DOWNSTREAM = 0x04,
};

struct WorkerEvent {
  WorkerEventType type;
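  // Fields used by a NEW_CONNECTION event: the accepted client socket,
  // the client address, and the frontend address it was accepted on.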
  struct {
    sockaddr_union client_addr;
    size_t client_addrlen;
    int client_fd;
    const UpstreamAddr *faddr;
  };
  std::shared_ptr<TicketKeys> ticket_keys;
  std::shared_ptr<DownstreamConfig> downstreamconf;
};

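// A Worker runs its own event loop and serves the connections that the
// ConnectionHandler passes to it via WorkerEvent.  When threading is
// enabled, each Worker's loop runs in its own thread (see run_async()).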
class Worker {
public:
  Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
         SSL_CTX *tls_session_cache_memcached_ssl_ctx,
         tls::CertLookupTree *cert_tree,
         const std::shared_ptr<TicketKeys> &ticket_keys,
         ConnectionHandler *conn_handler,
         std::shared_ptr<DownstreamConfig> downstreamconf);
  ~Worker();
  void run_async();
  void wait();
  void process_events();
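  // Passes |event| to this worker.  The event is queued and the
  // worker's event loop is signaled to process it.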
  void send(const WorkerEvent &event);

  tls::CertLookupTree *get_cert_lookup_tree() const;

  // These two functions take a lock on m_ to get/set ticket keys
  // atomically.
  std::shared_ptr<TicketKeys> get_ticket_keys();
  void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);

  WorkerStat *get_worker_stat();
  struct ev_loop *get_loop() const;
  SSL_CTX *get_sv_ssl_ctx() const;
  SSL_CTX *get_cl_ssl_ctx() const;

  void set_graceful_shutdown(bool f);
  bool get_graceful_shutdown() const;

  MemchunkPool *get_mcpool();
  void schedule_clear_mcpool();

  MemcachedDispatcher *get_session_cache_memcached_dispatcher();

  std::mt19937 &get_randgen();

#ifdef HAVE_MRUBY
  int create_mruby_context();

  mruby::MRubyContext *get_mruby_context() const;
#endif // HAVE_MRUBY

  std::vector<std::shared_ptr<DownstreamAddrGroup>> &
  get_downstream_addr_groups();

  ConnectBlocker *get_connect_blocker() const;

  const DownstreamConfig *get_downstream_config() const;

  void
  replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);

  ConnectionHandler *get_connection_handler() const;

  DNSTracker *get_dns_tracker();

private:
#ifndef NOTHREADS
  std::future<void> fut_;
#endif // NOTHREADS
  std::mutex m_;
  std::deque<WorkerEvent> q_;
  std::mt19937 randgen_;
  ev_async w_;
  ev_timer mcpool_clear_timer_;
  ev_timer proc_wev_timer_;
  MemchunkPool mcpool_;
  WorkerStat worker_stat_;
  DNSTracker dns_tracker_;

  std::shared_ptr<DownstreamConfig> downstreamconf_;
  std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
#ifdef HAVE_MRUBY
  std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
#endif // HAVE_MRUBY
  struct ev_loop *loop_;

  // The following fields are shared across threads if
  // get_config()->tls_ctx_per_worker == true.
  SSL_CTX *sv_ssl_ctx_;
  SSL_CTX *cl_ssl_ctx_;
  tls::CertLookupTree *cert_tree_;
  ConnectionHandler *conn_handler_;

#ifndef HAVE_ATOMIC_STD_SHARED_PTR
  std::mutex ticket_keys_m_;
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
  std::shared_ptr<TicketKeys> ticket_keys_;
  std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
  // Worker level blocker for downstream connections.  For example,
  // this is used when file descriptors are exhausted.
  std::unique_ptr<ConnectBlocker> connect_blocker_;

  bool graceful_shutdown_;
};

// Selects a group based on the request's |hostport| and |path|.
// |hostport| is the value taken from the :authority or host header
// field, and may contain a port.  The |path| may contain a query
// part.  We require that the catch-all pattern is in place, so this
// function always selects one group.  The catch-all group index is
// given in |catch_all|.  All patterns are given in |groups|.
size_t match_downstream_addr_group(
    const RouterConfig &routerconfig, const StringRef &hostport,
    const StringRef &path,
    const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
    size_t catch_all, BlockAllocator &balloc);
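
// A minimal usage sketch (not taken from the implementation; |worker|,
// |routerconf|, |authority|, |path|, |catch_all_idx|, and |balloc| are
// assumed to be available in the caller's context):
//
//   auto &groups = worker->get_downstream_addr_groups();
//   auto idx = match_downstream_addr_group(routerconf, authority, path,
//                                          groups, catch_all_idx, balloc);
//   auto &group = groups[idx];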

// Call this function if connecting to a backend failed.  |raddr| is
// the actual address used to connect to the backend, and it could be
// nullptr.  This function may schedule a live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr);

} // namespace shrpx

#endif // SHRPX_WORKER_H