• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http2_upstream.h"
26 
27 #include <netinet/tcp.h>
28 #include <assert.h>
29 #include <cerrno>
30 #include <sstream>
31 
32 #include "shrpx_client_handler.h"
33 #include "shrpx_https_upstream.h"
34 #include "shrpx_downstream.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_config.h"
37 #include "shrpx_http.h"
38 #include "shrpx_worker.h"
39 #include "shrpx_http2_session.h"
40 #include "shrpx_log.h"
41 #ifdef HAVE_MRUBY
42 #  include "shrpx_mruby.h"
43 #endif // HAVE_MRUBY
44 #include "http2.h"
45 #include "util.h"
46 #include "base64.h"
47 #include "app_helper.h"
48 #include "template.h"
49 
50 using namespace nghttp2;
51 
52 namespace shrpx {
53 
// File-local buffer size limit.
// NOTE(review): `32_k` is a user-defined literal declared outside this
// chunk; presumably it means 32 KiB -- confirm.  The code that reads
// this constant is not visible in this part of the file.
54 namespace {
55 constexpr size_t MAX_BUFFER_SIZE = 32_k;
56 } // namespace
57 
58 namespace {
on_stream_close_callback(nghttp2_session * session,int32_t stream_id,uint32_t error_code,void * user_data)59 int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
60                              uint32_t error_code, void *user_data) {
61   auto upstream = static_cast<Http2Upstream *>(user_data);
62   if (LOG_ENABLED(INFO)) {
63     ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
64                          << " is being closed";
65   }
66 
67   auto downstream = static_cast<Downstream *>(
68       nghttp2_session_get_stream_user_data(session, stream_id));
69 
70   if (!downstream) {
71     return 0;
72   }
73 
74   auto &req = downstream->request();
75 
76   upstream->consume(stream_id, req.unconsumed_body_length);
77 
78   req.unconsumed_body_length = 0;
79 
80   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
81     upstream->remove_downstream(downstream);
82     // downstream was deleted
83 
84     return 0;
85   }
86 
87   if (downstream->can_detach_downstream_connection()) {
88     // Keep-alive
89     downstream->detach_downstream_connection();
90   }
91 
92   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
93 
94   // At this point, downstream read may be paused.
95 
96   // If shrpx_downstream::push_request_headers() failed, the
97   // error is handled here.
98   upstream->remove_downstream(downstream);
99   // downstream was deleted
100 
101   // How to test this case? Request sufficient large download
102   // and make client send RST_STREAM after it gets first DATA
103   // frame chunk.
104 
105   return 0;
106 }
107 } // namespace
108 
upgrade_upstream(HttpsUpstream * http)109 int Http2Upstream::upgrade_upstream(HttpsUpstream *http) {
110   int rv;
111 
112   auto &balloc = http->get_downstream()->get_block_allocator();
113 
114   auto http2_settings = http->get_downstream()->get_http2_settings();
115   http2_settings = util::to_base64(balloc, http2_settings);
116 
117   auto settings_payload = base64::decode(balloc, std::begin(http2_settings),
118                                          std::end(http2_settings));
119 
120   rv = nghttp2_session_upgrade2(
121       session_, settings_payload.data(), settings_payload.size(),
122       http->get_downstream()->request().method == HTTP_HEAD, nullptr);
123   if (rv != 0) {
124     if (LOG_ENABLED(INFO)) {
125       ULOG(INFO, this) << "nghttp2_session_upgrade() returned error: "
126                        << nghttp2_strerror(rv);
127     }
128     return -1;
129   }
130   pre_upstream_.reset(http);
131   auto downstream = http->pop_downstream();
132   downstream->reset_upstream(this);
133   downstream->set_stream_id(1);
134   downstream->reset_upstream_rtimer();
135   downstream->set_stream_id(1);
136 
137   auto ptr = downstream.get();
138 
139   nghttp2_session_set_stream_user_data(session_, 1, ptr);
140   downstream_queue_.add_pending(std::move(downstream));
141   downstream_queue_.mark_active(ptr);
142 
143   // TODO This might not be necessary
144   handler_->stop_read_timer();
145 
146   if (LOG_ENABLED(INFO)) {
147     ULOG(INFO, this) << "Connection upgraded to HTTP/2";
148   }
149 
150   return 0;
151 }
152 
start_settings_timer()153 void Http2Upstream::start_settings_timer() {
154   ev_timer_start(handler_->get_loop(), &settings_timer_);
155 }
156 
stop_settings_timer()157 void Http2Upstream::stop_settings_timer() {
158   ev_timer_stop(handler_->get_loop(), &settings_timer_);
159 }
160 
161 namespace {
on_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)162 int on_header_callback2(nghttp2_session *session, const nghttp2_frame *frame,
163                         nghttp2_rcbuf *name, nghttp2_rcbuf *value,
164                         uint8_t flags, void *user_data) {
165   auto namebuf = nghttp2_rcbuf_get_buf(name);
166   auto valuebuf = nghttp2_rcbuf_get_buf(value);
167   auto config = get_config();
168 
169   if (config->http2.upstream.debug.frame_debug) {
170     verbose_on_header_callback(session, frame, namebuf.base, namebuf.len,
171                                valuebuf.base, valuebuf.len, flags, user_data);
172   }
173   if (frame->hd.type != NGHTTP2_HEADERS) {
174     return 0;
175   }
176   auto upstream = static_cast<Http2Upstream *>(user_data);
177   auto downstream = static_cast<Downstream *>(
178       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
179   if (!downstream) {
180     return 0;
181   }
182 
183   auto &req = downstream->request();
184 
185   auto &httpconf = config->http;
186 
187   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
188           httpconf.request_header_field_buffer ||
189       req.fs.num_fields() >= httpconf.max_request_header_fields) {
190     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
191       return 0;
192     }
193 
194     if (LOG_ENABLED(INFO)) {
195       ULOG(INFO, upstream) << "Too large or many header field size="
196                            << req.fs.buffer_size() + namebuf.len + valuebuf.len
197                            << ", num=" << req.fs.num_fields() + 1;
198     }
199 
200     // just ignore header fields if this is trailer part.
201     if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
202       return 0;
203     }
204 
205     if (upstream->error_reply(downstream, 431) != 0) {
206       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
207     }
208 
209     return 0;
210   }
211 
212   auto nameref = StringRef{namebuf.base, namebuf.len};
213   auto valueref = StringRef{valuebuf.base, valuebuf.len};
214   auto token = http2::lookup_token(nameref);
215   auto no_index = flags & NGHTTP2_NV_FLAG_NO_INDEX;
216 
217   downstream->add_rcbuf(name);
218   downstream->add_rcbuf(value);
219 
220   if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
221     // just store header fields for trailer part
222     req.fs.add_trailer_token(nameref, valueref, no_index, token);
223     return 0;
224   }
225 
226   req.fs.add_header_token(nameref, valueref, no_index, token);
227   return 0;
228 }
229 } // namespace
230 
231 namespace {
on_invalid_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)232 int on_invalid_header_callback2(nghttp2_session *session,
233                                 const nghttp2_frame *frame, nghttp2_rcbuf *name,
234                                 nghttp2_rcbuf *value, uint8_t flags,
235                                 void *user_data) {
236   auto upstream = static_cast<Http2Upstream *>(user_data);
237   auto downstream = static_cast<Downstream *>(
238       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
239   if (!downstream) {
240     return 0;
241   }
242 
243   if (LOG_ENABLED(INFO)) {
244     auto namebuf = nghttp2_rcbuf_get_buf(name);
245     auto valuebuf = nghttp2_rcbuf_get_buf(value);
246 
247     ULOG(INFO, upstream) << "Invalid header field for stream_id="
248                          << frame->hd.stream_id << ": name=["
249                          << StringRef{namebuf.base, namebuf.len} << "], value=["
250                          << StringRef{valuebuf.base, valuebuf.len} << "]";
251   }
252 
253   upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
254 
255   return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
256 }
257 } // namespace
258 
259 namespace {
on_begin_headers_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)260 int on_begin_headers_callback(nghttp2_session *session,
261                               const nghttp2_frame *frame, void *user_data) {
262   auto upstream = static_cast<Http2Upstream *>(user_data);
263 
264   if (frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
265     return 0;
266   }
267   if (LOG_ENABLED(INFO)) {
268     ULOG(INFO, upstream) << "Received upstream request HEADERS stream_id="
269                          << frame->hd.stream_id;
270   }
271 
272   upstream->on_start_request(frame);
273 
274   return 0;
275 }
276 } // namespace
277 
on_start_request(const nghttp2_frame * frame)278 void Http2Upstream::on_start_request(const nghttp2_frame *frame) {
279   auto downstream = std::make_unique<Downstream>(this, handler_->get_mcpool(),
280                                                  frame->hd.stream_id);
281   nghttp2_session_set_stream_user_data(session_, frame->hd.stream_id,
282                                        downstream.get());
283 
284   downstream->reset_upstream_rtimer();
285 
286   auto config = get_config();
287   auto &httpconf = config->http;
288 
289   handler_->reset_upstream_read_timeout(httpconf.timeout.header);
290 
291   auto &req = downstream->request();
292 
293   // Although, we deprecated minor version from HTTP/2, we supply
294   // minor version 0 to use via header field in a conventional way.
295   req.http_major = 2;
296   req.http_minor = 0;
297 
298   add_pending_downstream(std::move(downstream));
299 
300   ++num_requests_;
301 
302   if (httpconf.max_requests <= num_requests_) {
303     start_graceful_shutdown();
304   }
305 }
306 
// Processes a complete set of request header fields for |downstream|.
// |frame| is the HEADERS frame that carried them; only its flags are
// read here (END_STREAM).
//
// Steps, in order:
//  - stamp the request with the refreshed log timestamp;
//  - stop if a response is already complete for this stream;
//  - log the header fields at INFO level (the authorization value is
//    redacted) and dump them if a request header dump is configured;
//  - record content-length, the method token, scheme, authority (from
//    :authority, falling back to host) and path on the request;
//  - accept :protocol only when its value is "websocket";
//  - note whether a request body is expected (no END_STREAM flag);
//  - optionally enforce the scheme check and run the mruby request
//    hook;
//  - finally hand the request to start_downstream().
//
// Error handling: an unknown method gets a 501 reply, an unsupported
// :protocol or a failed scheme check a 400 reply, a failing mruby hook
// a 500 reply.  A request without :authority in HTTP/2 proxy mode
// (non-CONNECT, alt_mode NONE) is reset with NGHTTP2_PROTOCOL_ERROR.
//
// Returns 0 in all of those cases, and
// NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE only when producing an error
// reply itself fails.
on_request_headers(Downstream * downstream,const nghttp2_frame * frame)307 int Http2Upstream::on_request_headers(Downstream *downstream,
308                                       const nghttp2_frame *frame) {
309   auto lgconf = log_config();
310   lgconf->update_tstamp(std::chrono::system_clock::now());
311   auto &req = downstream->request();
312   req.tstamp = lgconf->tstamp;
313 
  // Nothing to do when a response has already been completed for this
  // stream.  NOTE(review): presumably the result of an error reply
  // issued while header fields were still arriving -- confirm.
314   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
315     return 0;
316   }
317 
318   auto &nva = req.fs.headers();
319 
320   if (LOG_ENABLED(INFO)) {
321     std::stringstream ss;
322     for (auto &nv : nva) {
      // Keep credentials out of the log.
323       if (nv.name == "authorization"_sr) {
324         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
325         continue;
326       }
327       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
328     }
329     ULOG(INFO, this) << "HTTP request headers. stream_id="
330                      << downstream->get_stream_id() << "\n"
331                      << ss.str();
332   }
333 
334   auto config = get_config();
335   auto &dump = config->http2.upstream.debug.dump;
336 
337   if (dump.request_header) {
338     http2::dump_nv(dump.request_header, nva);
339   }
340 
341   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
342   if (content_length) {
343     // libnghttp2 guarantees this can be parsed
344     req.fs.content_length =
345         util::parse_uint(content_length->value).value_or(-1);
346   }
347 
348   // presence of mandatory header fields are guaranteed by libnghttp2.
349   auto authority = req.fs.header(http2::HD__AUTHORITY);
350   auto path = req.fs.header(http2::HD__PATH);
351   auto method = req.fs.header(http2::HD__METHOD);
352   auto scheme = req.fs.header(http2::HD__SCHEME);
353 
  // lookup_method_token() yields -1 for a method it does not know.
354   auto method_token = http2::lookup_method_token(method->value);
355   if (method_token == -1) {
356     if (error_reply(downstream, 501) != 0) {
357       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
358     }
359     return 0;
360   }
361 
362   auto faddr = handler_->get_upstream_addr();
363 
364   // For HTTP/2 proxy, we require :authority.
365   if (method_token != HTTP_CONNECT && config->http2_proxy &&
366       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
367     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
368     return 0;
369   }
370 
371   req.method = method_token;
372   if (scheme) {
373     req.scheme = scheme->value;
374   }
375 
376   // nghttp2 library guarantees either :authority or host exist
377   if (!authority) {
378     req.no_authority = true;
379     authority = req.fs.header(http2::HD_HOST);
380   }
381 
382   if (authority) {
383     req.authority = authority->value;
384   }
385 
  // The path is stored verbatim in HTTP/2 proxy mode and passed
  // through http2::rewrite_clean_path() otherwise.
386   if (path) {
387     if (method_token == HTTP_OPTIONS && path->value == "*"_sr) {
388       // Server-wide OPTIONS request.  Path is empty.
389     } else if (config->http2_proxy &&
390                faddr->alt_mode == UpstreamAltMode::NONE) {
391       req.path = path->value;
392     } else {
393       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
394                                            path->value);
395     }
396   }
397 
  // "websocket" is the only :protocol value accepted here; anything
  // else is answered with 400.
398   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
399   if (connect_proto) {
400     if (connect_proto->value != "websocket"_sr) {
401       if (error_reply(downstream, 400) != 0) {
402         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
403       }
404       return 0;
405     }
406     req.connect_proto = ConnectProto::WEBSOCKET;
407   }
408 
409   if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
410     req.http2_expect_body = true;
411   } else if (req.fs.content_length == -1) {
412     // If END_STREAM flag is set to HEADERS frame, we are sure that
413     // content-length is 0.
414     req.fs.content_length = 0;
415   }
416 
417   downstream->inspect_http2_request();
418 
419   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
420 
  // Scheme check runs only when require_http_scheme is configured.
  // The second argument is true when the handler has an SSL object.
421   if (config->http.require_http_scheme &&
422       !http::check_http_scheme(req.scheme, handler_->get_ssl() != nullptr)) {
423     if (error_reply(downstream, 400) != 0) {
424       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
425     }
426     return 0;
427   }
428 
429 #ifdef HAVE_MRUBY
430   auto worker = handler_->get_worker();
431   auto mruby_ctx = worker->get_mruby_context();
432 
433   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
434     if (error_reply(downstream, 500) != 0) {
435       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
436     }
437     return 0;
438   }
439 #endif // HAVE_MRUBY
440 
  // END_STREAM on the HEADERS frame: the request is complete, so the
  // upstream read timer is no longer needed.
441   if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
442     downstream->disable_upstream_rtimer();
443 
444     downstream->set_request_state(DownstreamState::MSG_COMPLETE);
445   }
446 
  // Re-check the response state before contacting a backend.
  // NOTE(review): presumably the mruby hook above can complete the
  // response by itself -- confirm.
447   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
448     return 0;
449   }
450 
451   start_downstream(downstream);
452 
453   return 0;
454 }
455 
start_downstream(Downstream * downstream)456 void Http2Upstream::start_downstream(Downstream *downstream) {
457   if (downstream_queue_.can_activate(downstream->request().authority)) {
458     initiate_downstream(downstream);
459     return;
460   }
461 
462   downstream_queue_.mark_blocked(downstream);
463 }
464 
// Connects |downstream| to a backend and forwards the request headers.
//
// Backend connections are requested from the client handler until one
// can be attached to |downstream|.  If the handler cannot supply a
// connection, the client is redirected to https when the reported
// error is SHRPX_ERR_TLS_REQUIRED and receives a 502 reply otherwise;
// if producing that response fails the stream is reset with
// NGHTTP2_INTERNAL_ERROR.  The request state then becomes CONNECT_FAIL
// and the downstream is marked as failed in the queue.
//
// With mruby support, the request hook of the backend address group
// (if any) runs after the connection is attached; a failing hook
// yields a 500 reply, and a response completed by the hook ends
// processing here.
//
// A failure of push_request_headers() yields a 502 reply.  On success
// the downstream is marked active and, when no request body is
// expected, the upload is finished immediately.
initiate_downstream(Downstream * downstream)465 void Http2Upstream::initiate_downstream(Downstream *downstream) {
466   int rv;
467 
468 #ifdef HAVE_MRUBY
  // Raw pointer to the attached connection; needed because ownership
  // of dconn is moved into |downstream| inside the loop.
469   DownstreamConnection *dconn_ptr;
470 #endif // HAVE_MRUBY
471 
  // Retry with a freshly obtained connection until attaching succeeds
  // or the handler has none left to offer.
472   for (;;) {
473     auto dconn = handler_->get_downstream_connection(rv, downstream);
474     if (!dconn) {
475       if (rv == SHRPX_ERR_TLS_REQUIRED) {
476         rv = redirect_to_https(downstream);
477       } else {
478         rv = error_reply(downstream, 502);
479       }
      // Neither response could be produced; reset the stream instead.
480       if (rv != 0) {
481         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
482       }
483 
484       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
485       downstream_queue_.mark_failure(downstream);
486 
487       return;
488     }
489 
490 #ifdef HAVE_MRUBY
491     dconn_ptr = dconn.get();
492 #endif // HAVE_MRUBY
493     rv = downstream->attach_downstream_connection(std::move(dconn));
494     if (rv == 0) {
495       break;
496     }
497   }
498 
499 #ifdef HAVE_MRUBY
500   const auto &group = dconn_ptr->get_downstream_addr_group();
501   if (group) {
502     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
503     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
504       if (error_reply(downstream, 500) != 0) {
505         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
506       }
507 
508       downstream_queue_.mark_failure(downstream);
509 
510       return;
511     }
512 
    // The response is already complete after the hook ran, so there is
    // nothing to send to the backend.
513     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
514       return;
515     }
516   }
517 #endif // HAVE_MRUBY
518 
519   rv = downstream->push_request_headers();
520   if (rv != 0) {
521 
522     if (error_reply(downstream, 502) != 0) {
523       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
524     }
525 
526     downstream_queue_.mark_failure(downstream);
527 
528     return;
529   }
530 
531   downstream_queue_.mark_active(downstream);
532 
  // No request body is expected, so finish the upload right away.
533   auto &req = downstream->request();
534   if (!req.http2_expect_body) {
535     rv = downstream->end_upload_data();
536     if (rv != 0) {
537       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
538     }
539   }
540 
541   return;
542 }
543 
544 namespace {
on_frame_recv_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)545 int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
546                            void *user_data) {
547   if (get_config()->http2.upstream.debug.frame_debug) {
548     verbose_on_frame_recv_callback(session, frame, user_data);
549   }
550   auto upstream = static_cast<Http2Upstream *>(user_data);
551   auto handler = upstream->get_client_handler();
552 
553   switch (frame->hd.type) {
554   case NGHTTP2_DATA: {
555     auto downstream = static_cast<Downstream *>(
556         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
557     if (!downstream) {
558       return 0;
559     }
560 
561     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
562       downstream->disable_upstream_rtimer();
563 
564       if (downstream->end_upload_data() != 0) {
565         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
566           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
567         }
568       }
569 
570       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
571     }
572 
573     return 0;
574   }
575   case NGHTTP2_HEADERS: {
576     auto downstream = static_cast<Downstream *>(
577         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
578     if (!downstream) {
579       return 0;
580     }
581 
582     if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
583       downstream->reset_upstream_rtimer();
584 
585       handler->stop_read_timer();
586 
587       return upstream->on_request_headers(downstream, frame);
588     }
589 
590     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
591       downstream->disable_upstream_rtimer();
592 
593       if (downstream->end_upload_data() != 0) {
594         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
595           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
596         }
597       }
598 
599       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
600     }
601 
602     return 0;
603   }
604   case NGHTTP2_SETTINGS:
605     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
606       return 0;
607     }
608     upstream->stop_settings_timer();
609     return 0;
610   case NGHTTP2_GOAWAY:
611     if (LOG_ENABLED(INFO)) {
612       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
613                                          frame->goaway.opaque_data_len);
614 
615       ULOG(INFO, upstream) << "GOAWAY received: last-stream-id="
616                            << frame->goaway.last_stream_id
617                            << ", error_code=" << frame->goaway.error_code
618                            << ", debug_data=" << debug_data;
619     }
620     return 0;
621   default:
622     return 0;
623   }
624 }
625 } // namespace
626 
627 namespace {
on_data_chunk_recv_callback(nghttp2_session * session,uint8_t flags,int32_t stream_id,const uint8_t * data,size_t len,void * user_data)628 int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
629                                 int32_t stream_id, const uint8_t *data,
630                                 size_t len, void *user_data) {
631   auto upstream = static_cast<Http2Upstream *>(user_data);
632   auto downstream = static_cast<Downstream *>(
633       nghttp2_session_get_stream_user_data(session, stream_id));
634 
635   if (!downstream) {
636     if (upstream->consume(stream_id, len) != 0) {
637       return NGHTTP2_ERR_CALLBACK_FAILURE;
638     }
639 
640     return 0;
641   }
642 
643   downstream->reset_upstream_rtimer();
644 
645   if (downstream->push_upload_data_chunk(data, len) != 0) {
646     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
647       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
648     }
649 
650     if (upstream->consume(stream_id, len) != 0) {
651       return NGHTTP2_ERR_CALLBACK_FAILURE;
652     }
653 
654     return 0;
655   }
656 
657   return 0;
658 }
659 } // namespace
660 
// Frame-sent callback for the upstream session.
//
// When frame debugging is configured the frame is first passed to
// verbose_on_frame_send_callback().  Then, by frame type:
//  - DATA / HEADERS carrying END_STREAM: if the stream's Downstream is
//    upgraded (tunnel), or the remote side has not closed the stream
//    yet, RST_STREAM with NGHTTP2_NO_ERROR is issued so the stream
//    does not linger;
//  - SETTINGS without ACK: starts the settings timer;
//  - PUSH_PROMISE: unless stream user data is already set for the
//    promised stream, creates a Downstream for it, fills its request
//    from the promised header fields, adds it as pending and starts it
//    (after the worker's mruby request hook when mruby is compiled
//    in);
//  - GOAWAY: logged at INFO level.
// Always returns 0.
661 namespace {
on_frame_send_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)662 int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
663                            void *user_data) {
664   if (get_config()->http2.upstream.debug.frame_debug) {
665     verbose_on_frame_send_callback(session, frame, user_data);
666   }
667   auto upstream = static_cast<Http2Upstream *>(user_data);
668   auto handler = upstream->get_client_handler();
669 
670   switch (frame->hd.type) {
671   case NGHTTP2_DATA:
672   case NGHTTP2_HEADERS: {
    // Only the frame that ends the response is of interest here.
673     if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
674       return 0;
675     }
676     // RST_STREAM if request is still incomplete.
677     auto stream_id = frame->hd.stream_id;
678     auto downstream = static_cast<Downstream *>(
679         nghttp2_session_get_stream_user_data(session, stream_id));
680 
681     if (!downstream) {
682       return 0;
683     }
684 
685     // For tunneling, issue RST_STREAM to finish the stream.
686     if (downstream->get_upgraded() ||
687         nghttp2_session_get_stream_remote_close(session, stream_id) == 0) {
688       if (LOG_ENABLED(INFO)) {
689         ULOG(INFO, upstream)
690             << "Send RST_STREAM to "
691             << (downstream->get_upgraded() ? "tunneled " : "")
692             << "stream stream_id=" << downstream->get_stream_id()
693             << " to finish off incomplete request";
694       }
695 
696       upstream->rst_stream(downstream, NGHTTP2_NO_ERROR);
697     }
698 
699     return 0;
700   }
701   case NGHTTP2_SETTINGS:
    // Our own SETTINGS was sent; start the settings timer.
    // NOTE(review): presumably it bounds the wait for the peer's ACK,
    // given that the receive side stops it on SETTINGS ACK -- confirm.
702     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
703       upstream->start_settings_timer();
704     }
705     return 0;
706   case NGHTTP2_PUSH_PROMISE: {
707     auto promised_stream_id = frame->push_promise.promised_stream_id;
708 
709     if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) {
710       // In case of push from backend, downstream object was already
711       // created.
712       return 0;
713     }
714 
715     auto promised_downstream = std::make_unique<Downstream>(
716         upstream, handler->get_mcpool(), promised_stream_id);
717     auto &req = promised_downstream->request();
718 
719     // As long as we use nghttp2_session_mem_send2(), setting stream
720     // user data here should not fail.  This is because this callback
721     // is called just after frame was serialized.  So no worries about
722     // hanging Downstream.
723     nghttp2_session_set_stream_user_data(session, promised_stream_id,
724                                          promised_downstream.get());
725 
726     promised_downstream->set_assoc_stream_id(frame->hd.stream_id);
727     promised_downstream->disable_upstream_rtimer();
728 
729     req.http_major = 2;
730     req.http_minor = 0;
731 
    // The promised request is recorded as having no body.
732     req.fs.content_length = 0;
733     req.http2_expect_body = false;
734 
735     auto &promised_balloc = promised_downstream->get_block_allocator();
736 
    // Copy every promised header field into the new request (strings
    // are created through the promised stream's allocator) and mirror
    // the pseudo-header fields into the dedicated request members.
737     for (size_t i = 0; i < frame->push_promise.nvlen; ++i) {
738       auto &nv = frame->push_promise.nva[i];
739 
740       auto name =
741           make_string_ref(promised_balloc, StringRef{nv.name, nv.namelen});
742       auto value =
743           make_string_ref(promised_balloc, StringRef{nv.value, nv.valuelen});
744 
745       auto token = http2::lookup_token(name);
746       switch (token) {
747       case http2::HD__METHOD:
748         req.method = http2::lookup_method_token(value);
749         break;
750       case http2::HD__SCHEME:
751         req.scheme = value;
752         break;
753       case http2::HD__AUTHORITY:
754         req.authority = value;
755         break;
756       case http2::HD__PATH:
757         req.path = http2::rewrite_clean_path(promised_balloc, value);
758         break;
759       }
760       req.fs.add_header_token(name, value, nv.flags & NGHTTP2_NV_FLAG_NO_INDEX,
761                               token);
762     }
763 
764     promised_downstream->inspect_http2_request();
765 
766     promised_downstream->set_request_state(DownstreamState::MSG_COMPLETE);
767 
768     // a bit weird but start_downstream() expects that given
769     // downstream is in pending queue.
770     auto ptr = promised_downstream.get();
771     upstream->add_pending_downstream(std::move(promised_downstream));
772 
773 #ifdef HAVE_MRUBY
774     auto worker = handler->get_worker();
775     auto mruby_ctx = worker->get_mruby_context();
776 
    // A failing hook gets a 500 reply, or RST_STREAM if even that
    // cannot be produced; either way the pushed request is not started.
777     if (mruby_ctx->run_on_request_proc(ptr) != 0) {
778       if (upstream->error_reply(ptr, 500) != 0) {
779         upstream->rst_stream(ptr, NGHTTP2_INTERNAL_ERROR);
780         return 0;
781       }
782       return 0;
783     }
784 #endif // HAVE_MRUBY
785 
786     upstream->start_downstream(ptr);
787 
788     return 0;
789   }
790   case NGHTTP2_GOAWAY:
791     if (LOG_ENABLED(INFO)) {
792       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
793                                          frame->goaway.opaque_data_len);
794 
795       ULOG(INFO, upstream) << "Sending GOAWAY: last-stream-id="
796                            << frame->goaway.last_stream_id
797                            << ", error_code=" << frame->goaway.error_code
798                            << ", debug_data=" << debug_data;
799     }
800     return 0;
801   default:
802     return 0;
803   }
804 }
805 } // namespace
806 
807 namespace {
on_frame_not_send_callback(nghttp2_session * session,const nghttp2_frame * frame,int lib_error_code,void * user_data)808 int on_frame_not_send_callback(nghttp2_session *session,
809                                const nghttp2_frame *frame, int lib_error_code,
810                                void *user_data) {
811   auto upstream = static_cast<Http2Upstream *>(user_data);
812   if (LOG_ENABLED(INFO)) {
813     ULOG(INFO, upstream) << "Failed to send control frame type="
814                          << static_cast<uint32_t>(frame->hd.type)
815                          << ", lib_error_code=" << lib_error_code << ":"
816                          << nghttp2_strerror(lib_error_code);
817   }
818   if (frame->hd.type == NGHTTP2_HEADERS &&
819       lib_error_code != NGHTTP2_ERR_STREAM_CLOSED &&
820       lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) {
821     // To avoid stream hanging around, issue RST_STREAM.
822     auto downstream = static_cast<Downstream *>(
823         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
824     if (downstream) {
825       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
826     }
827   }
828   return 0;
829 }
830 } // namespace
831 
// File-local, value-initialized (all zero) block of 256 bytes.
// send_data_callback() appends a prefix of it to the response buffer
// as the padding bytes of a DATA frame.
832 namespace {
833 constexpr auto PADDING = std::array<uint8_t, 256>{};
834 } // namespace
835 
836 namespace {
send_data_callback(nghttp2_session * session,nghttp2_frame * frame,const uint8_t * framehd,size_t length,nghttp2_data_source * source,void * user_data)837 int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
838                        const uint8_t *framehd, size_t length,
839                        nghttp2_data_source *source, void *user_data) {
840   auto downstream = static_cast<Downstream *>(source->ptr);
841   auto upstream = static_cast<Http2Upstream *>(downstream->get_upstream());
842   auto body = downstream->get_response_buf();
843 
844   auto wb = upstream->get_response_buf();
845 
846   size_t padlen = 0;
847 
848   wb->append(framehd, 9);
849   if (frame->data.padlen > 0) {
850     padlen = frame->data.padlen - 1;
851     wb->append(static_cast<uint8_t>(padlen));
852   }
853 
854   body->remove(*wb, length);
855 
856   wb->append(PADDING.data(), padlen);
857 
858   if (body->rleft() == 0) {
859     downstream->disable_upstream_wtimer();
860   } else {
861     downstream->reset_upstream_wtimer();
862   }
863 
864   if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
865     return NGHTTP2_ERR_CALLBACK_FAILURE;
866   }
867 
868   // We have to add length here, so that we can log this amount of
869   // data transferred.
870   downstream->response_sent_body_length += length;
871 
872   auto max_buffer_size = upstream->get_max_buffer_size();
873 
874   return wb->rleft() >= max_buffer_size ? NGHTTP2_ERR_PAUSE : 0;
875 }
876 } // namespace
877 
878 namespace {
infer_upstream_rst_stream_error_code(uint32_t downstream_error_code)879 uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
880   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
881   // client to retry.
882   switch (downstream_error_code) {
883   case NGHTTP2_NO_ERROR:
884   case NGHTTP2_REFUSED_STREAM:
885     return downstream_error_code;
886   default:
887     return NGHTTP2_INTERNAL_ERROR;
888   }
889 }
890 } // namespace
891 
892 namespace {
settings_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)893 void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
894   auto upstream = static_cast<Http2Upstream *>(w->data);
895   auto handler = upstream->get_client_handler();
896   ULOG(INFO, upstream) << "SETTINGS timeout";
897   if (upstream->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
898     delete handler;
899     return;
900   }
901   handler->signal_write();
902 }
903 } // namespace
904 
905 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)906 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
907   auto upstream = static_cast<Http2Upstream *>(w->data);
908   auto handler = upstream->get_client_handler();
909   upstream->submit_goaway();
910   handler->signal_write();
911 }
912 } // namespace
913 
914 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revents)915 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
916   auto upstream = static_cast<Http2Upstream *>(w->data);
917   upstream->check_shutdown();
918 }
919 } // namespace
920 
submit_goaway()921 void Http2Upstream::submit_goaway() {
922   auto last_stream_id = nghttp2_session_get_last_proc_stream_id(session_);
923   nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, last_stream_id,
924                         NGHTTP2_NO_ERROR, nullptr, 0);
925 }
926 
check_shutdown()927 void Http2Upstream::check_shutdown() {
928   auto worker = handler_->get_worker();
929 
930   if (!worker->get_graceful_shutdown()) {
931     return;
932   }
933 
934   ev_prepare_stop(handler_->get_loop(), &prep_);
935 
936   start_graceful_shutdown();
937 }
938 
start_graceful_shutdown()939 void Http2Upstream::start_graceful_shutdown() {
940   int rv;
941   if (ev_is_active(&shutdown_timer_)) {
942     return;
943   }
944 
945   rv = nghttp2_submit_shutdown_notice(session_);
946   if (rv != 0) {
947     ULOG(FATAL, this) << "nghttp2_submit_shutdown_notice() failed: "
948                       << nghttp2_strerror(rv);
949     return;
950   }
951 
952   handler_->signal_write();
953 
954   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
955 }
956 
create_http2_upstream_callbacks()957 nghttp2_session_callbacks *create_http2_upstream_callbacks() {
958   int rv;
959   nghttp2_session_callbacks *callbacks;
960 
961   rv = nghttp2_session_callbacks_new(&callbacks);
962 
963   if (rv != 0) {
964     return nullptr;
965   }
966 
967   nghttp2_session_callbacks_set_on_stream_close_callback(
968       callbacks, on_stream_close_callback);
969 
970   nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
971                                                        on_frame_recv_callback);
972 
973   nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
974       callbacks, on_data_chunk_recv_callback);
975 
976   nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
977                                                        on_frame_send_callback);
978 
979   nghttp2_session_callbacks_set_on_frame_not_send_callback(
980       callbacks, on_frame_not_send_callback);
981 
982   nghttp2_session_callbacks_set_on_header_callback2(callbacks,
983                                                     on_header_callback2);
984 
985   nghttp2_session_callbacks_set_on_invalid_header_callback2(
986       callbacks, on_invalid_header_callback2);
987 
988   nghttp2_session_callbacks_set_on_begin_headers_callback(
989       callbacks, on_begin_headers_callback);
990 
991   nghttp2_session_callbacks_set_send_data_callback(callbacks,
992                                                    send_data_callback);
993 
994   auto config = get_config();
995 
996   if (config->padding) {
997     nghttp2_session_callbacks_set_select_padding_callback2(
998         callbacks, http::select_padding_callback);
999   }
1000 
1001   if (config->http2.upstream.debug.frame_debug) {
1002     nghttp2_session_callbacks_set_error_callback2(callbacks,
1003                                                   verbose_error_callback);
1004   }
1005 
1006   return callbacks;
1007 }
1008 
1009 namespace {
downstream_queue_size(Worker * worker)1010 size_t downstream_queue_size(Worker *worker) {
1011   auto &downstreamconf = *worker->get_downstream_config();
1012 
1013   if (get_config()->http2_proxy) {
1014     return downstreamconf.connections_per_host;
1015   }
1016 
1017   return downstreamconf.connections_per_frontend;
1018 }
1019 } // namespace
1020 
Http2Upstream(ClientHandler * handler)1021 Http2Upstream::Http2Upstream(ClientHandler *handler)
1022     : wb_(handler->get_worker()->get_mcpool()),
1023       downstream_queue_(downstream_queue_size(handler->get_worker()),
1024                         !get_config()->http2_proxy),
1025       handler_(handler),
1026       session_(nullptr),
1027       max_buffer_size_(MAX_BUFFER_SIZE),
1028       num_requests_(0) {
1029   int rv;
1030 
1031   auto config = get_config();
1032   auto &http2conf = config->http2;
1033 
1034   auto faddr = handler_->get_upstream_addr();
1035 
1036   rv =
1037       nghttp2_session_server_new2(&session_, http2conf.upstream.callbacks, this,
1038                                   faddr->alt_mode != UpstreamAltMode::NONE
1039                                       ? http2conf.upstream.alt_mode_option
1040                                       : http2conf.upstream.option);
1041 
1042   assert(rv == 0);
1043 
1044   flow_control_ = true;
1045 
1046   // TODO Maybe call from outside?
1047   std::array<nghttp2_settings_entry, 5> entry;
1048   size_t nentry = 3;
1049 
1050   entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
1051   entry[0].value = http2conf.upstream.max_concurrent_streams;
1052 
1053   entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
1054   if (faddr->alt_mode != UpstreamAltMode::NONE) {
1055     entry[1].value = (1u << 31) - 1;
1056   } else {
1057     entry[1].value = http2conf.upstream.window_size;
1058   }
1059 
1060   entry[2].settings_id = NGHTTP2_SETTINGS_NO_RFC7540_PRIORITIES;
1061   entry[2].value = 1;
1062 
1063   if (!config->http2_proxy) {
1064     entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL;
1065     entry[nentry].value = 1;
1066     ++nentry;
1067   }
1068 
1069   if (http2conf.upstream.decoder_dynamic_table_size !=
1070       NGHTTP2_DEFAULT_HEADER_TABLE_SIZE) {
1071     entry[nentry].settings_id = NGHTTP2_SETTINGS_HEADER_TABLE_SIZE;
1072     entry[nentry].value = http2conf.upstream.decoder_dynamic_table_size;
1073     ++nentry;
1074   }
1075 
1076   rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
1077                                nentry);
1078   if (rv != 0) {
1079     ULOG(ERROR, this) << "nghttp2_submit_settings() returned error: "
1080                       << nghttp2_strerror(rv);
1081   }
1082 
1083   auto window_size = faddr->alt_mode != UpstreamAltMode::NONE
1084                          ? std::numeric_limits<int32_t>::max()
1085                      : http2conf.upstream.optimize_window_size
1086                          ? std::min(http2conf.upstream.connection_window_size,
1087                                     NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE)
1088                          : http2conf.upstream.connection_window_size;
1089 
1090   rv = nghttp2_session_set_local_window_size(session_, NGHTTP2_FLAG_NONE, 0,
1091                                              window_size);
1092 
1093   if (rv != 0) {
1094     ULOG(ERROR, this)
1095         << "nghttp2_session_set_local_window_size() returned error: "
1096         << nghttp2_strerror(rv);
1097   }
1098 
1099   // We wait for SETTINGS ACK at least 10 seconds.
1100   ev_timer_init(&settings_timer_, settings_timeout_cb,
1101                 http2conf.upstream.timeout.settings, 0.);
1102 
1103   settings_timer_.data = this;
1104 
1105   // timer for 2nd GOAWAY.  HTTP/2 spec recommend 1 RTT.  We wait for
1106   // 2 seconds.
1107   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 2., 0);
1108   shutdown_timer_.data = this;
1109 
1110   ev_prepare_init(&prep_, prepare_cb);
1111   prep_.data = this;
1112   ev_prepare_start(handler_->get_loop(), &prep_);
1113 
1114 #if defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1115   if (http2conf.upstream.optimize_write_buffer_size) {
1116     auto conn = handler_->get_connection();
1117     conn->tls_dyn_rec_warmup_threshold = 0;
1118 
1119     uint32_t pollout_thres = 1;
1120     rv = setsockopt(conn->fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &pollout_thres,
1121                     static_cast<socklen_t>(sizeof(pollout_thres)));
1122 
1123     if (rv != 0) {
1124       if (LOG_ENABLED(INFO)) {
1125         auto error = errno;
1126         LOG(INFO) << "setsockopt(TCP_NOTSENT_LOWAT, " << pollout_thres
1127                   << ") failed: errno=" << error;
1128       }
1129     }
1130   }
1131 #endif // defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1132 
1133   handler_->reset_upstream_read_timeout(
1134       config->conn.upstream.timeout.http2_idle);
1135 
1136   handler_->signal_write();
1137 }
1138 
~Http2Upstream()1139 Http2Upstream::~Http2Upstream() {
1140   nghttp2_session_del(session_);
1141   ev_prepare_stop(handler_->get_loop(), &prep_);
1142   ev_timer_stop(handler_->get_loop(), &shutdown_timer_);
1143   ev_timer_stop(handler_->get_loop(), &settings_timer_);
1144 }
1145 
on_read()1146 int Http2Upstream::on_read() {
1147   auto rb = handler_->get_rb();
1148   auto rlimit = handler_->get_rlimit();
1149 
1150   if (rb->rleft()) {
1151     auto rv = nghttp2_session_mem_recv2(session_, rb->pos(), rb->rleft());
1152     if (rv < 0) {
1153       if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
1154         ULOG(ERROR, this) << "nghttp2_session_mem_recv2() returned error: "
1155                           << nghttp2_strerror(rv);
1156       }
1157       return -1;
1158     }
1159 
1160     // nghttp2_session_mem_recv2 should consume all input bytes on
1161     // success.
1162     assert(static_cast<size_t>(rv) == rb->rleft());
1163     rb->reset();
1164     rlimit->startw();
1165   }
1166 
1167   if (nghttp2_session_want_read(session_) == 0 &&
1168       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1169     if (LOG_ENABLED(INFO)) {
1170       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1171     }
1172     return -1;
1173   }
1174 
1175   handler_->signal_write();
1176   return 0;
1177 }
1178 
1179 // After this function call, downstream may be deleted.
on_write()1180 int Http2Upstream::on_write() {
1181   int rv;
1182   auto config = get_config();
1183   auto &http2conf = config->http2;
1184 
1185   if ((http2conf.upstream.optimize_write_buffer_size ||
1186        http2conf.upstream.optimize_window_size) &&
1187       handler_->get_ssl()) {
1188     auto conn = handler_->get_connection();
1189     TCPHint hint;
1190     rv = conn->get_tcp_hint(&hint);
1191     if (rv == 0) {
1192       if (http2conf.upstream.optimize_write_buffer_size) {
1193         max_buffer_size_ = std::min(MAX_BUFFER_SIZE, hint.write_buffer_size);
1194       }
1195 
1196       if (http2conf.upstream.optimize_window_size) {
1197         auto faddr = handler_->get_upstream_addr();
1198         if (faddr->alt_mode == UpstreamAltMode::NONE) {
1199           auto window_size = std::min(http2conf.upstream.connection_window_size,
1200                                       static_cast<int32_t>(hint.rwin * 2));
1201 
1202           rv = nghttp2_session_set_local_window_size(
1203               session_, NGHTTP2_FLAG_NONE, 0, window_size);
1204           if (rv != 0) {
1205             if (LOG_ENABLED(INFO)) {
1206               ULOG(INFO, this)
1207                   << "nghttp2_session_set_local_window_size() with window_size="
1208                   << window_size << " failed: " << nghttp2_strerror(rv);
1209             }
1210           }
1211         }
1212       }
1213     }
1214   }
1215 
1216   for (;;) {
1217     if (wb_.rleft() >= max_buffer_size_) {
1218       return 0;
1219     }
1220 
1221     const uint8_t *data;
1222     auto datalen = nghttp2_session_mem_send2(session_, &data);
1223 
1224     if (datalen < 0) {
1225       ULOG(ERROR, this) << "nghttp2_session_mem_send2() returned error: "
1226                         << nghttp2_strerror(datalen);
1227       return -1;
1228     }
1229     if (datalen == 0) {
1230       break;
1231     }
1232     wb_.append(data, datalen);
1233   }
1234 
1235   if (nghttp2_session_want_read(session_) == 0 &&
1236       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1237     if (LOG_ENABLED(INFO)) {
1238       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1239     }
1240     return -1;
1241   }
1242 
1243   return 0;
1244 }
1245 
get_client_handler() const1246 ClientHandler *Http2Upstream::get_client_handler() const { return handler_; }
1247 
downstream_read(DownstreamConnection * dconn)1248 int Http2Upstream::downstream_read(DownstreamConnection *dconn) {
1249   auto downstream = dconn->get_downstream();
1250 
1251   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1252     // The downstream stream was reset (canceled). In this case,
1253     // RST_STREAM to the upstream and delete downstream connection
1254     // here. Deleting downstream will be taken place at
1255     // on_stream_close_callback.
1256     rst_stream(downstream,
1257                infer_upstream_rst_stream_error_code(
1258                    downstream->get_response_rst_stream_error_code()));
1259     downstream->pop_downstream_connection();
1260     // dconn was deleted
1261     dconn = nullptr;
1262   } else if (downstream->get_response_state() ==
1263              DownstreamState::MSG_BAD_HEADER) {
1264     if (error_reply(downstream, 502) != 0) {
1265       return -1;
1266     }
1267     downstream->pop_downstream_connection();
1268     // dconn was deleted
1269     dconn = nullptr;
1270   } else {
1271     auto rv = downstream->on_read();
1272     if (rv == SHRPX_ERR_EOF) {
1273       if (downstream->get_request_header_sent()) {
1274         return downstream_eof(dconn);
1275       }
1276       return SHRPX_ERR_RETRY;
1277     }
1278     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1279       downstream->pop_downstream_connection();
1280       handler_->signal_write();
1281       return 0;
1282     }
1283     if (rv != 0) {
1284       if (rv != SHRPX_ERR_NETWORK) {
1285         if (LOG_ENABLED(INFO)) {
1286           DCLOG(INFO, dconn) << "HTTP parser failure";
1287         }
1288       }
1289       return downstream_error(dconn, Downstream::EVENT_ERROR);
1290     }
1291 
1292     if (downstream->can_detach_downstream_connection()) {
1293       // Keep-alive
1294       downstream->detach_downstream_connection();
1295     }
1296   }
1297 
1298   handler_->signal_write();
1299 
1300   // At this point, downstream may be deleted.
1301 
1302   return 0;
1303 }
1304 
downstream_write(DownstreamConnection * dconn)1305 int Http2Upstream::downstream_write(DownstreamConnection *dconn) {
1306   int rv;
1307   rv = dconn->on_write();
1308   if (rv == SHRPX_ERR_NETWORK) {
1309     return downstream_error(dconn, Downstream::EVENT_ERROR);
1310   }
1311   if (rv != 0) {
1312     return rv;
1313   }
1314   return 0;
1315 }
1316 
downstream_eof(DownstreamConnection * dconn)1317 int Http2Upstream::downstream_eof(DownstreamConnection *dconn) {
1318   auto downstream = dconn->get_downstream();
1319 
1320   if (LOG_ENABLED(INFO)) {
1321     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1322   }
1323 
1324   // Delete downstream connection. If we don't delete it here, it will
1325   // be pooled in on_stream_close_callback.
1326   downstream->pop_downstream_connection();
1327   // dconn was deleted
1328   dconn = nullptr;
1329   // downstream will be deleted in on_stream_close_callback.
1330   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1331     // Server may indicate the end of the request by EOF
1332     if (LOG_ENABLED(INFO)) {
1333       ULOG(INFO, this) << "Downstream body was ended by EOF";
1334     }
1335     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1336 
1337     // For tunneled connection, MSG_COMPLETE signals
1338     // downstream_data_read_callback to send RST_STREAM after pending
1339     // response body is sent. This is needed to ensure that RST_STREAM
1340     // is sent after all pending data are sent.
1341     on_downstream_body_complete(downstream);
1342   } else if (downstream->get_response_state() !=
1343              DownstreamState::MSG_COMPLETE) {
1344     // If stream was not closed, then we set MSG_COMPLETE and let
1345     // on_stream_close_callback delete downstream.
1346     if (error_reply(downstream, 502) != 0) {
1347       return -1;
1348     }
1349   }
1350   handler_->signal_write();
1351   // At this point, downstream may be deleted.
1352   return 0;
1353 }
1354 
downstream_error(DownstreamConnection * dconn,int events)1355 int Http2Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1356   auto downstream = dconn->get_downstream();
1357 
1358   if (LOG_ENABLED(INFO)) {
1359     if (events & Downstream::EVENT_ERROR) {
1360       DCLOG(INFO, dconn) << "Downstream network/general error";
1361     } else {
1362       DCLOG(INFO, dconn) << "Timeout";
1363     }
1364     if (downstream->get_upgraded()) {
1365       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1366     }
1367   }
1368 
1369   // Delete downstream connection. If we don't delete it here, it will
1370   // be pooled in on_stream_close_callback.
1371   downstream->pop_downstream_connection();
1372   // dconn was deleted
1373   dconn = nullptr;
1374 
1375   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1376     // For SSL tunneling, we issue RST_STREAM. For other types of
1377     // stream, we don't have to do anything since response was
1378     // complete.
1379     if (downstream->get_upgraded()) {
1380       rst_stream(downstream, NGHTTP2_NO_ERROR);
1381     }
1382   } else {
1383     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1384       if (downstream->get_upgraded()) {
1385         on_downstream_body_complete(downstream);
1386       } else {
1387         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
1388       }
1389     } else {
1390       unsigned int status;
1391       if (events & Downstream::EVENT_TIMEOUT) {
1392         if (downstream->get_request_header_sent()) {
1393           status = 504;
1394         } else {
1395           status = 408;
1396         }
1397       } else {
1398         status = 502;
1399       }
1400       if (error_reply(downstream, status) != 0) {
1401         return -1;
1402       }
1403     }
1404     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1405   }
1406   handler_->signal_write();
1407   // At this point, downstream may be deleted.
1408   return 0;
1409 }
1410 
rst_stream(Downstream * downstream,uint32_t error_code)1411 int Http2Upstream::rst_stream(Downstream *downstream, uint32_t error_code) {
1412   if (LOG_ENABLED(INFO)) {
1413     ULOG(INFO, this) << "RST_STREAM stream_id=" << downstream->get_stream_id()
1414                      << " with error_code=" << error_code;
1415   }
1416   int rv;
1417   rv = nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE,
1418                                  downstream->get_stream_id(), error_code);
1419   if (rv < NGHTTP2_ERR_FATAL) {
1420     ULOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
1421                       << nghttp2_strerror(rv);
1422     return -1;
1423   }
1424   return 0;
1425 }
1426 
terminate_session(uint32_t error_code)1427 int Http2Upstream::terminate_session(uint32_t error_code) {
1428   int rv;
1429   rv = nghttp2_session_terminate_session(session_, error_code);
1430   if (rv != 0) {
1431     return -1;
1432   }
1433   return 0;
1434 }
1435 
1436 namespace {
downstream_data_read_callback(nghttp2_session * session,int32_t stream_id,uint8_t * buf,size_t length,uint32_t * data_flags,nghttp2_data_source * source,void * user_data)1437 nghttp2_ssize downstream_data_read_callback(nghttp2_session *session,
1438                                             int32_t stream_id, uint8_t *buf,
1439                                             size_t length, uint32_t *data_flags,
1440                                             nghttp2_data_source *source,
1441                                             void *user_data) {
1442   int rv;
1443   auto downstream = static_cast<Downstream *>(source->ptr);
1444   auto body = downstream->get_response_buf();
1445   assert(body);
1446   auto upstream = static_cast<Http2Upstream *>(user_data);
1447 
1448   const auto &resp = downstream->response();
1449 
1450   auto nread = std::min(body->rleft(), length);
1451 
1452   auto max_buffer_size = upstream->get_max_buffer_size();
1453 
1454   auto buffer = upstream->get_response_buf();
1455 
1456   if (max_buffer_size <
1457       std::min(nread, static_cast<size_t>(256)) + 9 + buffer->rleft()) {
1458     if (LOG_ENABLED(INFO)) {
1459       ULOG(INFO, upstream) << "Buffer is almost full.  Skip write DATA";
1460     }
1461     return NGHTTP2_ERR_PAUSE;
1462   }
1463 
1464   nread = std::min(nread, max_buffer_size - 9 - buffer->rleft());
1465 
1466   auto body_empty = body->rleft() == nread;
1467 
1468   *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
1469 
1470   if (body_empty &&
1471       downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1472 
1473     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
1474 
1475     if (!downstream->get_upgraded()) {
1476       const auto &trailers = resp.fs.trailers();
1477       if (!trailers.empty()) {
1478         std::vector<nghttp2_nv> nva;
1479         nva.reserve(trailers.size());
1480         http2::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1481         if (!nva.empty()) {
1482           rv = nghttp2_submit_trailer(session, stream_id, nva.data(),
1483                                       nva.size());
1484           if (rv != 0) {
1485             if (nghttp2_is_fatal(rv)) {
1486               return NGHTTP2_ERR_CALLBACK_FAILURE;
1487             }
1488           } else {
1489             *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
1490           }
1491         }
1492       }
1493     }
1494   }
1495 
1496   if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
1497     downstream->disable_upstream_wtimer();
1498     return NGHTTP2_ERR_DEFERRED;
1499   }
1500 
1501   return nread;
1502 }
1503 } // namespace
1504 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1505 int Http2Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1506                               size_t bodylen) {
1507   int rv;
1508 
1509   nghttp2_data_provider2 data_prd, *data_prd_ptr = nullptr;
1510 
1511   const auto &req = downstream->request();
1512 
1513   if (req.method != HTTP_HEAD && bodylen) {
1514     data_prd.source.ptr = downstream;
1515     data_prd.read_callback = downstream_data_read_callback;
1516     data_prd_ptr = &data_prd;
1517 
1518     auto buf = downstream->get_response_buf();
1519 
1520     buf->append(body, bodylen);
1521   }
1522 
1523   const auto &resp = downstream->response();
1524   auto config = get_config();
1525   auto &httpconf = config->http;
1526 
1527   auto &balloc = downstream->get_block_allocator();
1528 
1529   const auto &headers = resp.fs.headers();
1530   auto nva = std::vector<nghttp2_nv>();
1531   // 2 for :status and server
1532   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1533 
1534   auto response_status = http2::stringify_status(balloc, resp.http_status);
1535 
1536   nva.push_back(http2::make_field(":status"_sr, response_status));
1537 
1538   for (auto &kv : headers) {
1539     if (kv.name.empty() || kv.name[0] == ':') {
1540       continue;
1541     }
1542     switch (kv.token) {
1543     case http2::HD_CONNECTION:
1544     case http2::HD_KEEP_ALIVE:
1545     case http2::HD_PROXY_CONNECTION:
1546     case http2::HD_TE:
1547     case http2::HD_TRANSFER_ENCODING:
1548     case http2::HD_UPGRADE:
1549       continue;
1550     }
1551     nva.push_back(
1552         http2::make_field(kv.name, kv.value, http2::no_index(kv.no_index)));
1553   }
1554 
1555   if (!resp.fs.header(http2::HD_SERVER)) {
1556     nva.push_back(http2::make_field("server"_sr, config->http.server_name));
1557   }
1558 
1559   for (auto &p : httpconf.add_response_headers) {
1560     nva.push_back(http2::make_field(p.name, p.value));
1561   }
1562 
1563   rv = nghttp2_submit_response2(session_, downstream->get_stream_id(),
1564                                 nva.data(), nva.size(), data_prd_ptr);
1565   if (nghttp2_is_fatal(rv)) {
1566     ULOG(FATAL, this) << "nghttp2_submit_response2() failed: "
1567                       << nghttp2_strerror(rv);
1568     return -1;
1569   }
1570 
1571   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1572 
1573   if (data_prd_ptr) {
1574     downstream->reset_upstream_wtimer();
1575   }
1576 
1577   return 0;
1578 }
1579 
error_reply(Downstream * downstream,unsigned int status_code)1580 int Http2Upstream::error_reply(Downstream *downstream,
1581                                unsigned int status_code) {
1582   int rv;
1583   auto &resp = downstream->response();
1584 
1585   auto &balloc = downstream->get_block_allocator();
1586 
1587   auto html = http::create_error_html(balloc, status_code);
1588   resp.http_status = status_code;
1589 
1590   nghttp2_data_provider2 data_prd, *data_prd_ptr = nullptr;
1591 
1592   const auto &req = downstream->request();
1593 
1594   if (req.method != HTTP_HEAD) {
1595     data_prd.source.ptr = downstream;
1596     data_prd.read_callback = downstream_data_read_callback;
1597     data_prd_ptr = &data_prd;
1598 
1599     auto body = downstream->get_response_buf();
1600 
1601     body->append(html);
1602   }
1603 
1604   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1605 
1606   auto lgconf = log_config();
1607   lgconf->update_tstamp(std::chrono::system_clock::now());
1608 
1609   auto response_status = http2::stringify_status(balloc, status_code);
1610   auto content_length = util::make_string_ref_uint(balloc, html.size());
1611   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
1612 
1613   auto nva = std::to_array(
1614       {http2::make_field(":status"_sr, response_status),
1615        http2::make_field("content-type"_sr, "text/html; charset=UTF-8"_sr),
1616        http2::make_field("server"_sr, get_config()->http.server_name),
1617        http2::make_field("content-length"_sr, content_length),
1618        http2::make_field("date"_sr, date)});
1619 
1620   rv = nghttp2_submit_response2(session_, downstream->get_stream_id(),
1621                                 nva.data(), nva.size(), data_prd_ptr);
1622   if (rv < NGHTTP2_ERR_FATAL) {
1623     ULOG(FATAL, this) << "nghttp2_submit_response2() failed: "
1624                       << nghttp2_strerror(rv);
1625     return -1;
1626   }
1627 
1628   downstream->reset_upstream_wtimer();
1629 
1630   return 0;
1631 }
1632 
add_pending_downstream(std::unique_ptr<Downstream> downstream)1633 void Http2Upstream::add_pending_downstream(
1634     std::unique_ptr<Downstream> downstream) {
1635   downstream_queue_.add_pending(std::move(downstream));
1636 }
1637 
remove_downstream(Downstream * downstream)1638 void Http2Upstream::remove_downstream(Downstream *downstream) {
1639   if (downstream->accesslog_ready()) {
1640     handler_->write_accesslog(downstream);
1641   }
1642 
1643   nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
1644                                        nullptr);
1645 
1646   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
1647 
1648   if (next_downstream) {
1649     initiate_downstream(next_downstream);
1650   }
1651 
1652   if (downstream_queue_.get_downstreams() == nullptr) {
1653     // There is no downstream at the moment.  Start idle timer now.
1654     auto config = get_config();
1655     auto &upstreamconf = config->conn.upstream;
1656 
1657     handler_->reset_upstream_read_timeout(upstreamconf.timeout.http2_idle);
1658   }
1659 }
1660 
1661 // WARNING: Never call directly or indirectly nghttp2_session_send or
1662 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_header_complete(Downstream * downstream)1663 int Http2Upstream::on_downstream_header_complete(Downstream *downstream) {
1664   int rv;
1665 
1666   const auto &req = downstream->request();
1667   auto &resp = downstream->response();
1668 
1669   auto &balloc = downstream->get_block_allocator();
1670 
1671   if (LOG_ENABLED(INFO)) {
1672     if (downstream->get_non_final_response()) {
1673       DLOG(INFO, downstream) << "HTTP non-final response header";
1674     } else {
1675       DLOG(INFO, downstream) << "HTTP response header completed";
1676     }
1677   }
1678 
1679   auto config = get_config();
1680   auto &httpconf = config->http;
1681 
1682   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1683     downstream->rewrite_location_response_header(req.scheme);
1684   }
1685 
1686 #ifdef HAVE_MRUBY
1687   if (!downstream->get_non_final_response()) {
1688     auto dconn = downstream->get_downstream_connection();
1689     const auto &group = dconn->get_downstream_addr_group();
1690     if (group) {
1691       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1692 
1693       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1694         if (error_reply(downstream, 500) != 0) {
1695           return -1;
1696         }
1697         // Returning -1 will signal deletion of dconn.
1698         return -1;
1699       }
1700 
1701       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1702         return -1;
1703       }
1704     }
1705 
1706     auto worker = handler_->get_worker();
1707     auto mruby_ctx = worker->get_mruby_context();
1708 
1709     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1710       if (error_reply(downstream, 500) != 0) {
1711         return -1;
1712       }
1713       // Returning -1 will signal deletion of dconn.
1714       return -1;
1715     }
1716 
1717     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1718       return -1;
1719     }
1720   }
1721 #endif // HAVE_MRUBY
1722 
1723   auto &http2conf = config->http2;
1724 
1725   // We need some conditions that must be fulfilled to initiate server
1726   // push.
1727   //
1728   // * Server push is disabled for http2 proxy or client proxy, since
1729   //   incoming headers are mixed origins.  We don't know how to
1730   //   reliably determine the authority yet.
1731   //
1732   // * We need non-final response or 200 response code for associated
1733   //   resource.  This is too restrictive, we will review this later.
1734   //
1735   // * We requires GET or POST for associated resource.  Probably we
1736   //   don't want to push for HEAD request.  Not sure other methods
1737   //   are also eligible for push.
1738   if (!http2conf.no_server_push &&
1739       nghttp2_session_get_remote_settings(session_,
1740                                           NGHTTP2_SETTINGS_ENABLE_PUSH) == 1 &&
1741       !config->http2_proxy && (downstream->get_stream_id() % 2) &&
1742       resp.fs.header(http2::HD_LINK) &&
1743       (downstream->get_non_final_response() || resp.http_status == 200) &&
1744       (req.method == HTTP_GET || req.method == HTTP_POST)) {
1745 
1746     if (prepare_push_promise(downstream) != 0) {
1747       // Continue to send response even if push was failed.
1748     }
1749   }
1750 
1751   auto nva = std::vector<nghttp2_nv>();
1752   // 6 means :status and possible server, via, x-http2-push, alt-svc,
1753   // and set-cookie (for affinity cookie) header field.
1754   nva.reserve(resp.fs.headers().size() + 6 +
1755               httpconf.add_response_headers.size());
1756 
1757   if (downstream->get_non_final_response()) {
1758     auto response_status = http2::stringify_status(balloc, resp.http_status);
1759 
1760     nva.push_back(http2::make_field(":status"_sr, response_status));
1761 
1762     http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1763                                       http2::HDOP_STRIP_ALL);
1764 
1765     if (LOG_ENABLED(INFO)) {
1766       log_response_headers(downstream, nva);
1767     }
1768 
1769     rv = nghttp2_submit_headers(session_, NGHTTP2_FLAG_NONE,
1770                                 downstream->get_stream_id(), nullptr,
1771                                 nva.data(), nva.size(), nullptr);
1772 
1773     resp.fs.clear_headers();
1774 
1775     if (rv != 0) {
1776       ULOG(FATAL, this) << "nghttp2_submit_headers() failed";
1777       return -1;
1778     }
1779 
1780     return 0;
1781   }
1782 
1783   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1784   StringRef response_status;
1785 
1786   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1787     response_status = http2::stringify_status(balloc, 200);
1788     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1789   } else {
1790     response_status = http2::stringify_status(balloc, resp.http_status);
1791   }
1792 
1793   nva.push_back(http2::make_field(":status"_sr, response_status));
1794 
1795   http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1796 
1797   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1798     nva.push_back(http2::make_field("server"_sr, httpconf.server_name));
1799   } else {
1800     auto server = resp.fs.header(http2::HD_SERVER);
1801     if (server) {
1802       nva.push_back(http2::make_field("server"_sr, (*server).value));
1803     }
1804   }
1805 
1806   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1807     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1808     if (affinity_cookie) {
1809       auto dconn = downstream->get_downstream_connection();
1810       assert(dconn);
1811       auto &group = dconn->get_downstream_addr_group();
1812       auto &shared_addr = group->shared_addr;
1813       auto &cookieconf = shared_addr->affinity.cookie;
1814       auto secure =
1815           http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1816       auto cookie_str = http::create_affinity_cookie(
1817           balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1818       nva.push_back(http2::make_field("set-cookie"_sr, cookie_str));
1819     }
1820   }
1821 
1822   if (!resp.fs.header(http2::HD_ALT_SVC)) {
1823     // We won't change or alter alt-svc from backend for now
1824     if (!httpconf.http2_altsvc_header_value.empty()) {
1825       nva.push_back(
1826           http2::make_field("alt-svc"_sr, httpconf.http2_altsvc_header_value));
1827     }
1828   }
1829 
1830   auto via = resp.fs.header(http2::HD_VIA);
1831   if (httpconf.no_via) {
1832     if (via) {
1833       nva.push_back(http2::make_field("via"_sr, (*via).value));
1834     }
1835   } else {
1836     // we don't create more than 16 bytes in
1837     // http::create_via_header_value.
1838     size_t len = 16;
1839     if (via) {
1840       len += via->value.size() + 2;
1841     }
1842 
1843     auto iov = make_byte_ref(balloc, len + 1);
1844     auto p = std::begin(iov);
1845     if (via) {
1846       p = std::copy(std::begin(via->value), std::end(via->value), p);
1847       p = util::copy_lit(p, ", ");
1848     }
1849     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1850     *p = '\0';
1851 
1852     nva.push_back(
1853         http2::make_field("via"_sr, StringRef{std::span{std::begin(iov), p}}));
1854   }
1855 
1856   for (auto &p : httpconf.add_response_headers) {
1857     nva.push_back(http2::make_field(p.name, p.value));
1858   }
1859 
1860   if (downstream->get_stream_id() % 2 == 0) {
1861     // This header field is basically for human on client side to
1862     // figure out that the resource is pushed.
1863     nva.push_back(http2::make_field("x-http2-push"_sr, "1"_sr));
1864   }
1865 
1866   if (LOG_ENABLED(INFO)) {
1867     log_response_headers(downstream, nva);
1868   }
1869 
1870   if (http2conf.upstream.debug.dump.response_header) {
1871     http2::dump_nv(http2conf.upstream.debug.dump.response_header, nva.data(),
1872                    nva.size());
1873   }
1874 
1875   auto priority = resp.fs.header(http2::HD_PRIORITY);
1876   if (priority) {
1877     nghttp2_extpri extpri;
1878 
1879     if (nghttp2_session_get_extpri_stream_priority(
1880             session_, &extpri, downstream->get_stream_id()) == 0 &&
1881         nghttp2_extpri_parse_priority(&extpri, priority->value.byte(),
1882                                       priority->value.size()) == 0) {
1883       rv = nghttp2_session_change_extpri_stream_priority(
1884           session_, downstream->get_stream_id(), &extpri,
1885           /* ignore_client_signal = */ 1);
1886       if (rv != 0) {
1887         ULOG(ERROR, this) << "nghttp2_session_change_extpri_stream_priority: "
1888                           << nghttp2_strerror(rv);
1889       }
1890     }
1891   }
1892 
1893   nghttp2_data_provider2 data_prd;
1894   data_prd.source.ptr = downstream;
1895   data_prd.read_callback = downstream_data_read_callback;
1896 
1897   nghttp2_data_provider2 *data_prdptr;
1898 
1899   if (downstream->expect_response_body() ||
1900       downstream->expect_response_trailer()) {
1901     data_prdptr = &data_prd;
1902   } else {
1903     data_prdptr = nullptr;
1904   }
1905 
1906   rv = nghttp2_submit_response2(session_, downstream->get_stream_id(),
1907                                 nva.data(), nva.size(), data_prdptr);
1908   if (rv != 0) {
1909     ULOG(FATAL, this) << "nghttp2_submit_response2() failed";
1910     return -1;
1911   }
1912 
1913   if (data_prdptr) {
1914     downstream->reset_upstream_wtimer();
1915   }
1916 
1917   return 0;
1918 }
1919 
1920 // WARNING: Never call directly or indirectly nghttp2_session_send or
1921 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1922 int Http2Upstream::on_downstream_body(Downstream *downstream,
1923                                       const uint8_t *data, size_t len,
1924                                       bool flush) {
1925   auto body = downstream->get_response_buf();
1926   body->append(data, len);
1927 
1928   if (flush) {
1929     nghttp2_session_resume_data(session_, downstream->get_stream_id());
1930 
1931     downstream->ensure_upstream_wtimer();
1932   }
1933 
1934   return 0;
1935 }
1936 
1937 // WARNING: Never call directly or indirectly nghttp2_session_send or
1938 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body_complete(Downstream * downstream)1939 int Http2Upstream::on_downstream_body_complete(Downstream *downstream) {
1940   if (LOG_ENABLED(INFO)) {
1941     DLOG(INFO, downstream) << "HTTP response completed";
1942   }
1943 
1944   auto &resp = downstream->response();
1945 
1946   if (!downstream->validate_response_recv_body_length()) {
1947     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
1948     resp.connection_close = true;
1949     return 0;
1950   }
1951 
1952   nghttp2_session_resume_data(session_, downstream->get_stream_id());
1953   downstream->ensure_upstream_wtimer();
1954 
1955   return 0;
1956 }
1957 
get_flow_control() const1958 bool Http2Upstream::get_flow_control() const { return flow_control_; }
1959 
pause_read(IOCtrlReason reason)1960 void Http2Upstream::pause_read(IOCtrlReason reason) {}
1961 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1962 int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1963                                size_t consumed) {
1964   if (get_flow_control()) {
1965     if (consume(downstream->get_stream_id(), consumed) != 0) {
1966       return -1;
1967     }
1968 
1969     auto &req = downstream->request();
1970 
1971     req.consume(consumed);
1972   }
1973 
1974   handler_->signal_write();
1975   return 0;
1976 }
1977 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)1978 int Http2Upstream::on_downstream_abort_request(Downstream *downstream,
1979                                                unsigned int status_code) {
1980   int rv;
1981 
1982   rv = error_reply(downstream, status_code);
1983 
1984   if (rv != 0) {
1985     return -1;
1986   }
1987 
1988   handler_->signal_write();
1989   return 0;
1990 }
1991 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1992 int Http2Upstream::on_downstream_abort_request_with_https_redirect(
1993     Downstream *downstream) {
1994   int rv;
1995 
1996   rv = redirect_to_https(downstream);
1997   if (rv != 0) {
1998     return -1;
1999   }
2000 
2001   handler_->signal_write();
2002   return 0;
2003 }
2004 
redirect_to_https(Downstream * downstream)2005 int Http2Upstream::redirect_to_https(Downstream *downstream) {
2006   auto &req = downstream->request();
2007   if (req.regular_connect_method() || req.scheme != "http"_sr) {
2008     return error_reply(downstream, 400);
2009   }
2010 
2011   auto authority = util::extract_host(req.authority);
2012   if (authority.empty()) {
2013     return error_reply(downstream, 400);
2014   }
2015 
2016   auto &balloc = downstream->get_block_allocator();
2017   auto config = get_config();
2018   auto &httpconf = config->http;
2019 
2020   StringRef loc;
2021   if (httpconf.redirect_https_port == "443"_sr) {
2022     loc = concat_string_ref(balloc, "https://"_sr, authority, req.path);
2023   } else {
2024     loc = concat_string_ref(balloc, "https://"_sr, authority, ":"_sr,
2025                             httpconf.redirect_https_port, req.path);
2026   }
2027 
2028   auto &resp = downstream->response();
2029   resp.http_status = 308;
2030   resp.fs.add_header_token("location"_sr, loc, false, http2::HD_LOCATION);
2031 
2032   return send_reply(downstream, nullptr, 0);
2033 }
2034 
consume(int32_t stream_id,size_t len)2035 int Http2Upstream::consume(int32_t stream_id, size_t len) {
2036   int rv;
2037 
2038   auto faddr = handler_->get_upstream_addr();
2039 
2040   if (faddr->alt_mode != UpstreamAltMode::NONE) {
2041     return 0;
2042   }
2043 
2044   rv = nghttp2_session_consume(session_, stream_id, len);
2045 
2046   if (rv != 0) {
2047     ULOG(WARN, this) << "nghttp2_session_consume() returned error: "
2048                      << nghttp2_strerror(rv);
2049     return -1;
2050   }
2051 
2052   return 0;
2053 }
2054 
log_response_headers(Downstream * downstream,const std::vector<nghttp2_nv> & nva) const2055 void Http2Upstream::log_response_headers(
2056     Downstream *downstream, const std::vector<nghttp2_nv> &nva) const {
2057   std::stringstream ss;
2058   for (auto &nv : nva) {
2059     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2060        << StringRef{nv.value, nv.valuelen} << "\n";
2061   }
2062   ULOG(INFO, this) << "HTTP response headers. stream_id="
2063                    << downstream->get_stream_id() << "\n"
2064                    << ss.str();
2065 }
2066 
on_timeout(Downstream * downstream)2067 int Http2Upstream::on_timeout(Downstream *downstream) {
2068   if (LOG_ENABLED(INFO)) {
2069     ULOG(INFO, this) << "Stream timeout stream_id="
2070                      << downstream->get_stream_id();
2071   }
2072 
2073   rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2074   handler_->signal_write();
2075 
2076   return 0;
2077 }
2078 
on_handler_delete()2079 void Http2Upstream::on_handler_delete() {
2080   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
2081     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
2082         d->accesslog_ready()) {
2083       handler_->write_accesslog(d);
2084     }
2085   }
2086 }
2087 
on_downstream_reset(Downstream * downstream,bool no_retry)2088 int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
2089   int rv;
2090 
2091   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
2092     // This is error condition when we failed push_request_headers()
2093     // in initiate_downstream().  Otherwise, we have
2094     // DispatchState::ACTIVE state, or we did not set
2095     // DownstreamConnection.
2096     downstream->pop_downstream_connection();
2097     handler_->signal_write();
2098 
2099     return 0;
2100   }
2101 
2102   if (!downstream->request_submission_ready()) {
2103     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2104       // We have got all response body already.  Send it off.
2105       downstream->pop_downstream_connection();
2106       return 0;
2107     }
2108     // pushed stream is handled here
2109     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2110     downstream->pop_downstream_connection();
2111 
2112     handler_->signal_write();
2113 
2114     return 0;
2115   }
2116 
2117   downstream->pop_downstream_connection();
2118 
2119   downstream->add_retry();
2120 
2121   std::unique_ptr<DownstreamConnection> dconn;
2122 
2123   rv = 0;
2124 
2125   if (no_retry || downstream->no_more_retry()) {
2126     goto fail;
2127   }
2128 
2129   // downstream connection is clean; we can retry with new
2130   // downstream connection.
2131 
2132   for (;;) {
2133     auto dconn = handler_->get_downstream_connection(rv, downstream);
2134     if (!dconn) {
2135       goto fail;
2136     }
2137 
2138     rv = downstream->attach_downstream_connection(std::move(dconn));
2139     if (rv == 0) {
2140       break;
2141     }
2142   }
2143 
2144   rv = downstream->push_request_headers();
2145   if (rv != 0) {
2146     goto fail;
2147   }
2148 
2149   return 0;
2150 
2151 fail:
2152   if (rv == SHRPX_ERR_TLS_REQUIRED) {
2153     rv = on_downstream_abort_request_with_https_redirect(downstream);
2154   } else {
2155     rv = on_downstream_abort_request(downstream, 502);
2156   }
2157   if (rv != 0) {
2158     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2159   }
2160   downstream->pop_downstream_connection();
2161 
2162   handler_->signal_write();
2163 
2164   return 0;
2165 }
2166 
prepare_push_promise(Downstream * downstream)2167 int Http2Upstream::prepare_push_promise(Downstream *downstream) {
2168   int rv;
2169 
2170   const auto &req = downstream->request();
2171   auto &resp = downstream->response();
2172 
2173   auto base = http2::get_pure_path_component(req.path);
2174   if (base.empty()) {
2175     return 0;
2176   }
2177 
2178   auto &balloc = downstream->get_block_allocator();
2179 
2180   for (auto &kv : resp.fs.headers()) {
2181     if (kv.token != http2::HD_LINK) {
2182       continue;
2183     }
2184     for (auto &link : http2::parse_link_header(kv.value)) {
2185       StringRef scheme, authority, path;
2186 
2187       rv = http2::construct_push_component(balloc, scheme, authority, path,
2188                                            base, link.uri);
2189       if (rv != 0) {
2190         continue;
2191       }
2192 
2193       if (scheme.empty()) {
2194         scheme = req.scheme;
2195       }
2196 
2197       if (authority.empty()) {
2198         authority = req.authority;
2199       }
2200 
2201       if (resp.is_resource_pushed(scheme, authority, path)) {
2202         continue;
2203       }
2204 
2205       rv = submit_push_promise(scheme, authority, path, downstream);
2206       if (rv != 0) {
2207         return -1;
2208       }
2209 
2210       resp.resource_pushed(scheme, authority, path);
2211     }
2212   }
2213   return 0;
2214 }
2215 
submit_push_promise(const StringRef & scheme,const StringRef & authority,const StringRef & path,Downstream * downstream)2216 int Http2Upstream::submit_push_promise(const StringRef &scheme,
2217                                        const StringRef &authority,
2218                                        const StringRef &path,
2219                                        Downstream *downstream) {
2220   const auto &req = downstream->request();
2221 
2222   std::vector<nghttp2_nv> nva;
2223   // 4 for :method, :scheme, :path and :authority
2224   nva.reserve(4 + req.fs.headers().size());
2225 
2226   // just use "GET" for now
2227   nva.push_back(http2::make_field(":method"_sr, "GET"_sr));
2228   nva.push_back(http2::make_field(":scheme"_sr, scheme));
2229   nva.push_back(http2::make_field(":path"_sr, path));
2230   nva.push_back(http2::make_field(":authority"_sr, authority));
2231 
2232   for (auto &kv : req.fs.headers()) {
2233     switch (kv.token) {
2234     // TODO generate referer
2235     case http2::HD__AUTHORITY:
2236     case http2::HD__SCHEME:
2237     case http2::HD__METHOD:
2238     case http2::HD__PATH:
2239       continue;
2240     case http2::HD_ACCEPT_ENCODING:
2241     case http2::HD_ACCEPT_LANGUAGE:
2242     case http2::HD_CACHE_CONTROL:
2243     case http2::HD_HOST:
2244     case http2::HD_USER_AGENT:
2245       nva.push_back(
2246           http2::make_field(kv.name, kv.value, http2::no_index(kv.no_index)));
2247       break;
2248     }
2249   }
2250 
2251   auto promised_stream_id = nghttp2_submit_push_promise(
2252       session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2253       nva.size(), nullptr);
2254 
2255   if (promised_stream_id < 0) {
2256     if (LOG_ENABLED(INFO)) {
2257       ULOG(INFO, this) << "nghttp2_submit_push_promise() failed: "
2258                        << nghttp2_strerror(promised_stream_id);
2259     }
2260     if (nghttp2_is_fatal(promised_stream_id)) {
2261       return -1;
2262     }
2263     return 0;
2264   }
2265 
2266   if (LOG_ENABLED(INFO)) {
2267     std::stringstream ss;
2268     for (auto &nv : nva) {
2269       ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2270          << StringRef{nv.value, nv.valuelen} << "\n";
2271     }
2272     ULOG(INFO, this) << "HTTP push request headers. promised_stream_id="
2273                      << promised_stream_id << "\n"
2274                      << ss.str();
2275   }
2276 
2277   return 0;
2278 }
2279 
push_enabled() const2280 bool Http2Upstream::push_enabled() const {
2281   auto config = get_config();
2282   return !(config->http2.no_server_push ||
2283            nghttp2_session_get_remote_settings(
2284                session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
2285            config->http2_proxy);
2286 }
2287 
initiate_push(Downstream * downstream,const StringRef & uri)2288 int Http2Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
2289   int rv;
2290 
2291   if (uri.empty() || !push_enabled() ||
2292       (downstream->get_stream_id() % 2) == 0) {
2293     return 0;
2294   }
2295 
2296   const auto &req = downstream->request();
2297 
2298   auto base = http2::get_pure_path_component(req.path);
2299   if (base.empty()) {
2300     return -1;
2301   }
2302 
2303   auto &balloc = downstream->get_block_allocator();
2304 
2305   StringRef scheme, authority, path;
2306 
2307   rv = http2::construct_push_component(balloc, scheme, authority, path, base,
2308                                        uri);
2309   if (rv != 0) {
2310     return -1;
2311   }
2312 
2313   if (scheme.empty()) {
2314     scheme = req.scheme;
2315   }
2316 
2317   if (authority.empty()) {
2318     authority = req.authority;
2319   }
2320 
2321   auto &resp = downstream->response();
2322 
2323   if (resp.is_resource_pushed(scheme, authority, path)) {
2324     return 0;
2325   }
2326 
2327   rv = submit_push_promise(scheme, authority, path, downstream);
2328 
2329   if (rv != 0) {
2330     return -1;
2331   }
2332 
2333   resp.resource_pushed(scheme, authority, path);
2334 
2335   return 0;
2336 }
2337 
response_riovec(struct iovec * iov,int iovcnt) const2338 int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
2339   if (iovcnt == 0 || wb_.rleft() == 0) {
2340     return 0;
2341   }
2342 
2343   return wb_.riovec(iov, iovcnt);
2344 }
2345 
response_drain(size_t n)2346 void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
2347 
response_empty() const2348 bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
2349 
get_response_buf()2350 DefaultMemchunks *Http2Upstream::get_response_buf() { return &wb_; }
2351 
2352 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)2353 Http2Upstream::on_downstream_push_promise(Downstream *downstream,
2354                                           int32_t promised_stream_id) {
2355   // promised_stream_id is for backend HTTP/2 session, not for
2356   // frontend.
2357   auto promised_downstream =
2358       std::make_unique<Downstream>(this, handler_->get_mcpool(), 0);
2359   auto &promised_req = promised_downstream->request();
2360 
2361   promised_downstream->set_downstream_stream_id(promised_stream_id);
2362   // Set associated stream in frontend
2363   promised_downstream->set_assoc_stream_id(downstream->get_stream_id());
2364 
2365   promised_downstream->disable_upstream_rtimer();
2366 
2367   promised_req.http_major = 2;
2368   promised_req.http_minor = 0;
2369 
2370   promised_req.fs.content_length = 0;
2371   promised_req.http2_expect_body = false;
2372 
2373   auto ptr = promised_downstream.get();
2374   add_pending_downstream(std::move(promised_downstream));
2375   downstream_queue_.mark_active(ptr);
2376 
2377   return ptr;
2378 }
2379 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)2380 int Http2Upstream::on_downstream_push_promise_complete(
2381     Downstream *downstream, Downstream *promised_downstream) {
2382   std::vector<nghttp2_nv> nva;
2383 
2384   const auto &promised_req = promised_downstream->request();
2385   const auto &headers = promised_req.fs.headers();
2386 
2387   nva.reserve(headers.size());
2388 
2389   for (auto &kv : headers) {
2390     nva.push_back(
2391         http2::make_field_nv(kv.name, kv.value, http2::no_index(kv.no_index)));
2392   }
2393 
2394   auto promised_stream_id = nghttp2_submit_push_promise(
2395       session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2396       nva.size(), promised_downstream);
2397   if (promised_stream_id < 0) {
2398     return -1;
2399   }
2400 
2401   promised_downstream->set_stream_id(promised_stream_id);
2402 
2403   return 0;
2404 }
2405 
cancel_premature_downstream(Downstream * promised_downstream)2406 void Http2Upstream::cancel_premature_downstream(
2407     Downstream *promised_downstream) {
2408   if (LOG_ENABLED(INFO)) {
2409     ULOG(INFO, this) << "Remove premature promised stream "
2410                      << promised_downstream;
2411   }
2412   downstream_queue_.remove_and_get_blocked(promised_downstream, false);
2413 }
2414 
get_max_buffer_size() const2415 size_t Http2Upstream::get_max_buffer_size() const { return max_buffer_size_; }
2416 
2417 } // namespace shrpx
2418