• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_downstream.h"
26 
27 #include <cassert>
28 
29 #include "url-parser/url_parser.h"
30 
31 #include "shrpx_upstream.h"
32 #include "shrpx_client_handler.h"
33 #include "shrpx_config.h"
34 #include "shrpx_error.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_downstream_queue.h"
37 #include "shrpx_worker.h"
38 #include "shrpx_http2_session.h"
39 #include "shrpx_log.h"
40 #ifdef HAVE_MRUBY
41 #  include "shrpx_mruby.h"
42 #endif // HAVE_MRUBY
43 #include "util.h"
44 #include "http2.h"
45 
46 namespace shrpx {
47 
48 namespace {
header_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)49 void header_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50   auto downstream = static_cast<Downstream *>(w->data);
51   auto upstream = downstream->get_upstream();
52 
53   if (LOG_ENABLED(INFO)) {
54     DLOG(INFO, downstream) << "request header timeout stream_id="
55                            << downstream->get_stream_id();
56   }
57 
58   downstream->disable_upstream_rtimer();
59   downstream->disable_upstream_wtimer();
60 
61   upstream->on_timeout(downstream);
62 }
63 } // namespace
64 
65 namespace {
upstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)66 void upstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
67   auto downstream = static_cast<Downstream *>(w->data);
68   auto upstream = downstream->get_upstream();
69 
70   auto which = revents == EV_READ ? "read" : "write";
71 
72   if (LOG_ENABLED(INFO)) {
73     DLOG(INFO, downstream) << "upstream timeout stream_id="
74                            << downstream->get_stream_id() << " event=" << which;
75   }
76 
77   downstream->disable_upstream_rtimer();
78   downstream->disable_upstream_wtimer();
79 
80   upstream->on_timeout(downstream);
81 }
82 } // namespace
83 
84 namespace {
upstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)85 void upstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
86   upstream_timeoutcb(loop, w, EV_READ);
87 }
88 } // namespace
89 
90 namespace {
upstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)91 void upstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
92   upstream_timeoutcb(loop, w, EV_WRITE);
93 }
94 } // namespace
95 
96 namespace {
downstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)97 void downstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
98   auto downstream = static_cast<Downstream *>(w->data);
99 
100   auto which = revents == EV_READ ? "read" : "write";
101 
102   if (LOG_ENABLED(INFO)) {
103     DLOG(INFO, downstream) << "downstream timeout stream_id="
104                            << downstream->get_downstream_stream_id()
105                            << " event=" << which;
106   }
107 
108   downstream->disable_downstream_rtimer();
109   downstream->disable_downstream_wtimer();
110 
111   auto dconn = downstream->get_downstream_connection();
112 
113   if (dconn) {
114     dconn->on_timeout();
115   }
116 }
117 } // namespace
118 
119 namespace {
downstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)120 void downstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
121   downstream_timeoutcb(loop, w, EV_READ);
122 }
123 } // namespace
124 
125 namespace {
downstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)126 void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
127   downstream_timeoutcb(loop, w, EV_WRITE);
128 }
129 } // namespace
130 
131 // upstream could be nullptr for unittests
Downstream(Upstream * upstream,MemchunkPool * mcpool,int64_t stream_id)132 Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
133                        int64_t stream_id)
134   : dlnext(nullptr),
135     dlprev(nullptr),
136     response_sent_body_length(0),
137     balloc_(1024, 1024),
138     req_(balloc_),
139     resp_(balloc_),
140     request_start_time_(std::chrono::high_resolution_clock::now()),
141     blocked_request_buf_(mcpool),
142     request_buf_(mcpool),
143     response_buf_(mcpool),
144     upstream_(upstream),
145     blocked_link_(nullptr),
146     addr_(nullptr),
147     num_retry_(0),
148     stream_id_(stream_id),
149     assoc_stream_id_(-1),
150     downstream_stream_id_(-1),
151     response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
152     affinity_cookie_(0),
153     request_state_(DownstreamState::INITIAL),
154     response_state_(DownstreamState::INITIAL),
155     dispatch_state_(DispatchState::NONE),
156     upgraded_(false),
157     chunked_request_(false),
158     chunked_response_(false),
159     expect_final_response_(false),
160     request_pending_(false),
161     request_header_sent_(false),
162     accesslog_written_(false),
163     new_affinity_cookie_(false),
164     blocked_request_data_eof_(false),
165     expect_100_continue_(false),
166     stop_reading_(false) {
167   auto config = get_config();
168   auto &httpconf = config->http;
169 
170   ev_timer_init(&header_timer_, header_timeoutcb, 0., httpconf.timeout.header);
171 
172   auto &timeoutconf = config->http2.timeout;
173 
174   ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
175                 timeoutconf.stream_read);
176   ev_timer_init(&upstream_wtimer_, &upstream_wtimeoutcb, 0.,
177                 timeoutconf.stream_write);
178   ev_timer_init(&downstream_rtimer_, &downstream_rtimeoutcb, 0.,
179                 timeoutconf.stream_read);
180   ev_timer_init(&downstream_wtimer_, &downstream_wtimeoutcb, 0.,
181                 timeoutconf.stream_write);
182 
183   header_timer_.data = this;
184   upstream_rtimer_.data = this;
185   upstream_wtimer_.data = this;
186   downstream_rtimer_.data = this;
187   downstream_wtimer_.data = this;
188 
189   rcbufs_.reserve(32);
190 #ifdef ENABLE_HTTP3
191   rcbufs3_.reserve(32);
192 #endif // ENABLE_HTTP3
193 }
194 
~Downstream()195 Downstream::~Downstream() {
196   if (LOG_ENABLED(INFO)) {
197     DLOG(INFO, this) << "Deleting";
198   }
199 
200   // check nullptr for unittest
201   if (upstream_) {
202     auto loop = upstream_->get_client_handler()->get_loop();
203 
204     ev_timer_stop(loop, &upstream_rtimer_);
205     ev_timer_stop(loop, &upstream_wtimer_);
206     ev_timer_stop(loop, &downstream_rtimer_);
207     ev_timer_stop(loop, &downstream_wtimer_);
208     ev_timer_stop(loop, &header_timer_);
209 
210 #ifdef HAVE_MRUBY
211     auto handler = upstream_->get_client_handler();
212     auto worker = handler->get_worker();
213     auto mruby_ctx = worker->get_mruby_context();
214 
215     mruby_ctx->delete_downstream(this);
216 #endif // HAVE_MRUBY
217   }
218 
219 #ifdef HAVE_MRUBY
220   if (dconn_) {
221     const auto &group = dconn_->get_downstream_addr_group();
222     if (group) {
223       const auto &mruby_ctx = group->shared_addr->mruby_ctx;
224       mruby_ctx->delete_downstream(this);
225     }
226   }
227 #endif // HAVE_MRUBY
228 
229   // DownstreamConnection may refer to this object.  Delete it now
230   // explicitly.
231   dconn_.reset();
232 
233 #ifdef ENABLE_HTTP3
234   for (auto rcbuf : rcbufs3_) {
235     nghttp3_rcbuf_decref(rcbuf);
236   }
237 #endif // ENABLE_HTTP3
238 
239   for (auto rcbuf : rcbufs_) {
240     nghttp2_rcbuf_decref(rcbuf);
241   }
242 
243   if (LOG_ENABLED(INFO)) {
244     DLOG(INFO, this) << "Deleted";
245   }
246 }
247 
attach_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)248 int Downstream::attach_downstream_connection(
249   std::unique_ptr<DownstreamConnection> dconn) {
250   if (dconn->attach_downstream(this) != 0) {
251     return -1;
252   }
253 
254   dconn_ = std::move(dconn);
255 
256   return 0;
257 }
258 
detach_downstream_connection()259 void Downstream::detach_downstream_connection() {
260   if (!dconn_) {
261     return;
262   }
263 
264 #ifdef HAVE_MRUBY
265   const auto &group = dconn_->get_downstream_addr_group();
266   if (group) {
267     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
268     mruby_ctx->delete_downstream(this);
269   }
270 #endif // HAVE_MRUBY
271 
272   dconn_->detach_downstream(this);
273 
274   auto handler = dconn_->get_client_handler();
275 
276   handler->pool_downstream_connection(
277     std::unique_ptr<DownstreamConnection>(dconn_.release()));
278 }
279 
get_downstream_connection()280 DownstreamConnection *Downstream::get_downstream_connection() {
281   return dconn_.get();
282 }
283 
pop_downstream_connection()284 std::unique_ptr<DownstreamConnection> Downstream::pop_downstream_connection() {
285 #ifdef HAVE_MRUBY
286   if (!dconn_) {
287     return nullptr;
288   }
289 
290   const auto &group = dconn_->get_downstream_addr_group();
291   if (group) {
292     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
293     mruby_ctx->delete_downstream(this);
294   }
295 #endif // HAVE_MRUBY
296 
297   return std::unique_ptr<DownstreamConnection>(dconn_.release());
298 }
299 
pause_read(IOCtrlReason reason)300 void Downstream::pause_read(IOCtrlReason reason) {
301   if (dconn_) {
302     dconn_->pause_read(reason);
303   }
304 }
305 
resume_read(IOCtrlReason reason,size_t consumed)306 int Downstream::resume_read(IOCtrlReason reason, size_t consumed) {
307   if (dconn_) {
308     return dconn_->resume_read(reason, consumed);
309   }
310 
311   return 0;
312 }
313 
force_resume_read()314 void Downstream::force_resume_read() {
315   if (dconn_) {
316     dconn_->force_resume_read();
317   }
318 }
319 
320 namespace {
321 const HeaderRefs::value_type *
search_header_linear_backwards(const HeaderRefs & headers,const StringRef & name)322 search_header_linear_backwards(const HeaderRefs &headers,
323                                const StringRef &name) {
324   for (auto it = headers.rbegin(); it != headers.rend(); ++it) {
325     auto &kv = *it;
326     if (kv.name == name) {
327       return &kv;
328     }
329   }
330   return nullptr;
331 }
332 } // namespace
333 
assemble_request_cookie()334 StringRef Downstream::assemble_request_cookie() {
335   size_t len = 0;
336 
337   for (auto &kv : req_.fs.headers()) {
338     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
339       continue;
340     }
341 
342     len += kv.value.size() + str_size("; ");
343   }
344 
345   auto iov = make_byte_ref(balloc_, len + 1);
346   auto p = std::begin(iov);
347 
348   for (auto &kv : req_.fs.headers()) {
349     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
350       continue;
351     }
352 
353     auto end = std::end(kv.value);
354     for (auto it = std::begin(kv.value) + kv.value.size();
355          it != std::begin(kv.value); --it) {
356       auto c = *(it - 1);
357       if (c == ' ' || c == ';') {
358         continue;
359       }
360       end = it;
361       break;
362     }
363 
364     p = std::copy(std::begin(kv.value), end, p);
365     p = util::copy_lit(p, "; ");
366   }
367 
368   // cut trailing "; "
369   if (p - std::begin(iov) >= 2) {
370     p -= 2;
371   }
372 
373   return StringRef{std::span{std::begin(iov), p}};
374 }
375 
find_affinity_cookie(const StringRef & name)376 uint32_t Downstream::find_affinity_cookie(const StringRef &name) {
377   for (auto &kv : req_.fs.headers()) {
378     if (kv.token != http2::HD_COOKIE) {
379       continue;
380     }
381 
382     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
383       if (*it == '\t' || *it == ' ' || *it == ';') {
384         ++it;
385         continue;
386       }
387 
388       auto end = std::find(it, std::end(kv.value), '=');
389       if (end == std::end(kv.value)) {
390         return 0;
391       }
392 
393       if (name != StringRef{it, end}) {
394         it = std::find(it, std::end(kv.value), ';');
395         continue;
396       }
397 
398       it = std::find(end + 1, std::end(kv.value), ';');
399       auto val = StringRef{end + 1, it};
400       if (val.size() != 8) {
401         return 0;
402       }
403       uint32_t h = 0;
404       for (auto c : val) {
405         auto n = util::hex_to_uint(c);
406         if (n == 256) {
407           return 0;
408         }
409         h <<= 4;
410         h += n;
411       }
412       affinity_cookie_ = h;
413       return h;
414     }
415   }
416   return 0;
417 }
418 
count_crumble_request_cookie()419 size_t Downstream::count_crumble_request_cookie() {
420   size_t n = 0;
421   for (auto &kv : req_.fs.headers()) {
422     if (kv.token != http2::HD_COOKIE) {
423       continue;
424     }
425 
426     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
427       if (*it == '\t' || *it == ' ' || *it == ';') {
428         ++it;
429         continue;
430       }
431 
432       it = std::find(it, std::end(kv.value), ';');
433 
434       ++n;
435     }
436   }
437   return n;
438 }
439 
crumble_request_cookie(std::vector<nghttp2_nv> & nva)440 void Downstream::crumble_request_cookie(std::vector<nghttp2_nv> &nva) {
441   for (auto &kv : req_.fs.headers()) {
442     if (kv.token != http2::HD_COOKIE) {
443       continue;
444     }
445 
446     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
447       if (*it == '\t' || *it == ' ' || *it == ';') {
448         ++it;
449         continue;
450       }
451 
452       auto first = it;
453 
454       it = std::find(it, std::end(kv.value), ';');
455 
456       nva.push_back({(uint8_t *)"cookie", (uint8_t *)first, str_size("cookie"),
457                      (size_t)(it - first),
458                      (uint8_t)(NGHTTP2_NV_FLAG_NO_COPY_NAME |
459                                NGHTTP2_NV_FLAG_NO_COPY_VALUE |
460                                (kv.no_index ? NGHTTP2_NV_FLAG_NO_INDEX : 0))});
461     }
462   }
463 }
464 
465 namespace {
add_header(size_t & sum,HeaderRefs & headers,const StringRef & name,const StringRef & value,bool no_index,int32_t token)466 void add_header(size_t &sum, HeaderRefs &headers, const StringRef &name,
467                 const StringRef &value, bool no_index, int32_t token) {
468   sum += name.size() + value.size();
469   headers.emplace_back(name, value, no_index, token);
470 }
471 } // namespace
472 
473 namespace {
alloc_header_name(BlockAllocator & balloc,const StringRef & name)474 StringRef alloc_header_name(BlockAllocator &balloc, const StringRef &name) {
475   auto iov = make_byte_ref(balloc, name.size() + 1);
476   auto p = std::copy(std::begin(name), std::end(name), std::begin(iov));
477   util::inp_strlower(std::begin(iov), p);
478   *p = '\0';
479 
480   return StringRef{std::span{std::begin(iov), p}};
481 }
482 } // namespace
483 
484 namespace {
append_last_header_key(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)485 void append_last_header_key(BlockAllocator &balloc, bool &key_prev, size_t &sum,
486                             HeaderRefs &headers, const char *data, size_t len) {
487   assert(key_prev);
488   sum += len;
489   auto &item = headers.back();
490   auto name =
491     realloc_concat_string_ref(balloc, item.name, StringRef{data, len});
492 
493   auto p = const_cast<uint8_t *>(name.byte());
494   util::inp_strlower(p + name.size() - len, p + name.size());
495 
496   item.name = name;
497   item.token = http2::lookup_token(item.name);
498 }
499 } // namespace
500 
501 namespace {
append_last_header_value(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)502 void append_last_header_value(BlockAllocator &balloc, bool &key_prev,
503                               size_t &sum, HeaderRefs &headers,
504                               const char *data, size_t len) {
505   key_prev = false;
506   sum += len;
507   auto &item = headers.back();
508   item.value =
509     realloc_concat_string_ref(balloc, item.value, StringRef{data, len});
510 }
511 } // namespace
512 
parse_content_length()513 int FieldStore::parse_content_length() {
514   content_length = -1;
515 
516   for (auto &kv : headers_) {
517     if (kv.token != http2::HD_CONTENT_LENGTH) {
518       continue;
519     }
520 
521     auto len = util::parse_uint(kv.value);
522     if (!len) {
523       return -1;
524     }
525     if (content_length != -1) {
526       return -1;
527     }
528     content_length = *len;
529   }
530   return 0;
531 }
532 
header(int32_t token) const533 const HeaderRefs::value_type *FieldStore::header(int32_t token) const {
534   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
535     auto &kv = *it;
536     if (kv.token == token) {
537       return &kv;
538     }
539   }
540   return nullptr;
541 }
542 
header(int32_t token)543 HeaderRefs::value_type *FieldStore::header(int32_t token) {
544   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
545     auto &kv = *it;
546     if (kv.token == token) {
547       return &kv;
548     }
549   }
550   return nullptr;
551 }
552 
header(const StringRef & name) const553 const HeaderRefs::value_type *FieldStore::header(const StringRef &name) const {
554   return search_header_linear_backwards(headers_, name);
555 }
556 
add_header_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)557 void FieldStore::add_header_token(const StringRef &name, const StringRef &value,
558                                   bool no_index, int32_t token) {
559   shrpx::add_header(buffer_size_, headers_, name, value, no_index, token);
560 }
561 
alloc_add_header_name(const StringRef & name)562 void FieldStore::alloc_add_header_name(const StringRef &name) {
563   auto name_ref = alloc_header_name(balloc_, name);
564   auto token = http2::lookup_token(name_ref);
565   add_header_token(name_ref, StringRef{}, false, token);
566   header_key_prev_ = true;
567 }
568 
append_last_header_key(const char * data,size_t len)569 void FieldStore::append_last_header_key(const char *data, size_t len) {
570   shrpx::append_last_header_key(balloc_, header_key_prev_, buffer_size_,
571                                 headers_, data, len);
572 }
573 
append_last_header_value(const char * data,size_t len)574 void FieldStore::append_last_header_value(const char *data, size_t len) {
575   shrpx::append_last_header_value(balloc_, header_key_prev_, buffer_size_,
576                                   headers_, data, len);
577 }
578 
clear_headers()579 void FieldStore::clear_headers() {
580   headers_.clear();
581   header_key_prev_ = false;
582 }
583 
add_trailer_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)584 void FieldStore::add_trailer_token(const StringRef &name,
585                                    const StringRef &value, bool no_index,
586                                    int32_t token) {
587   // Header size limit should be applied to all header and trailer
588   // fields combined.
589   shrpx::add_header(buffer_size_, trailers_, name, value, no_index, token);
590 }
591 
alloc_add_trailer_name(const StringRef & name)592 void FieldStore::alloc_add_trailer_name(const StringRef &name) {
593   auto name_ref = alloc_header_name(balloc_, name);
594   auto token = http2::lookup_token(name_ref);
595   add_trailer_token(name_ref, StringRef{}, false, token);
596   trailer_key_prev_ = true;
597 }
598 
append_last_trailer_key(const char * data,size_t len)599 void FieldStore::append_last_trailer_key(const char *data, size_t len) {
600   shrpx::append_last_header_key(balloc_, trailer_key_prev_, buffer_size_,
601                                 trailers_, data, len);
602 }
603 
append_last_trailer_value(const char * data,size_t len)604 void FieldStore::append_last_trailer_value(const char *data, size_t len) {
605   shrpx::append_last_header_value(balloc_, trailer_key_prev_, buffer_size_,
606                                   trailers_, data, len);
607 }
608 
erase_content_length_and_transfer_encoding()609 void FieldStore::erase_content_length_and_transfer_encoding() {
610   for (auto &kv : headers_) {
611     switch (kv.token) {
612     case http2::HD_CONTENT_LENGTH:
613     case http2::HD_TRANSFER_ENCODING:
614       kv.name = StringRef{};
615       kv.token = -1;
616       break;
617     }
618   }
619 }
620 
set_request_start_time(std::chrono::high_resolution_clock::time_point time)621 void Downstream::set_request_start_time(
622   std::chrono::high_resolution_clock::time_point time) {
623   request_start_time_ = std::move(time);
624 }
625 
626 const std::chrono::high_resolution_clock::time_point &
get_request_start_time() const627 Downstream::get_request_start_time() const {
628   return request_start_time_;
629 }
630 
reset_upstream(Upstream * upstream)631 void Downstream::reset_upstream(Upstream *upstream) {
632   upstream_ = upstream;
633   if (dconn_) {
634     dconn_->on_upstream_change(upstream);
635   }
636 }
637 
get_upstream() const638 Upstream *Downstream::get_upstream() const { return upstream_; }
639 
set_stream_id(int64_t stream_id)640 void Downstream::set_stream_id(int64_t stream_id) { stream_id_ = stream_id; }
641 
get_stream_id() const642 int64_t Downstream::get_stream_id() const { return stream_id_; }
643 
set_request_state(DownstreamState state)644 void Downstream::set_request_state(DownstreamState state) {
645   request_state_ = state;
646 }
647 
get_request_state() const648 DownstreamState Downstream::get_request_state() const { return request_state_; }
649 
get_chunked_request() const650 bool Downstream::get_chunked_request() const { return chunked_request_; }
651 
set_chunked_request(bool f)652 void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
653 
request_buf_full()654 bool Downstream::request_buf_full() {
655   auto handler = upstream_->get_client_handler();
656   auto faddr = handler->get_upstream_addr();
657   auto worker = handler->get_worker();
658 
659   // We don't check buffer size here for API endpoint.
660   if (faddr->alt_mode == UpstreamAltMode::API) {
661     return false;
662   }
663 
664   if (dconn_) {
665     auto &downstreamconf = *worker->get_downstream_config();
666     return blocked_request_buf_.rleft() + request_buf_.rleft() >=
667            downstreamconf.request_buffer_size;
668   }
669 
670   return false;
671 }
672 
get_request_buf()673 DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
674 
675 // Call this function after this object is attached to
676 // Downstream. Otherwise, the program will crash.
push_request_headers()677 int Downstream::push_request_headers() {
678   if (!dconn_) {
679     DLOG(INFO, this) << "dconn_ is NULL";
680     return -1;
681   }
682   return dconn_->push_request_headers();
683 }
684 
push_upload_data_chunk(const uint8_t * data,size_t datalen)685 int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
686   req_.recv_body_length += datalen;
687 
688   if (!dconn_ && !request_header_sent_) {
689     blocked_request_buf_.append(data, datalen);
690     req_.unconsumed_body_length += datalen;
691     return 0;
692   }
693 
694   // Assumes that request headers have already been pushed to output
695   // buffer using push_request_headers().
696   if (!dconn_) {
697     DLOG(INFO, this) << "dconn_ is NULL";
698     return -1;
699   }
700   if (dconn_->push_upload_data_chunk(data, datalen) != 0) {
701     return -1;
702   }
703 
704   req_.unconsumed_body_length += datalen;
705 
706   return 0;
707 }
708 
end_upload_data()709 int Downstream::end_upload_data() {
710   if (!dconn_ && !request_header_sent_) {
711     blocked_request_data_eof_ = true;
712     return 0;
713   }
714   if (!dconn_) {
715     DLOG(INFO, this) << "dconn_ is NULL";
716     return -1;
717   }
718   return dconn_->end_upload_data();
719 }
720 
rewrite_location_response_header(const StringRef & upstream_scheme)721 void Downstream::rewrite_location_response_header(
722   const StringRef &upstream_scheme) {
723   auto hd = resp_.fs.header(http2::HD_LOCATION);
724   if (!hd) {
725     return;
726   }
727 
728   if (request_downstream_host_.empty() || req_.authority.empty()) {
729     return;
730   }
731 
732   http_parser_url u{};
733   auto rv = http_parser_parse_url(hd->value.data(), hd->value.size(), 0, &u);
734   if (rv != 0) {
735     return;
736   }
737 
738   auto new_uri =
739     http2::rewrite_location_uri(balloc_, hd->value, u, request_downstream_host_,
740                                 req_.authority, upstream_scheme);
741 
742   if (new_uri.empty()) {
743     return;
744   }
745 
746   hd->value = new_uri;
747 }
748 
get_chunked_response() const749 bool Downstream::get_chunked_response() const { return chunked_response_; }
750 
set_chunked_response(bool f)751 void Downstream::set_chunked_response(bool f) { chunked_response_ = f; }
752 
on_read()753 int Downstream::on_read() {
754   if (!dconn_) {
755     DLOG(INFO, this) << "dconn_ is NULL";
756     return -1;
757   }
758   return dconn_->on_read();
759 }
760 
set_response_state(DownstreamState state)761 void Downstream::set_response_state(DownstreamState state) {
762   response_state_ = state;
763 }
764 
get_response_state() const765 DownstreamState Downstream::get_response_state() const {
766   return response_state_;
767 }
768 
get_response_buf()769 DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
770 
response_buf_full()771 bool Downstream::response_buf_full() {
772   if (dconn_) {
773     auto handler = upstream_->get_client_handler();
774     auto worker = handler->get_worker();
775     auto &downstreamconf = *worker->get_downstream_config();
776 
777     return response_buf_.rleft() >= downstreamconf.response_buffer_size;
778   }
779 
780   return false;
781 }
782 
validate_request_recv_body_length() const783 bool Downstream::validate_request_recv_body_length() const {
784   if (req_.fs.content_length == -1) {
785     return true;
786   }
787 
788   if (req_.fs.content_length != req_.recv_body_length) {
789     if (LOG_ENABLED(INFO)) {
790       DLOG(INFO, this) << "request invalid bodylen: content-length="
791                        << req_.fs.content_length
792                        << ", received=" << req_.recv_body_length;
793     }
794     return false;
795   }
796 
797   return true;
798 }
799 
validate_response_recv_body_length() const800 bool Downstream::validate_response_recv_body_length() const {
801   if (!expect_response_body() || resp_.fs.content_length == -1) {
802     return true;
803   }
804 
805   if (resp_.fs.content_length != resp_.recv_body_length) {
806     if (LOG_ENABLED(INFO)) {
807       DLOG(INFO, this) << "response invalid bodylen: content-length="
808                        << resp_.fs.content_length
809                        << ", received=" << resp_.recv_body_length;
810     }
811     return false;
812   }
813 
814   return true;
815 }
816 
check_upgrade_fulfilled_http2()817 void Downstream::check_upgrade_fulfilled_http2() {
818   // This handles nonzero req_.connect_proto and h1 frontend requests
819   // WebSocket upgrade.
820   upgraded_ = (req_.method == HTTP_CONNECT ||
821                req_.connect_proto == ConnectProto::WEBSOCKET) &&
822               resp_.http_status / 100 == 2;
823 }
824 
check_upgrade_fulfilled_http1()825 void Downstream::check_upgrade_fulfilled_http1() {
826   if (req_.method == HTTP_CONNECT) {
827     if (req_.connect_proto == ConnectProto::WEBSOCKET) {
828       if (resp_.http_status != 101) {
829         return;
830       }
831 
832       // This is done for HTTP/2 frontend only.
833       auto accept = resp_.fs.header(http2::HD_SEC_WEBSOCKET_ACCEPT);
834       if (!accept) {
835         return;
836       }
837 
838       std::array<uint8_t, base64::encode_length(20)> accept_buf;
839       auto expected =
840         http2::make_websocket_accept_token(accept_buf.data(), ws_key_);
841 
842       upgraded_ = !expected.empty() && expected == accept->value;
843     } else {
844       upgraded_ = resp_.http_status / 100 == 2;
845     }
846 
847     return;
848   }
849 
850   if (resp_.http_status == 101) {
851     // TODO Do more strict checking for upgrade headers
852     upgraded_ = req_.upgrade_request;
853 
854     return;
855   }
856 }
857 
inspect_http2_request()858 void Downstream::inspect_http2_request() {
859   if (req_.method == HTTP_CONNECT) {
860     req_.upgrade_request = true;
861   }
862 }
863 
inspect_http1_request()864 void Downstream::inspect_http1_request() {
865   if (req_.method == HTTP_CONNECT) {
866     req_.upgrade_request = true;
867   } else if (req_.http_minor > 0) {
868     auto upgrade = req_.fs.header(http2::HD_UPGRADE);
869     if (upgrade) {
870       const auto &val = upgrade->value;
871       // TODO Perform more strict checking for upgrade headers
872       if (NGHTTP2_CLEARTEXT_PROTO_VERSION_ID ""_sr == val) {
873         req_.http2_upgrade_seen = true;
874       } else {
875         req_.upgrade_request = true;
876 
877         // TODO Should we check Sec-WebSocket-Key, and
878         // Sec-WebSocket-Version as well?
879         if (util::strieq("websocket"_sr, val)) {
880           req_.connect_proto = ConnectProto::WEBSOCKET;
881         }
882       }
883     }
884   }
885   auto transfer_encoding = req_.fs.header(http2::HD_TRANSFER_ENCODING);
886   if (transfer_encoding) {
887     req_.fs.content_length = -1;
888   }
889 
890   auto expect = req_.fs.header(http2::HD_EXPECT);
891   expect_100_continue_ =
892     expect && util::strieq(expect->value, "100-continue"_sr);
893 }
894 
inspect_http1_response()895 void Downstream::inspect_http1_response() {
896   auto transfer_encoding = resp_.fs.header(http2::HD_TRANSFER_ENCODING);
897   if (transfer_encoding) {
898     resp_.fs.content_length = -1;
899   }
900 }
901 
reset_response()902 void Downstream::reset_response() {
903   resp_.http_status = 0;
904   resp_.http_major = 1;
905   resp_.http_minor = 1;
906 }
907 
get_non_final_response() const908 bool Downstream::get_non_final_response() const {
909   return !upgraded_ && resp_.http_status / 100 == 1;
910 }
911 
supports_non_final_response() const912 bool Downstream::supports_non_final_response() const {
913   return req_.http_major == 3 || req_.http_major == 2 ||
914          (req_.http_major == 1 && req_.http_minor == 1);
915 }
916 
get_upgraded() const917 bool Downstream::get_upgraded() const { return upgraded_; }
918 
get_http2_upgrade_request() const919 bool Downstream::get_http2_upgrade_request() const {
920   return req_.http2_upgrade_seen && req_.fs.header(http2::HD_HTTP2_SETTINGS) &&
921          response_state_ == DownstreamState::INITIAL;
922 }
923 
get_http2_settings() const924 StringRef Downstream::get_http2_settings() const {
925   auto http2_settings = req_.fs.header(http2::HD_HTTP2_SETTINGS);
926   if (!http2_settings) {
927     return StringRef{};
928   }
929   return http2_settings->value;
930 }
931 
set_downstream_stream_id(int64_t stream_id)932 void Downstream::set_downstream_stream_id(int64_t stream_id) {
933   downstream_stream_id_ = stream_id;
934 }
935 
get_downstream_stream_id() const936 int64_t Downstream::get_downstream_stream_id() const {
937   return downstream_stream_id_;
938 }
939 
get_response_rst_stream_error_code() const940 uint32_t Downstream::get_response_rst_stream_error_code() const {
941   return response_rst_stream_error_code_;
942 }
943 
set_response_rst_stream_error_code(uint32_t error_code)944 void Downstream::set_response_rst_stream_error_code(uint32_t error_code) {
945   response_rst_stream_error_code_ = error_code;
946 }
947 
set_expect_final_response(bool f)948 void Downstream::set_expect_final_response(bool f) {
949   expect_final_response_ = f;
950 }
951 
get_expect_final_response() const952 bool Downstream::get_expect_final_response() const {
953   return expect_final_response_;
954 }
955 
expect_response_body() const956 bool Downstream::expect_response_body() const {
957   return !resp_.headers_only &&
958          http2::expect_response_body(req_.method, resp_.http_status);
959 }
960 
expect_response_trailer() const961 bool Downstream::expect_response_trailer() const {
962   // In HTTP/2, if final response HEADERS does not bear END_STREAM it
963   // is possible trailer fields might come, regardless of request
964   // method or status code.
965   return !resp_.headers_only &&
966          (resp_.http_major == 3 || resp_.http_major == 2);
967 }
968 
repeat_header_timer()969 void Downstream::repeat_header_timer() {
970   auto loop = upstream_->get_client_handler()->get_loop();
971 
972   ev_timer_again(loop, &header_timer_);
973 }
974 
stop_header_timer()975 void Downstream::stop_header_timer() {
976   auto loop = upstream_->get_client_handler()->get_loop();
977 
978   ev_timer_stop(loop, &header_timer_);
979 }
980 
981 namespace {
reset_timer(struct ev_loop * loop,ev_timer * w)982 void reset_timer(struct ev_loop *loop, ev_timer *w) { ev_timer_again(loop, w); }
983 } // namespace
984 
985 namespace {
try_reset_timer(struct ev_loop * loop,ev_timer * w)986 void try_reset_timer(struct ev_loop *loop, ev_timer *w) {
987   if (!ev_is_active(w)) {
988     return;
989   }
990   ev_timer_again(loop, w);
991 }
992 } // namespace
993 
994 namespace {
ensure_timer(struct ev_loop * loop,ev_timer * w)995 void ensure_timer(struct ev_loop *loop, ev_timer *w) {
996   if (ev_is_active(w)) {
997     return;
998   }
999   ev_timer_again(loop, w);
1000 }
1001 } // namespace
1002 
1003 namespace {
disable_timer(struct ev_loop * loop,ev_timer * w)1004 void disable_timer(struct ev_loop *loop, ev_timer *w) {
1005   ev_timer_stop(loop, w);
1006 }
1007 } // namespace
1008 
reset_upstream_rtimer()1009 void Downstream::reset_upstream_rtimer() {
1010   if (get_config()->http2.timeout.stream_read == 0.) {
1011     return;
1012   }
1013   auto loop = upstream_->get_client_handler()->get_loop();
1014   reset_timer(loop, &upstream_rtimer_);
1015 }
1016 
reset_upstream_wtimer()1017 void Downstream::reset_upstream_wtimer() {
1018   auto loop = upstream_->get_client_handler()->get_loop();
1019   auto &timeoutconf = get_config()->http2.timeout;
1020 
1021   if (timeoutconf.stream_write != 0.) {
1022     reset_timer(loop, &upstream_wtimer_);
1023   }
1024   if (timeoutconf.stream_read != 0.) {
1025     try_reset_timer(loop, &upstream_rtimer_);
1026   }
1027 }
1028 
ensure_upstream_wtimer()1029 void Downstream::ensure_upstream_wtimer() {
1030   if (get_config()->http2.timeout.stream_write == 0.) {
1031     return;
1032   }
1033   auto loop = upstream_->get_client_handler()->get_loop();
1034   ensure_timer(loop, &upstream_wtimer_);
1035 }
1036 
disable_upstream_rtimer()1037 void Downstream::disable_upstream_rtimer() {
1038   if (get_config()->http2.timeout.stream_read == 0.) {
1039     return;
1040   }
1041   auto loop = upstream_->get_client_handler()->get_loop();
1042   disable_timer(loop, &upstream_rtimer_);
1043 }
1044 
disable_upstream_wtimer()1045 void Downstream::disable_upstream_wtimer() {
1046   if (get_config()->http2.timeout.stream_write == 0.) {
1047     return;
1048   }
1049   auto loop = upstream_->get_client_handler()->get_loop();
1050   disable_timer(loop, &upstream_wtimer_);
1051 }
1052 
reset_downstream_rtimer()1053 void Downstream::reset_downstream_rtimer() {
1054   if (get_config()->http2.timeout.stream_read == 0.) {
1055     return;
1056   }
1057   auto loop = upstream_->get_client_handler()->get_loop();
1058   reset_timer(loop, &downstream_rtimer_);
1059 }
1060 
reset_downstream_wtimer()1061 void Downstream::reset_downstream_wtimer() {
1062   auto loop = upstream_->get_client_handler()->get_loop();
1063   auto &timeoutconf = get_config()->http2.timeout;
1064 
1065   if (timeoutconf.stream_write != 0.) {
1066     reset_timer(loop, &downstream_wtimer_);
1067   }
1068   if (timeoutconf.stream_read != 0.) {
1069     try_reset_timer(loop, &downstream_rtimer_);
1070   }
1071 }
1072 
ensure_downstream_wtimer()1073 void Downstream::ensure_downstream_wtimer() {
1074   if (get_config()->http2.timeout.stream_write == 0.) {
1075     return;
1076   }
1077   auto loop = upstream_->get_client_handler()->get_loop();
1078   ensure_timer(loop, &downstream_wtimer_);
1079 }
1080 
disable_downstream_rtimer()1081 void Downstream::disable_downstream_rtimer() {
1082   if (get_config()->http2.timeout.stream_read == 0.) {
1083     return;
1084   }
1085   auto loop = upstream_->get_client_handler()->get_loop();
1086   disable_timer(loop, &downstream_rtimer_);
1087 }
1088 
disable_downstream_wtimer()1089 void Downstream::disable_downstream_wtimer() {
1090   if (get_config()->http2.timeout.stream_write == 0.) {
1091     return;
1092   }
1093   auto loop = upstream_->get_client_handler()->get_loop();
1094   disable_timer(loop, &downstream_wtimer_);
1095 }
1096 
accesslog_ready() const1097 bool Downstream::accesslog_ready() const {
1098   return !accesslog_written_ && resp_.http_status > 0;
1099 }
1100 
add_retry()1101 void Downstream::add_retry() { ++num_retry_; }
1102 
no_more_retry() const1103 bool Downstream::no_more_retry() const { return num_retry_ > 50; }
1104 
set_request_downstream_host(const StringRef & host)1105 void Downstream::set_request_downstream_host(const StringRef &host) {
1106   request_downstream_host_ = host;
1107 }
1108 
set_request_pending(bool f)1109 void Downstream::set_request_pending(bool f) { request_pending_ = f; }
1110 
get_request_pending() const1111 bool Downstream::get_request_pending() const { return request_pending_; }
1112 
set_request_header_sent(bool f)1113 void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
1114 
get_request_header_sent() const1115 bool Downstream::get_request_header_sent() const {
1116   return request_header_sent_;
1117 }
1118 
request_submission_ready() const1119 bool Downstream::request_submission_ready() const {
1120   return (request_state_ == DownstreamState::HEADER_COMPLETE ||
1121           request_state_ == DownstreamState::MSG_COMPLETE) &&
1122          (request_pending_ || !request_header_sent_) &&
1123          response_state_ == DownstreamState::INITIAL;
1124 }
1125 
get_dispatch_state() const1126 DispatchState Downstream::get_dispatch_state() const { return dispatch_state_; }
1127 
set_dispatch_state(DispatchState s)1128 void Downstream::set_dispatch_state(DispatchState s) { dispatch_state_ = s; }
1129 
attach_blocked_link(BlockedLink * l)1130 void Downstream::attach_blocked_link(BlockedLink *l) {
1131   assert(!blocked_link_);
1132 
1133   l->downstream = this;
1134   blocked_link_ = l;
1135 }
1136 
detach_blocked_link()1137 BlockedLink *Downstream::detach_blocked_link() {
1138   auto link = blocked_link_;
1139   blocked_link_ = nullptr;
1140   return link;
1141 }
1142 
can_detach_downstream_connection() const1143 bool Downstream::can_detach_downstream_connection() const {
1144   // We should check request and response buffer.  If request buffer
1145   // is not empty, then we might leave downstream connection in weird
1146   // state, especially for HTTP/1.1
1147   return dconn_ && response_state_ == DownstreamState::MSG_COMPLETE &&
1148          request_state_ == DownstreamState::MSG_COMPLETE && !upgraded_ &&
1149          !resp_.connection_close && request_buf_.rleft() == 0;
1150 }
1151 
pop_response_buf()1152 DefaultMemchunks Downstream::pop_response_buf() {
1153   return std::move(response_buf_);
1154 }
1155 
set_assoc_stream_id(int64_t stream_id)1156 void Downstream::set_assoc_stream_id(int64_t stream_id) {
1157   assoc_stream_id_ = stream_id;
1158 }
1159 
get_assoc_stream_id() const1160 int64_t Downstream::get_assoc_stream_id() const { return assoc_stream_id_; }
1161 
get_block_allocator()1162 BlockAllocator &Downstream::get_block_allocator() { return balloc_; }
1163 
add_rcbuf(nghttp2_rcbuf * rcbuf)1164 void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) {
1165   nghttp2_rcbuf_incref(rcbuf);
1166   rcbufs_.push_back(rcbuf);
1167 }
1168 
1169 #ifdef ENABLE_HTTP3
add_rcbuf(nghttp3_rcbuf * rcbuf)1170 void Downstream::add_rcbuf(nghttp3_rcbuf *rcbuf) {
1171   nghttp3_rcbuf_incref(rcbuf);
1172   rcbufs3_.push_back(rcbuf);
1173 }
1174 #endif // ENABLE_HTTP3
1175 
set_downstream_addr_group(const std::shared_ptr<DownstreamAddrGroup> & group)1176 void Downstream::set_downstream_addr_group(
1177   const std::shared_ptr<DownstreamAddrGroup> &group) {
1178   group_ = group;
1179 }
1180 
set_addr(const DownstreamAddr * addr)1181 void Downstream::set_addr(const DownstreamAddr *addr) { addr_ = addr; }
1182 
get_addr() const1183 const DownstreamAddr *Downstream::get_addr() const { return addr_; }
1184 
set_accesslog_written(bool f)1185 void Downstream::set_accesslog_written(bool f) { accesslog_written_ = f; }
1186 
renew_affinity_cookie(uint32_t h)1187 void Downstream::renew_affinity_cookie(uint32_t h) {
1188   affinity_cookie_ = h;
1189   new_affinity_cookie_ = true;
1190 }
1191 
get_affinity_cookie_to_send() const1192 uint32_t Downstream::get_affinity_cookie_to_send() const {
1193   if (new_affinity_cookie_) {
1194     return affinity_cookie_;
1195   }
1196   return 0;
1197 }
1198 
get_blocked_request_buf()1199 DefaultMemchunks *Downstream::get_blocked_request_buf() {
1200   return &blocked_request_buf_;
1201 }
1202 
get_blocked_request_data_eof() const1203 bool Downstream::get_blocked_request_data_eof() const {
1204   return blocked_request_data_eof_;
1205 }
1206 
set_blocked_request_data_eof(bool f)1207 void Downstream::set_blocked_request_data_eof(bool f) {
1208   blocked_request_data_eof_ = f;
1209 }
1210 
set_ws_key(const StringRef & key)1211 void Downstream::set_ws_key(const StringRef &key) { ws_key_ = key; }
1212 
get_expect_100_continue() const1213 bool Downstream::get_expect_100_continue() const {
1214   return expect_100_continue_;
1215 }
1216 
get_stop_reading() const1217 bool Downstream::get_stop_reading() const { return stop_reading_; }
1218 
set_stop_reading(bool f)1219 void Downstream::set_stop_reading(bool f) { stop_reading_ = f; }
1220 
1221 } // namespace shrpx
1222