• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_downstream.h"
26 
27 #include <cassert>
28 
29 #include "url-parser/url_parser.h"
30 
31 #include "shrpx_upstream.h"
32 #include "shrpx_client_handler.h"
33 #include "shrpx_config.h"
34 #include "shrpx_error.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_downstream_queue.h"
37 #include "shrpx_worker.h"
38 #include "shrpx_http2_session.h"
39 #include "shrpx_log.h"
40 #ifdef HAVE_MRUBY
41 #  include "shrpx_mruby.h"
42 #endif // HAVE_MRUBY
43 #include "util.h"
44 #include "http2.h"
45 
46 namespace shrpx {
47 
48 namespace {
header_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)49 void header_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50   auto downstream = static_cast<Downstream *>(w->data);
51   auto upstream = downstream->get_upstream();
52 
53   if (LOG_ENABLED(INFO)) {
54     DLOG(INFO, downstream) << "request header timeout stream_id="
55                            << downstream->get_stream_id();
56   }
57 
58   downstream->disable_upstream_rtimer();
59   downstream->disable_upstream_wtimer();
60 
61   upstream->on_timeout(downstream);
62 }
63 } // namespace
64 
65 namespace {
upstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)66 void upstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
67   auto downstream = static_cast<Downstream *>(w->data);
68   auto upstream = downstream->get_upstream();
69 
70   auto which = revents == EV_READ ? "read" : "write";
71 
72   if (LOG_ENABLED(INFO)) {
73     DLOG(INFO, downstream) << "upstream timeout stream_id="
74                            << downstream->get_stream_id() << " event=" << which;
75   }
76 
77   downstream->disable_upstream_rtimer();
78   downstream->disable_upstream_wtimer();
79 
80   upstream->on_timeout(downstream);
81 }
82 } // namespace
83 
84 namespace {
upstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)85 void upstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
86   upstream_timeoutcb(loop, w, EV_READ);
87 }
88 } // namespace
89 
90 namespace {
upstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)91 void upstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
92   upstream_timeoutcb(loop, w, EV_WRITE);
93 }
94 } // namespace
95 
96 namespace {
downstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)97 void downstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
98   auto downstream = static_cast<Downstream *>(w->data);
99 
100   auto which = revents == EV_READ ? "read" : "write";
101 
102   if (LOG_ENABLED(INFO)) {
103     DLOG(INFO, downstream) << "downstream timeout stream_id="
104                            << downstream->get_downstream_stream_id()
105                            << " event=" << which;
106   }
107 
108   downstream->disable_downstream_rtimer();
109   downstream->disable_downstream_wtimer();
110 
111   auto dconn = downstream->get_downstream_connection();
112 
113   if (dconn) {
114     dconn->on_timeout();
115   }
116 }
117 } // namespace
118 
119 namespace {
downstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)120 void downstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
121   downstream_timeoutcb(loop, w, EV_READ);
122 }
123 } // namespace
124 
125 namespace {
downstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)126 void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
127   downstream_timeoutcb(loop, w, EV_WRITE);
128 }
129 } // namespace
130 
131 // upstream could be nullptr for unittests
Downstream(Upstream * upstream,MemchunkPool * mcpool,int64_t stream_id)132 Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
133                        int64_t stream_id)
134     : dlnext(nullptr),
135       dlprev(nullptr),
136       response_sent_body_length(0),
137       balloc_(1024, 1024),
138       req_(balloc_),
139       resp_(balloc_),
140       request_start_time_(std::chrono::high_resolution_clock::now()),
141       blocked_request_buf_(mcpool),
142       request_buf_(mcpool),
143       response_buf_(mcpool),
144       upstream_(upstream),
145       blocked_link_(nullptr),
146       addr_(nullptr),
147       num_retry_(0),
148       stream_id_(stream_id),
149       assoc_stream_id_(-1),
150       downstream_stream_id_(-1),
151       response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
152       affinity_cookie_(0),
153       request_state_(DownstreamState::INITIAL),
154       response_state_(DownstreamState::INITIAL),
155       dispatch_state_(DispatchState::NONE),
156       upgraded_(false),
157       chunked_request_(false),
158       chunked_response_(false),
159       expect_final_response_(false),
160       request_pending_(false),
161       request_header_sent_(false),
162       accesslog_written_(false),
163       new_affinity_cookie_(false),
164       blocked_request_data_eof_(false),
165       expect_100_continue_(false),
166       stop_reading_(false) {
167 
168   auto config = get_config();
169   auto &httpconf = config->http;
170 
171   ev_timer_init(&header_timer_, header_timeoutcb, 0., httpconf.timeout.header);
172 
173   auto &timeoutconf = config->http2.timeout;
174 
175   ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
176                 timeoutconf.stream_read);
177   ev_timer_init(&upstream_wtimer_, &upstream_wtimeoutcb, 0.,
178                 timeoutconf.stream_write);
179   ev_timer_init(&downstream_rtimer_, &downstream_rtimeoutcb, 0.,
180                 timeoutconf.stream_read);
181   ev_timer_init(&downstream_wtimer_, &downstream_wtimeoutcb, 0.,
182                 timeoutconf.stream_write);
183 
184   header_timer_.data = this;
185   upstream_rtimer_.data = this;
186   upstream_wtimer_.data = this;
187   downstream_rtimer_.data = this;
188   downstream_wtimer_.data = this;
189 
190   rcbufs_.reserve(32);
191 #ifdef ENABLE_HTTP3
192   rcbufs3_.reserve(32);
193 #endif // ENABLE_HTTP3
194 }
195 
~Downstream()196 Downstream::~Downstream() {
197   if (LOG_ENABLED(INFO)) {
198     DLOG(INFO, this) << "Deleting";
199   }
200 
201   // check nullptr for unittest
202   if (upstream_) {
203     auto loop = upstream_->get_client_handler()->get_loop();
204 
205     ev_timer_stop(loop, &upstream_rtimer_);
206     ev_timer_stop(loop, &upstream_wtimer_);
207     ev_timer_stop(loop, &downstream_rtimer_);
208     ev_timer_stop(loop, &downstream_wtimer_);
209     ev_timer_stop(loop, &header_timer_);
210 
211 #ifdef HAVE_MRUBY
212     auto handler = upstream_->get_client_handler();
213     auto worker = handler->get_worker();
214     auto mruby_ctx = worker->get_mruby_context();
215 
216     mruby_ctx->delete_downstream(this);
217 #endif // HAVE_MRUBY
218   }
219 
220 #ifdef HAVE_MRUBY
221   if (dconn_) {
222     const auto &group = dconn_->get_downstream_addr_group();
223     if (group) {
224       const auto &mruby_ctx = group->shared_addr->mruby_ctx;
225       mruby_ctx->delete_downstream(this);
226     }
227   }
228 #endif // HAVE_MRUBY
229 
230   // DownstreamConnection may refer to this object.  Delete it now
231   // explicitly.
232   dconn_.reset();
233 
234 #ifdef ENABLE_HTTP3
235   for (auto rcbuf : rcbufs3_) {
236     nghttp3_rcbuf_decref(rcbuf);
237   }
238 #endif // ENABLE_HTTP3
239 
240   for (auto rcbuf : rcbufs_) {
241     nghttp2_rcbuf_decref(rcbuf);
242   }
243 
244   if (LOG_ENABLED(INFO)) {
245     DLOG(INFO, this) << "Deleted";
246   }
247 }
248 
attach_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)249 int Downstream::attach_downstream_connection(
250     std::unique_ptr<DownstreamConnection> dconn) {
251   if (dconn->attach_downstream(this) != 0) {
252     return -1;
253   }
254 
255   dconn_ = std::move(dconn);
256 
257   return 0;
258 }
259 
detach_downstream_connection()260 void Downstream::detach_downstream_connection() {
261   if (!dconn_) {
262     return;
263   }
264 
265 #ifdef HAVE_MRUBY
266   const auto &group = dconn_->get_downstream_addr_group();
267   if (group) {
268     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
269     mruby_ctx->delete_downstream(this);
270   }
271 #endif // HAVE_MRUBY
272 
273   dconn_->detach_downstream(this);
274 
275   auto handler = dconn_->get_client_handler();
276 
277   handler->pool_downstream_connection(
278       std::unique_ptr<DownstreamConnection>(dconn_.release()));
279 }
280 
get_downstream_connection()281 DownstreamConnection *Downstream::get_downstream_connection() {
282   return dconn_.get();
283 }
284 
pop_downstream_connection()285 std::unique_ptr<DownstreamConnection> Downstream::pop_downstream_connection() {
286 #ifdef HAVE_MRUBY
287   if (!dconn_) {
288     return nullptr;
289   }
290 
291   const auto &group = dconn_->get_downstream_addr_group();
292   if (group) {
293     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
294     mruby_ctx->delete_downstream(this);
295   }
296 #endif // HAVE_MRUBY
297 
298   return std::unique_ptr<DownstreamConnection>(dconn_.release());
299 }
300 
pause_read(IOCtrlReason reason)301 void Downstream::pause_read(IOCtrlReason reason) {
302   if (dconn_) {
303     dconn_->pause_read(reason);
304   }
305 }
306 
resume_read(IOCtrlReason reason,size_t consumed)307 int Downstream::resume_read(IOCtrlReason reason, size_t consumed) {
308   if (dconn_) {
309     return dconn_->resume_read(reason, consumed);
310   }
311 
312   return 0;
313 }
314 
force_resume_read()315 void Downstream::force_resume_read() {
316   if (dconn_) {
317     dconn_->force_resume_read();
318   }
319 }
320 
321 namespace {
322 const HeaderRefs::value_type *
search_header_linear_backwards(const HeaderRefs & headers,const StringRef & name)323 search_header_linear_backwards(const HeaderRefs &headers,
324                                const StringRef &name) {
325   for (auto it = headers.rbegin(); it != headers.rend(); ++it) {
326     auto &kv = *it;
327     if (kv.name == name) {
328       return &kv;
329     }
330   }
331   return nullptr;
332 }
333 } // namespace
334 
assemble_request_cookie()335 StringRef Downstream::assemble_request_cookie() {
336   size_t len = 0;
337 
338   for (auto &kv : req_.fs.headers()) {
339     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
340       continue;
341     }
342 
343     len += kv.value.size() + str_size("; ");
344   }
345 
346   auto iov = make_byte_ref(balloc_, len + 1);
347   auto p = std::begin(iov);
348 
349   for (auto &kv : req_.fs.headers()) {
350     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
351       continue;
352     }
353 
354     auto end = std::end(kv.value);
355     for (auto it = std::begin(kv.value) + kv.value.size();
356          it != std::begin(kv.value); --it) {
357       auto c = *(it - 1);
358       if (c == ' ' || c == ';') {
359         continue;
360       }
361       end = it;
362       break;
363     }
364 
365     p = std::copy(std::begin(kv.value), end, p);
366     p = util::copy_lit(p, "; ");
367   }
368 
369   // cut trailing "; "
370   if (p - std::begin(iov) >= 2) {
371     p -= 2;
372   }
373 
374   return StringRef{std::span{std::begin(iov), p}};
375 }
376 
find_affinity_cookie(const StringRef & name)377 uint32_t Downstream::find_affinity_cookie(const StringRef &name) {
378   for (auto &kv : req_.fs.headers()) {
379     if (kv.token != http2::HD_COOKIE) {
380       continue;
381     }
382 
383     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
384       if (*it == '\t' || *it == ' ' || *it == ';') {
385         ++it;
386         continue;
387       }
388 
389       auto end = std::find(it, std::end(kv.value), '=');
390       if (end == std::end(kv.value)) {
391         return 0;
392       }
393 
394       if (name != StringRef{it, end}) {
395         it = std::find(it, std::end(kv.value), ';');
396         continue;
397       }
398 
399       it = std::find(end + 1, std::end(kv.value), ';');
400       auto val = StringRef{end + 1, it};
401       if (val.size() != 8) {
402         return 0;
403       }
404       uint32_t h = 0;
405       for (auto c : val) {
406         auto n = util::hex_to_uint(c);
407         if (n == 256) {
408           return 0;
409         }
410         h <<= 4;
411         h += n;
412       }
413       affinity_cookie_ = h;
414       return h;
415     }
416   }
417   return 0;
418 }
419 
count_crumble_request_cookie()420 size_t Downstream::count_crumble_request_cookie() {
421   size_t n = 0;
422   for (auto &kv : req_.fs.headers()) {
423     if (kv.token != http2::HD_COOKIE) {
424       continue;
425     }
426 
427     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
428       if (*it == '\t' || *it == ' ' || *it == ';') {
429         ++it;
430         continue;
431       }
432 
433       it = std::find(it, std::end(kv.value), ';');
434 
435       ++n;
436     }
437   }
438   return n;
439 }
440 
crumble_request_cookie(std::vector<nghttp2_nv> & nva)441 void Downstream::crumble_request_cookie(std::vector<nghttp2_nv> &nva) {
442   for (auto &kv : req_.fs.headers()) {
443     if (kv.token != http2::HD_COOKIE) {
444       continue;
445     }
446 
447     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
448       if (*it == '\t' || *it == ' ' || *it == ';') {
449         ++it;
450         continue;
451       }
452 
453       auto first = it;
454 
455       it = std::find(it, std::end(kv.value), ';');
456 
457       nva.push_back({(uint8_t *)"cookie", (uint8_t *)first, str_size("cookie"),
458                      (size_t)(it - first),
459                      (uint8_t)(NGHTTP2_NV_FLAG_NO_COPY_NAME |
460                                NGHTTP2_NV_FLAG_NO_COPY_VALUE |
461                                (kv.no_index ? NGHTTP2_NV_FLAG_NO_INDEX : 0))});
462     }
463   }
464 }
465 
466 namespace {
add_header(size_t & sum,HeaderRefs & headers,const StringRef & name,const StringRef & value,bool no_index,int32_t token)467 void add_header(size_t &sum, HeaderRefs &headers, const StringRef &name,
468                 const StringRef &value, bool no_index, int32_t token) {
469   sum += name.size() + value.size();
470   headers.emplace_back(name, value, no_index, token);
471 }
472 } // namespace
473 
474 namespace {
alloc_header_name(BlockAllocator & balloc,const StringRef & name)475 StringRef alloc_header_name(BlockAllocator &balloc, const StringRef &name) {
476   auto iov = make_byte_ref(balloc, name.size() + 1);
477   auto p = std::copy(std::begin(name), std::end(name), std::begin(iov));
478   util::inp_strlower(std::begin(iov), p);
479   *p = '\0';
480 
481   return StringRef{std::span{std::begin(iov), p}};
482 }
483 } // namespace
484 
485 namespace {
append_last_header_key(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)486 void append_last_header_key(BlockAllocator &balloc, bool &key_prev, size_t &sum,
487                             HeaderRefs &headers, const char *data, size_t len) {
488   assert(key_prev);
489   sum += len;
490   auto &item = headers.back();
491   auto name =
492       realloc_concat_string_ref(balloc, item.name, StringRef{data, len});
493 
494   auto p = const_cast<uint8_t *>(name.byte());
495   util::inp_strlower(p + name.size() - len, p + name.size());
496 
497   item.name = name;
498   item.token = http2::lookup_token(item.name);
499 }
500 } // namespace
501 
502 namespace {
append_last_header_value(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)503 void append_last_header_value(BlockAllocator &balloc, bool &key_prev,
504                               size_t &sum, HeaderRefs &headers,
505                               const char *data, size_t len) {
506   key_prev = false;
507   sum += len;
508   auto &item = headers.back();
509   item.value =
510       realloc_concat_string_ref(balloc, item.value, StringRef{data, len});
511 }
512 } // namespace
513 
parse_content_length()514 int FieldStore::parse_content_length() {
515   content_length = -1;
516 
517   for (auto &kv : headers_) {
518     if (kv.token != http2::HD_CONTENT_LENGTH) {
519       continue;
520     }
521 
522     auto len = util::parse_uint(kv.value);
523     if (!len) {
524       return -1;
525     }
526     if (content_length != -1) {
527       return -1;
528     }
529     content_length = *len;
530   }
531   return 0;
532 }
533 
header(int32_t token) const534 const HeaderRefs::value_type *FieldStore::header(int32_t token) const {
535   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
536     auto &kv = *it;
537     if (kv.token == token) {
538       return &kv;
539     }
540   }
541   return nullptr;
542 }
543 
header(int32_t token)544 HeaderRefs::value_type *FieldStore::header(int32_t token) {
545   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
546     auto &kv = *it;
547     if (kv.token == token) {
548       return &kv;
549     }
550   }
551   return nullptr;
552 }
553 
header(const StringRef & name) const554 const HeaderRefs::value_type *FieldStore::header(const StringRef &name) const {
555   return search_header_linear_backwards(headers_, name);
556 }
557 
add_header_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)558 void FieldStore::add_header_token(const StringRef &name, const StringRef &value,
559                                   bool no_index, int32_t token) {
560   shrpx::add_header(buffer_size_, headers_, name, value, no_index, token);
561 }
562 
alloc_add_header_name(const StringRef & name)563 void FieldStore::alloc_add_header_name(const StringRef &name) {
564   auto name_ref = alloc_header_name(balloc_, name);
565   auto token = http2::lookup_token(name_ref);
566   add_header_token(name_ref, StringRef{}, false, token);
567   header_key_prev_ = true;
568 }
569 
append_last_header_key(const char * data,size_t len)570 void FieldStore::append_last_header_key(const char *data, size_t len) {
571   shrpx::append_last_header_key(balloc_, header_key_prev_, buffer_size_,
572                                 headers_, data, len);
573 }
574 
append_last_header_value(const char * data,size_t len)575 void FieldStore::append_last_header_value(const char *data, size_t len) {
576   shrpx::append_last_header_value(balloc_, header_key_prev_, buffer_size_,
577                                   headers_, data, len);
578 }
579 
clear_headers()580 void FieldStore::clear_headers() {
581   headers_.clear();
582   header_key_prev_ = false;
583 }
584 
add_trailer_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)585 void FieldStore::add_trailer_token(const StringRef &name,
586                                    const StringRef &value, bool no_index,
587                                    int32_t token) {
588   // Header size limit should be applied to all header and trailer
589   // fields combined.
590   shrpx::add_header(buffer_size_, trailers_, name, value, no_index, token);
591 }
592 
alloc_add_trailer_name(const StringRef & name)593 void FieldStore::alloc_add_trailer_name(const StringRef &name) {
594   auto name_ref = alloc_header_name(balloc_, name);
595   auto token = http2::lookup_token(name_ref);
596   add_trailer_token(name_ref, StringRef{}, false, token);
597   trailer_key_prev_ = true;
598 }
599 
append_last_trailer_key(const char * data,size_t len)600 void FieldStore::append_last_trailer_key(const char *data, size_t len) {
601   shrpx::append_last_header_key(balloc_, trailer_key_prev_, buffer_size_,
602                                 trailers_, data, len);
603 }
604 
append_last_trailer_value(const char * data,size_t len)605 void FieldStore::append_last_trailer_value(const char *data, size_t len) {
606   shrpx::append_last_header_value(balloc_, trailer_key_prev_, buffer_size_,
607                                   trailers_, data, len);
608 }
609 
erase_content_length_and_transfer_encoding()610 void FieldStore::erase_content_length_and_transfer_encoding() {
611   for (auto &kv : headers_) {
612     switch (kv.token) {
613     case http2::HD_CONTENT_LENGTH:
614     case http2::HD_TRANSFER_ENCODING:
615       kv.name = StringRef{};
616       kv.token = -1;
617       break;
618     }
619   }
620 }
621 
set_request_start_time(std::chrono::high_resolution_clock::time_point time)622 void Downstream::set_request_start_time(
623     std::chrono::high_resolution_clock::time_point time) {
624   request_start_time_ = std::move(time);
625 }
626 
627 const std::chrono::high_resolution_clock::time_point &
get_request_start_time() const628 Downstream::get_request_start_time() const {
629   return request_start_time_;
630 }
631 
reset_upstream(Upstream * upstream)632 void Downstream::reset_upstream(Upstream *upstream) {
633   upstream_ = upstream;
634   if (dconn_) {
635     dconn_->on_upstream_change(upstream);
636   }
637 }
638 
get_upstream() const639 Upstream *Downstream::get_upstream() const { return upstream_; }
640 
set_stream_id(int64_t stream_id)641 void Downstream::set_stream_id(int64_t stream_id) { stream_id_ = stream_id; }
642 
get_stream_id() const643 int64_t Downstream::get_stream_id() const { return stream_id_; }
644 
set_request_state(DownstreamState state)645 void Downstream::set_request_state(DownstreamState state) {
646   request_state_ = state;
647 }
648 
get_request_state() const649 DownstreamState Downstream::get_request_state() const { return request_state_; }
650 
get_chunked_request() const651 bool Downstream::get_chunked_request() const { return chunked_request_; }
652 
set_chunked_request(bool f)653 void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
654 
request_buf_full()655 bool Downstream::request_buf_full() {
656   auto handler = upstream_->get_client_handler();
657   auto faddr = handler->get_upstream_addr();
658   auto worker = handler->get_worker();
659 
660   // We don't check buffer size here for API endpoint.
661   if (faddr->alt_mode == UpstreamAltMode::API) {
662     return false;
663   }
664 
665   if (dconn_) {
666     auto &downstreamconf = *worker->get_downstream_config();
667     return blocked_request_buf_.rleft() + request_buf_.rleft() >=
668            downstreamconf.request_buffer_size;
669   }
670 
671   return false;
672 }
673 
get_request_buf()674 DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
675 
676 // Call this function after this object is attached to
677 // Downstream. Otherwise, the program will crash.
push_request_headers()678 int Downstream::push_request_headers() {
679   if (!dconn_) {
680     DLOG(INFO, this) << "dconn_ is NULL";
681     return -1;
682   }
683   return dconn_->push_request_headers();
684 }
685 
push_upload_data_chunk(const uint8_t * data,size_t datalen)686 int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
687   req_.recv_body_length += datalen;
688 
689   if (!dconn_ && !request_header_sent_) {
690     blocked_request_buf_.append(data, datalen);
691     req_.unconsumed_body_length += datalen;
692     return 0;
693   }
694 
695   // Assumes that request headers have already been pushed to output
696   // buffer using push_request_headers().
697   if (!dconn_) {
698     DLOG(INFO, this) << "dconn_ is NULL";
699     return -1;
700   }
701   if (dconn_->push_upload_data_chunk(data, datalen) != 0) {
702     return -1;
703   }
704 
705   req_.unconsumed_body_length += datalen;
706 
707   return 0;
708 }
709 
end_upload_data()710 int Downstream::end_upload_data() {
711   if (!dconn_ && !request_header_sent_) {
712     blocked_request_data_eof_ = true;
713     return 0;
714   }
715   if (!dconn_) {
716     DLOG(INFO, this) << "dconn_ is NULL";
717     return -1;
718   }
719   return dconn_->end_upload_data();
720 }
721 
rewrite_location_response_header(const StringRef & upstream_scheme)722 void Downstream::rewrite_location_response_header(
723     const StringRef &upstream_scheme) {
724   auto hd = resp_.fs.header(http2::HD_LOCATION);
725   if (!hd) {
726     return;
727   }
728 
729   if (request_downstream_host_.empty() || req_.authority.empty()) {
730     return;
731   }
732 
733   http_parser_url u{};
734   auto rv = http_parser_parse_url(hd->value.data(), hd->value.size(), 0, &u);
735   if (rv != 0) {
736     return;
737   }
738 
739   auto new_uri = http2::rewrite_location_uri(balloc_, hd->value, u,
740                                              request_downstream_host_,
741                                              req_.authority, upstream_scheme);
742 
743   if (new_uri.empty()) {
744     return;
745   }
746 
747   hd->value = new_uri;
748 }
749 
get_chunked_response() const750 bool Downstream::get_chunked_response() const { return chunked_response_; }
751 
set_chunked_response(bool f)752 void Downstream::set_chunked_response(bool f) { chunked_response_ = f; }
753 
on_read()754 int Downstream::on_read() {
755   if (!dconn_) {
756     DLOG(INFO, this) << "dconn_ is NULL";
757     return -1;
758   }
759   return dconn_->on_read();
760 }
761 
set_response_state(DownstreamState state)762 void Downstream::set_response_state(DownstreamState state) {
763   response_state_ = state;
764 }
765 
get_response_state() const766 DownstreamState Downstream::get_response_state() const {
767   return response_state_;
768 }
769 
get_response_buf()770 DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
771 
response_buf_full()772 bool Downstream::response_buf_full() {
773   if (dconn_) {
774     auto handler = upstream_->get_client_handler();
775     auto worker = handler->get_worker();
776     auto &downstreamconf = *worker->get_downstream_config();
777 
778     return response_buf_.rleft() >= downstreamconf.response_buffer_size;
779   }
780 
781   return false;
782 }
783 
validate_request_recv_body_length() const784 bool Downstream::validate_request_recv_body_length() const {
785   if (req_.fs.content_length == -1) {
786     return true;
787   }
788 
789   if (req_.fs.content_length != req_.recv_body_length) {
790     if (LOG_ENABLED(INFO)) {
791       DLOG(INFO, this) << "request invalid bodylen: content-length="
792                        << req_.fs.content_length
793                        << ", received=" << req_.recv_body_length;
794     }
795     return false;
796   }
797 
798   return true;
799 }
800 
validate_response_recv_body_length() const801 bool Downstream::validate_response_recv_body_length() const {
802   if (!expect_response_body() || resp_.fs.content_length == -1) {
803     return true;
804   }
805 
806   if (resp_.fs.content_length != resp_.recv_body_length) {
807     if (LOG_ENABLED(INFO)) {
808       DLOG(INFO, this) << "response invalid bodylen: content-length="
809                        << resp_.fs.content_length
810                        << ", received=" << resp_.recv_body_length;
811     }
812     return false;
813   }
814 
815   return true;
816 }
817 
check_upgrade_fulfilled_http2()818 void Downstream::check_upgrade_fulfilled_http2() {
819   // This handles nonzero req_.connect_proto and h1 frontend requests
820   // WebSocket upgrade.
821   upgraded_ = (req_.method == HTTP_CONNECT ||
822                req_.connect_proto == ConnectProto::WEBSOCKET) &&
823               resp_.http_status / 100 == 2;
824 }
825 
check_upgrade_fulfilled_http1()826 void Downstream::check_upgrade_fulfilled_http1() {
827   if (req_.method == HTTP_CONNECT) {
828     if (req_.connect_proto == ConnectProto::WEBSOCKET) {
829       if (resp_.http_status != 101) {
830         return;
831       }
832 
833       // This is done for HTTP/2 frontend only.
834       auto accept = resp_.fs.header(http2::HD_SEC_WEBSOCKET_ACCEPT);
835       if (!accept) {
836         return;
837       }
838 
839       std::array<uint8_t, base64::encode_length(20)> accept_buf;
840       auto expected =
841           http2::make_websocket_accept_token(accept_buf.data(), ws_key_);
842 
843       upgraded_ = !expected.empty() && expected == accept->value;
844     } else {
845       upgraded_ = resp_.http_status / 100 == 2;
846     }
847 
848     return;
849   }
850 
851   if (resp_.http_status == 101) {
852     // TODO Do more strict checking for upgrade headers
853     upgraded_ = req_.upgrade_request;
854 
855     return;
856   }
857 }
858 
inspect_http2_request()859 void Downstream::inspect_http2_request() {
860   if (req_.method == HTTP_CONNECT) {
861     req_.upgrade_request = true;
862   }
863 }
864 
inspect_http1_request()865 void Downstream::inspect_http1_request() {
866   if (req_.method == HTTP_CONNECT) {
867     req_.upgrade_request = true;
868   } else if (req_.http_minor > 0) {
869     auto upgrade = req_.fs.header(http2::HD_UPGRADE);
870     if (upgrade) {
871       const auto &val = upgrade->value;
872       // TODO Perform more strict checking for upgrade headers
873       if (NGHTTP2_CLEARTEXT_PROTO_VERSION_ID ""_sr == val) {
874         req_.http2_upgrade_seen = true;
875       } else {
876         req_.upgrade_request = true;
877 
878         // TODO Should we check Sec-WebSocket-Key, and
879         // Sec-WebSocket-Version as well?
880         if (util::strieq("websocket"_sr, val)) {
881           req_.connect_proto = ConnectProto::WEBSOCKET;
882         }
883       }
884     }
885   }
886   auto transfer_encoding = req_.fs.header(http2::HD_TRANSFER_ENCODING);
887   if (transfer_encoding) {
888     req_.fs.content_length = -1;
889   }
890 
891   auto expect = req_.fs.header(http2::HD_EXPECT);
892   expect_100_continue_ =
893       expect && util::strieq(expect->value, "100-continue"_sr);
894 }
895 
inspect_http1_response()896 void Downstream::inspect_http1_response() {
897   auto transfer_encoding = resp_.fs.header(http2::HD_TRANSFER_ENCODING);
898   if (transfer_encoding) {
899     resp_.fs.content_length = -1;
900   }
901 }
902 
reset_response()903 void Downstream::reset_response() {
904   resp_.http_status = 0;
905   resp_.http_major = 1;
906   resp_.http_minor = 1;
907 }
908 
get_non_final_response() const909 bool Downstream::get_non_final_response() const {
910   return !upgraded_ && resp_.http_status / 100 == 1;
911 }
912 
supports_non_final_response() const913 bool Downstream::supports_non_final_response() const {
914   return req_.http_major == 3 || req_.http_major == 2 ||
915          (req_.http_major == 1 && req_.http_minor == 1);
916 }
917 
get_upgraded() const918 bool Downstream::get_upgraded() const { return upgraded_; }
919 
get_http2_upgrade_request() const920 bool Downstream::get_http2_upgrade_request() const {
921   return req_.http2_upgrade_seen && req_.fs.header(http2::HD_HTTP2_SETTINGS) &&
922          response_state_ == DownstreamState::INITIAL;
923 }
924 
get_http2_settings() const925 StringRef Downstream::get_http2_settings() const {
926   auto http2_settings = req_.fs.header(http2::HD_HTTP2_SETTINGS);
927   if (!http2_settings) {
928     return StringRef{};
929   }
930   return http2_settings->value;
931 }
932 
set_downstream_stream_id(int64_t stream_id)933 void Downstream::set_downstream_stream_id(int64_t stream_id) {
934   downstream_stream_id_ = stream_id;
935 }
936 
get_downstream_stream_id() const937 int64_t Downstream::get_downstream_stream_id() const {
938   return downstream_stream_id_;
939 }
940 
get_response_rst_stream_error_code() const941 uint32_t Downstream::get_response_rst_stream_error_code() const {
942   return response_rst_stream_error_code_;
943 }
944 
set_response_rst_stream_error_code(uint32_t error_code)945 void Downstream::set_response_rst_stream_error_code(uint32_t error_code) {
946   response_rst_stream_error_code_ = error_code;
947 }
948 
set_expect_final_response(bool f)949 void Downstream::set_expect_final_response(bool f) {
950   expect_final_response_ = f;
951 }
952 
get_expect_final_response() const953 bool Downstream::get_expect_final_response() const {
954   return expect_final_response_;
955 }
956 
expect_response_body() const957 bool Downstream::expect_response_body() const {
958   return !resp_.headers_only &&
959          http2::expect_response_body(req_.method, resp_.http_status);
960 }
961 
expect_response_trailer() const962 bool Downstream::expect_response_trailer() const {
963   // In HTTP/2, if final response HEADERS does not bear END_STREAM it
964   // is possible trailer fields might come, regardless of request
965   // method or status code.
966   return !resp_.headers_only &&
967          (resp_.http_major == 3 || resp_.http_major == 2);
968 }
969 
repeat_header_timer()970 void Downstream::repeat_header_timer() {
971   auto loop = upstream_->get_client_handler()->get_loop();
972 
973   ev_timer_again(loop, &header_timer_);
974 }
975 
stop_header_timer()976 void Downstream::stop_header_timer() {
977   auto loop = upstream_->get_client_handler()->get_loop();
978 
979   ev_timer_stop(loop, &header_timer_);
980 }
981 
982 namespace {
reset_timer(struct ev_loop * loop,ev_timer * w)983 void reset_timer(struct ev_loop *loop, ev_timer *w) { ev_timer_again(loop, w); }
984 } // namespace
985 
986 namespace {
try_reset_timer(struct ev_loop * loop,ev_timer * w)987 void try_reset_timer(struct ev_loop *loop, ev_timer *w) {
988   if (!ev_is_active(w)) {
989     return;
990   }
991   ev_timer_again(loop, w);
992 }
993 } // namespace
994 
995 namespace {
ensure_timer(struct ev_loop * loop,ev_timer * w)996 void ensure_timer(struct ev_loop *loop, ev_timer *w) {
997   if (ev_is_active(w)) {
998     return;
999   }
1000   ev_timer_again(loop, w);
1001 }
1002 } // namespace
1003 
1004 namespace {
disable_timer(struct ev_loop * loop,ev_timer * w)1005 void disable_timer(struct ev_loop *loop, ev_timer *w) {
1006   ev_timer_stop(loop, w);
1007 }
1008 } // namespace
1009 
reset_upstream_rtimer()1010 void Downstream::reset_upstream_rtimer() {
1011   if (get_config()->http2.timeout.stream_read == 0.) {
1012     return;
1013   }
1014   auto loop = upstream_->get_client_handler()->get_loop();
1015   reset_timer(loop, &upstream_rtimer_);
1016 }
1017 
reset_upstream_wtimer()1018 void Downstream::reset_upstream_wtimer() {
1019   auto loop = upstream_->get_client_handler()->get_loop();
1020   auto &timeoutconf = get_config()->http2.timeout;
1021 
1022   if (timeoutconf.stream_write != 0.) {
1023     reset_timer(loop, &upstream_wtimer_);
1024   }
1025   if (timeoutconf.stream_read != 0.) {
1026     try_reset_timer(loop, &upstream_rtimer_);
1027   }
1028 }
1029 
ensure_upstream_wtimer()1030 void Downstream::ensure_upstream_wtimer() {
1031   if (get_config()->http2.timeout.stream_write == 0.) {
1032     return;
1033   }
1034   auto loop = upstream_->get_client_handler()->get_loop();
1035   ensure_timer(loop, &upstream_wtimer_);
1036 }
1037 
disable_upstream_rtimer()1038 void Downstream::disable_upstream_rtimer() {
1039   if (get_config()->http2.timeout.stream_read == 0.) {
1040     return;
1041   }
1042   auto loop = upstream_->get_client_handler()->get_loop();
1043   disable_timer(loop, &upstream_rtimer_);
1044 }
1045 
disable_upstream_wtimer()1046 void Downstream::disable_upstream_wtimer() {
1047   if (get_config()->http2.timeout.stream_write == 0.) {
1048     return;
1049   }
1050   auto loop = upstream_->get_client_handler()->get_loop();
1051   disable_timer(loop, &upstream_wtimer_);
1052 }
1053 
reset_downstream_rtimer()1054 void Downstream::reset_downstream_rtimer() {
1055   if (get_config()->http2.timeout.stream_read == 0.) {
1056     return;
1057   }
1058   auto loop = upstream_->get_client_handler()->get_loop();
1059   reset_timer(loop, &downstream_rtimer_);
1060 }
1061 
reset_downstream_wtimer()1062 void Downstream::reset_downstream_wtimer() {
1063   auto loop = upstream_->get_client_handler()->get_loop();
1064   auto &timeoutconf = get_config()->http2.timeout;
1065 
1066   if (timeoutconf.stream_write != 0.) {
1067     reset_timer(loop, &downstream_wtimer_);
1068   }
1069   if (timeoutconf.stream_read != 0.) {
1070     try_reset_timer(loop, &downstream_rtimer_);
1071   }
1072 }
1073 
ensure_downstream_wtimer()1074 void Downstream::ensure_downstream_wtimer() {
1075   if (get_config()->http2.timeout.stream_write == 0.) {
1076     return;
1077   }
1078   auto loop = upstream_->get_client_handler()->get_loop();
1079   ensure_timer(loop, &downstream_wtimer_);
1080 }
1081 
disable_downstream_rtimer()1082 void Downstream::disable_downstream_rtimer() {
1083   if (get_config()->http2.timeout.stream_read == 0.) {
1084     return;
1085   }
1086   auto loop = upstream_->get_client_handler()->get_loop();
1087   disable_timer(loop, &downstream_rtimer_);
1088 }
1089 
disable_downstream_wtimer()1090 void Downstream::disable_downstream_wtimer() {
1091   if (get_config()->http2.timeout.stream_write == 0.) {
1092     return;
1093   }
1094   auto loop = upstream_->get_client_handler()->get_loop();
1095   disable_timer(loop, &downstream_wtimer_);
1096 }
1097 
accesslog_ready() const1098 bool Downstream::accesslog_ready() const {
1099   return !accesslog_written_ && resp_.http_status > 0;
1100 }
1101 
add_retry()1102 void Downstream::add_retry() { ++num_retry_; }
1103 
no_more_retry() const1104 bool Downstream::no_more_retry() const { return num_retry_ > 50; }
1105 
set_request_downstream_host(const StringRef & host)1106 void Downstream::set_request_downstream_host(const StringRef &host) {
1107   request_downstream_host_ = host;
1108 }
1109 
set_request_pending(bool f)1110 void Downstream::set_request_pending(bool f) { request_pending_ = f; }
1111 
get_request_pending() const1112 bool Downstream::get_request_pending() const { return request_pending_; }
1113 
set_request_header_sent(bool f)1114 void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
1115 
get_request_header_sent() const1116 bool Downstream::get_request_header_sent() const {
1117   return request_header_sent_;
1118 }
1119 
request_submission_ready() const1120 bool Downstream::request_submission_ready() const {
1121   return (request_state_ == DownstreamState::HEADER_COMPLETE ||
1122           request_state_ == DownstreamState::MSG_COMPLETE) &&
1123          (request_pending_ || !request_header_sent_) &&
1124          response_state_ == DownstreamState::INITIAL;
1125 }
1126 
get_dispatch_state() const1127 DispatchState Downstream::get_dispatch_state() const { return dispatch_state_; }
1128 
set_dispatch_state(DispatchState s)1129 void Downstream::set_dispatch_state(DispatchState s) { dispatch_state_ = s; }
1130 
attach_blocked_link(BlockedLink * l)1131 void Downstream::attach_blocked_link(BlockedLink *l) {
1132   assert(!blocked_link_);
1133 
1134   l->downstream = this;
1135   blocked_link_ = l;
1136 }
1137 
detach_blocked_link()1138 BlockedLink *Downstream::detach_blocked_link() {
1139   auto link = blocked_link_;
1140   blocked_link_ = nullptr;
1141   return link;
1142 }
1143 
can_detach_downstream_connection() const1144 bool Downstream::can_detach_downstream_connection() const {
1145   // We should check request and response buffer.  If request buffer
1146   // is not empty, then we might leave downstream connection in weird
1147   // state, especially for HTTP/1.1
1148   return dconn_ && response_state_ == DownstreamState::MSG_COMPLETE &&
1149          request_state_ == DownstreamState::MSG_COMPLETE && !upgraded_ &&
1150          !resp_.connection_close && request_buf_.rleft() == 0;
1151 }
1152 
pop_response_buf()1153 DefaultMemchunks Downstream::pop_response_buf() {
1154   return std::move(response_buf_);
1155 }
1156 
set_assoc_stream_id(int64_t stream_id)1157 void Downstream::set_assoc_stream_id(int64_t stream_id) {
1158   assoc_stream_id_ = stream_id;
1159 }
1160 
get_assoc_stream_id() const1161 int64_t Downstream::get_assoc_stream_id() const { return assoc_stream_id_; }
1162 
get_block_allocator()1163 BlockAllocator &Downstream::get_block_allocator() { return balloc_; }
1164 
add_rcbuf(nghttp2_rcbuf * rcbuf)1165 void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) {
1166   nghttp2_rcbuf_incref(rcbuf);
1167   rcbufs_.push_back(rcbuf);
1168 }
1169 
#ifdef ENABLE_HTTP3
// Takes a reference on |rcbuf| (nghttp3_rcbuf_incref) and records the
// pointer in rcbufs3_.
void Downstream::add_rcbuf(nghttp3_rcbuf *rcbuf) {
  // Record the pointer before taking the reference.  If growing
  // rcbufs3_ throws, no reference has been taken yet, so none is left
  // behind untracked.
  rcbufs3_.push_back(rcbuf);
  nghttp3_rcbuf_incref(rcbuf);
}
#endif // ENABLE_HTTP3
1176 
set_downstream_addr_group(const std::shared_ptr<DownstreamAddrGroup> & group)1177 void Downstream::set_downstream_addr_group(
1178     const std::shared_ptr<DownstreamAddrGroup> &group) {
1179   group_ = group;
1180 }
1181 
set_addr(const DownstreamAddr * addr)1182 void Downstream::set_addr(const DownstreamAddr *addr) { addr_ = addr; }
1183 
get_addr() const1184 const DownstreamAddr *Downstream::get_addr() const { return addr_; }
1185 
set_accesslog_written(bool f)1186 void Downstream::set_accesslog_written(bool f) { accesslog_written_ = f; }
1187 
renew_affinity_cookie(uint32_t h)1188 void Downstream::renew_affinity_cookie(uint32_t h) {
1189   affinity_cookie_ = h;
1190   new_affinity_cookie_ = true;
1191 }
1192 
get_affinity_cookie_to_send() const1193 uint32_t Downstream::get_affinity_cookie_to_send() const {
1194   if (new_affinity_cookie_) {
1195     return affinity_cookie_;
1196   }
1197   return 0;
1198 }
1199 
get_blocked_request_buf()1200 DefaultMemchunks *Downstream::get_blocked_request_buf() {
1201   return &blocked_request_buf_;
1202 }
1203 
get_blocked_request_data_eof() const1204 bool Downstream::get_blocked_request_data_eof() const {
1205   return blocked_request_data_eof_;
1206 }
1207 
set_blocked_request_data_eof(bool f)1208 void Downstream::set_blocked_request_data_eof(bool f) {
1209   blocked_request_data_eof_ = f;
1210 }
1211 
set_ws_key(const StringRef & key)1212 void Downstream::set_ws_key(const StringRef &key) { ws_key_ = key; }
1213 
get_expect_100_continue() const1214 bool Downstream::get_expect_100_continue() const {
1215   return expect_100_continue_;
1216 }
1217 
get_stop_reading() const1218 bool Downstream::get_stop_reading() const { return stop_reading_; }
1219 
set_stop_reading(bool f)1220 void Downstream::set_stop_reading(bool f) { stop_reading_ = f; }
1221 
1222 } // namespace shrpx
1223