• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http2_upstream.h"
26 
27 #include <netinet/tcp.h>
28 #include <assert.h>
29 #include <cerrno>
30 #include <sstream>
31 
32 #include "shrpx_client_handler.h"
33 #include "shrpx_https_upstream.h"
34 #include "shrpx_downstream.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_config.h"
37 #include "shrpx_http.h"
38 #include "shrpx_worker.h"
39 #include "shrpx_http2_session.h"
40 #include "shrpx_log.h"
41 #ifdef HAVE_MRUBY
42 #  include "shrpx_mruby.h"
43 #endif // HAVE_MRUBY
44 #include "http2.h"
45 #include "util.h"
46 #include "base64.h"
47 #include "app_helper.h"
48 #include "template.h"
49 
50 using namespace nghttp2;
51 
52 namespace shrpx {
53 
54 namespace {
55 constexpr size_t MAX_BUFFER_SIZE = 32_k;
56 } // namespace
57 
58 namespace {
on_stream_close_callback(nghttp2_session * session,int32_t stream_id,uint32_t error_code,void * user_data)59 int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
60                              uint32_t error_code, void *user_data) {
61   auto upstream = static_cast<Http2Upstream *>(user_data);
62   if (LOG_ENABLED(INFO)) {
63     ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
64                          << " is being closed";
65   }
66 
67   auto downstream = static_cast<Downstream *>(
68     nghttp2_session_get_stream_user_data(session, stream_id));
69 
70   if (!downstream) {
71     return 0;
72   }
73 
74   auto &req = downstream->request();
75 
76   upstream->consume(stream_id, req.unconsumed_body_length);
77 
78   req.unconsumed_body_length = 0;
79 
80   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
81     upstream->remove_downstream(downstream);
82     // downstream was deleted
83 
84     return 0;
85   }
86 
87   if (downstream->can_detach_downstream_connection()) {
88     // Keep-alive
89     downstream->detach_downstream_connection();
90   }
91 
92   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
93 
94   // At this point, downstream read may be paused.
95 
96   // If shrpx_downstream::push_request_headers() failed, the
97   // error is handled here.
98   upstream->remove_downstream(downstream);
99   // downstream was deleted
100 
101   // How to test this case? Request sufficient large download
102   // and make client send RST_STREAM after it gets first DATA
103   // frame chunk.
104 
105   return 0;
106 }
107 } // namespace
108 
upgrade_upstream(HttpsUpstream * http)109 int Http2Upstream::upgrade_upstream(HttpsUpstream *http) {
110   int rv;
111 
112   auto &balloc = http->get_downstream()->get_block_allocator();
113 
114   auto http2_settings = http->get_downstream()->get_http2_settings();
115   http2_settings = util::to_base64(balloc, http2_settings);
116 
117   auto settings_payload = base64::decode(balloc, std::begin(http2_settings),
118                                          std::end(http2_settings));
119 
120   rv = nghttp2_session_upgrade2(
121     session_, settings_payload.data(), settings_payload.size(),
122     http->get_downstream()->request().method == HTTP_HEAD, nullptr);
123   if (rv != 0) {
124     if (LOG_ENABLED(INFO)) {
125       ULOG(INFO, this) << "nghttp2_session_upgrade() returned error: "
126                        << nghttp2_strerror(rv);
127     }
128     return -1;
129   }
130   pre_upstream_.reset(http);
131   auto downstream = http->pop_downstream();
132   downstream->reset_upstream(this);
133   downstream->set_stream_id(1);
134   downstream->reset_upstream_rtimer();
135   downstream->set_stream_id(1);
136 
137   auto ptr = downstream.get();
138 
139   nghttp2_session_set_stream_user_data(session_, 1, ptr);
140   downstream_queue_.add_pending(std::move(downstream));
141   downstream_queue_.mark_active(ptr);
142 
143   // TODO This might not be necessary
144   handler_->stop_read_timer();
145 
146   if (LOG_ENABLED(INFO)) {
147     ULOG(INFO, this) << "Connection upgraded to HTTP/2";
148   }
149 
150   return 0;
151 }
152 
start_settings_timer()153 void Http2Upstream::start_settings_timer() {
154   ev_timer_start(handler_->get_loop(), &settings_timer_);
155 }
156 
stop_settings_timer()157 void Http2Upstream::stop_settings_timer() {
158   ev_timer_stop(handler_->get_loop(), &settings_timer_);
159 }
160 
161 namespace {
on_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)162 int on_header_callback2(nghttp2_session *session, const nghttp2_frame *frame,
163                         nghttp2_rcbuf *name, nghttp2_rcbuf *value,
164                         uint8_t flags, void *user_data) {
165   auto namebuf = nghttp2_rcbuf_get_buf(name);
166   auto valuebuf = nghttp2_rcbuf_get_buf(value);
167   auto config = get_config();
168 
169   if (config->http2.upstream.debug.frame_debug) {
170     verbose_on_header_callback(session, frame, namebuf.base, namebuf.len,
171                                valuebuf.base, valuebuf.len, flags, user_data);
172   }
173   if (frame->hd.type != NGHTTP2_HEADERS) {
174     return 0;
175   }
176   auto upstream = static_cast<Http2Upstream *>(user_data);
177   auto downstream = static_cast<Downstream *>(
178     nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
179   if (!downstream) {
180     return 0;
181   }
182 
183   auto &req = downstream->request();
184 
185   auto &httpconf = config->http;
186 
187   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
188         httpconf.request_header_field_buffer ||
189       req.fs.num_fields() >= httpconf.max_request_header_fields) {
190     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
191       return 0;
192     }
193 
194     if (LOG_ENABLED(INFO)) {
195       ULOG(INFO, upstream) << "Too large or many header field size="
196                            << req.fs.buffer_size() + namebuf.len + valuebuf.len
197                            << ", num=" << req.fs.num_fields() + 1;
198     }
199 
200     // just ignore header fields if this is trailer part.
201     if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
202       return 0;
203     }
204 
205     if (upstream->error_reply(downstream, 431) != 0) {
206       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
207     }
208 
209     return 0;
210   }
211 
212   auto nameref = StringRef{namebuf.base, namebuf.len};
213   auto valueref = StringRef{valuebuf.base, valuebuf.len};
214   auto token = http2::lookup_token(nameref);
215   auto no_index = flags & NGHTTP2_NV_FLAG_NO_INDEX;
216 
217   downstream->add_rcbuf(name);
218   downstream->add_rcbuf(value);
219 
220   if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
221     // just store header fields for trailer part
222     req.fs.add_trailer_token(nameref, valueref, no_index, token);
223     return 0;
224   }
225 
226   req.fs.add_header_token(nameref, valueref, no_index, token);
227   return 0;
228 }
229 } // namespace
230 
231 namespace {
on_invalid_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)232 int on_invalid_header_callback2(nghttp2_session *session,
233                                 const nghttp2_frame *frame, nghttp2_rcbuf *name,
234                                 nghttp2_rcbuf *value, uint8_t flags,
235                                 void *user_data) {
236   auto upstream = static_cast<Http2Upstream *>(user_data);
237   auto downstream = static_cast<Downstream *>(
238     nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
239   if (!downstream) {
240     return 0;
241   }
242 
243   if (LOG_ENABLED(INFO)) {
244     auto namebuf = nghttp2_rcbuf_get_buf(name);
245     auto valuebuf = nghttp2_rcbuf_get_buf(value);
246 
247     ULOG(INFO, upstream) << "Invalid header field for stream_id="
248                          << frame->hd.stream_id << ": name=["
249                          << StringRef{namebuf.base, namebuf.len} << "], value=["
250                          << StringRef{valuebuf.base, valuebuf.len} << "]";
251   }
252 
253   upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
254 
255   return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
256 }
257 } // namespace
258 
259 namespace {
on_begin_headers_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)260 int on_begin_headers_callback(nghttp2_session *session,
261                               const nghttp2_frame *frame, void *user_data) {
262   auto upstream = static_cast<Http2Upstream *>(user_data);
263 
264   if (frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
265     return 0;
266   }
267   if (LOG_ENABLED(INFO)) {
268     ULOG(INFO, upstream) << "Received upstream request HEADERS stream_id="
269                          << frame->hd.stream_id;
270   }
271 
272   upstream->on_start_request(frame);
273 
274   return 0;
275 }
276 } // namespace
277 
on_start_request(const nghttp2_frame * frame)278 void Http2Upstream::on_start_request(const nghttp2_frame *frame) {
279   auto downstream = std::make_unique<Downstream>(this, handler_->get_mcpool(),
280                                                  frame->hd.stream_id);
281   nghttp2_session_set_stream_user_data(session_, frame->hd.stream_id,
282                                        downstream.get());
283 
284   downstream->reset_upstream_rtimer();
285 
286   auto config = get_config();
287   auto &httpconf = config->http;
288 
289   handler_->reset_upstream_read_timeout(httpconf.timeout.header);
290 
291   auto &req = downstream->request();
292 
293   // Although, we deprecated minor version from HTTP/2, we supply
294   // minor version 0 to use via header field in a conventional way.
295   req.http_major = 2;
296   req.http_minor = 0;
297 
298   add_pending_downstream(std::move(downstream));
299 
300   ++num_requests_;
301 
302   if (httpconf.max_requests <= num_requests_) {
303     start_graceful_shutdown();
304   }
305 }
306 
on_request_headers(Downstream * downstream,const nghttp2_frame * frame)307 int Http2Upstream::on_request_headers(Downstream *downstream,
308                                       const nghttp2_frame *frame) {
309   auto lgconf = log_config();
310   lgconf->update_tstamp(std::chrono::system_clock::now());
311   auto &req = downstream->request();
312   req.tstamp = lgconf->tstamp;
313 
314   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
315     return 0;
316   }
317 
318   auto &nva = req.fs.headers();
319 
320   if (LOG_ENABLED(INFO)) {
321     std::stringstream ss;
322     for (auto &nv : nva) {
323       if (nv.name == "authorization"_sr) {
324         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
325         continue;
326       }
327       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
328     }
329     ULOG(INFO, this) << "HTTP request headers. stream_id="
330                      << downstream->get_stream_id() << "\n"
331                      << ss.str();
332   }
333 
334   auto config = get_config();
335   auto &dump = config->http2.upstream.debug.dump;
336 
337   if (dump.request_header) {
338     http2::dump_nv(dump.request_header, nva);
339   }
340 
341   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
342   if (content_length) {
343     // libnghttp2 guarantees this can be parsed
344     req.fs.content_length =
345       util::parse_uint(content_length->value).value_or(-1);
346   }
347 
348   // presence of mandatory header fields are guaranteed by libnghttp2.
349   auto authority = req.fs.header(http2::HD__AUTHORITY);
350   auto path = req.fs.header(http2::HD__PATH);
351   auto method = req.fs.header(http2::HD__METHOD);
352   auto scheme = req.fs.header(http2::HD__SCHEME);
353 
354   auto method_token = http2::lookup_method_token(method->value);
355   if (method_token == -1) {
356     if (error_reply(downstream, 501) != 0) {
357       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
358     }
359     return 0;
360   }
361 
362   auto faddr = handler_->get_upstream_addr();
363 
364   // For HTTP/2 proxy, we require :authority.
365   if (method_token != HTTP_CONNECT && config->http2_proxy &&
366       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
367     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
368     return 0;
369   }
370 
371   req.method = method_token;
372   if (scheme) {
373     req.scheme = scheme->value;
374   }
375 
376   // nghttp2 library guarantees either :authority or host exist
377   if (!authority) {
378     req.no_authority = true;
379     authority = req.fs.header(http2::HD_HOST);
380   }
381 
382   if (authority) {
383     req.authority = authority->value;
384   }
385 
386   if (path) {
387     if (method_token == HTTP_OPTIONS && path->value == "*"_sr) {
388       // Server-wide OPTIONS request.  Path is empty.
389     } else if (config->http2_proxy &&
390                faddr->alt_mode == UpstreamAltMode::NONE) {
391       req.path = path->value;
392     } else {
393       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
394                                            path->value);
395     }
396   }
397 
398   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
399   if (connect_proto) {
400     if (connect_proto->value != "websocket"_sr) {
401       if (error_reply(downstream, 400) != 0) {
402         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
403       }
404       return 0;
405     }
406     req.connect_proto = ConnectProto::WEBSOCKET;
407   }
408 
409   if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
410     req.http2_expect_body = true;
411   } else if (req.fs.content_length == -1) {
412     // If END_STREAM flag is set to HEADERS frame, we are sure that
413     // content-length is 0.
414     req.fs.content_length = 0;
415   }
416 
417   downstream->inspect_http2_request();
418 
419   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
420 
421   if (config->http.require_http_scheme &&
422       !http::check_http_scheme(req.scheme, handler_->get_ssl() != nullptr)) {
423     if (error_reply(downstream, 400) != 0) {
424       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
425     }
426     return 0;
427   }
428 
429 #ifdef HAVE_MRUBY
430   auto worker = handler_->get_worker();
431   auto mruby_ctx = worker->get_mruby_context();
432 
433   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
434     if (error_reply(downstream, 500) != 0) {
435       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
436     }
437     return 0;
438   }
439 #endif // HAVE_MRUBY
440 
441   if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
442     downstream->disable_upstream_rtimer();
443 
444     downstream->set_request_state(DownstreamState::MSG_COMPLETE);
445   }
446 
447   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
448     return 0;
449   }
450 
451   start_downstream(downstream);
452 
453   return 0;
454 }
455 
start_downstream(Downstream * downstream)456 void Http2Upstream::start_downstream(Downstream *downstream) {
457   if (downstream_queue_.can_activate(downstream->request().authority)) {
458     initiate_downstream(downstream);
459     return;
460   }
461 
462   downstream_queue_.mark_blocked(downstream);
463 }
464 
initiate_downstream(Downstream * downstream)465 void Http2Upstream::initiate_downstream(Downstream *downstream) {
466   int rv;
467 
468 #ifdef HAVE_MRUBY
469   DownstreamConnection *dconn_ptr;
470 #endif // HAVE_MRUBY
471 
472   for (;;) {
473     auto dconn = handler_->get_downstream_connection(rv, downstream);
474     if (!dconn) {
475       if (rv == SHRPX_ERR_TLS_REQUIRED) {
476         rv = redirect_to_https(downstream);
477       } else {
478         rv = error_reply(downstream, 502);
479       }
480       if (rv != 0) {
481         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
482       }
483 
484       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
485       downstream_queue_.mark_failure(downstream);
486 
487       return;
488     }
489 
490 #ifdef HAVE_MRUBY
491     dconn_ptr = dconn.get();
492 #endif // HAVE_MRUBY
493     rv = downstream->attach_downstream_connection(std::move(dconn));
494     if (rv == 0) {
495       break;
496     }
497   }
498 
499 #ifdef HAVE_MRUBY
500   const auto &group = dconn_ptr->get_downstream_addr_group();
501   if (group) {
502     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
503     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
504       if (error_reply(downstream, 500) != 0) {
505         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
506       }
507 
508       downstream_queue_.mark_failure(downstream);
509 
510       return;
511     }
512 
513     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
514       return;
515     }
516   }
517 #endif // HAVE_MRUBY
518 
519   rv = downstream->push_request_headers();
520   if (rv != 0) {
521     if (error_reply(downstream, 502) != 0) {
522       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
523     }
524 
525     downstream_queue_.mark_failure(downstream);
526 
527     return;
528   }
529 
530   downstream_queue_.mark_active(downstream);
531 
532   auto &req = downstream->request();
533   if (!req.http2_expect_body) {
534     rv = downstream->end_upload_data();
535     if (rv != 0) {
536       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
537     }
538   }
539 
540   return;
541 }
542 
543 namespace {
on_frame_recv_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)544 int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
545                            void *user_data) {
546   if (get_config()->http2.upstream.debug.frame_debug) {
547     verbose_on_frame_recv_callback(session, frame, user_data);
548   }
549   auto upstream = static_cast<Http2Upstream *>(user_data);
550   auto handler = upstream->get_client_handler();
551 
552   switch (frame->hd.type) {
553   case NGHTTP2_DATA: {
554     auto downstream = static_cast<Downstream *>(
555       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
556     if (!downstream) {
557       return 0;
558     }
559 
560     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
561       downstream->disable_upstream_rtimer();
562 
563       if (downstream->end_upload_data() != 0) {
564         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
565           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
566         }
567       }
568 
569       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
570     }
571 
572     return 0;
573   }
574   case NGHTTP2_HEADERS: {
575     auto downstream = static_cast<Downstream *>(
576       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
577     if (!downstream) {
578       return 0;
579     }
580 
581     if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
582       downstream->reset_upstream_rtimer();
583 
584       handler->stop_read_timer();
585 
586       return upstream->on_request_headers(downstream, frame);
587     }
588 
589     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
590       downstream->disable_upstream_rtimer();
591 
592       if (downstream->end_upload_data() != 0) {
593         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
594           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
595         }
596       }
597 
598       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
599     }
600 
601     return 0;
602   }
603   case NGHTTP2_SETTINGS:
604     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
605       return 0;
606     }
607     upstream->stop_settings_timer();
608     return 0;
609   case NGHTTP2_GOAWAY:
610     if (LOG_ENABLED(INFO)) {
611       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
612                                          frame->goaway.opaque_data_len);
613 
614       ULOG(INFO, upstream) << "GOAWAY received: last-stream-id="
615                            << frame->goaway.last_stream_id
616                            << ", error_code=" << frame->goaway.error_code
617                            << ", debug_data=" << debug_data;
618     }
619     return 0;
620   default:
621     return 0;
622   }
623 }
624 } // namespace
625 
626 namespace {
on_data_chunk_recv_callback(nghttp2_session * session,uint8_t flags,int32_t stream_id,const uint8_t * data,size_t len,void * user_data)627 int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
628                                 int32_t stream_id, const uint8_t *data,
629                                 size_t len, void *user_data) {
630   auto upstream = static_cast<Http2Upstream *>(user_data);
631   auto downstream = static_cast<Downstream *>(
632     nghttp2_session_get_stream_user_data(session, stream_id));
633 
634   if (!downstream) {
635     if (upstream->consume(stream_id, len) != 0) {
636       return NGHTTP2_ERR_CALLBACK_FAILURE;
637     }
638 
639     return 0;
640   }
641 
642   downstream->reset_upstream_rtimer();
643 
644   if (downstream->push_upload_data_chunk(data, len) != 0) {
645     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
646       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
647     }
648 
649     if (upstream->consume(stream_id, len) != 0) {
650       return NGHTTP2_ERR_CALLBACK_FAILURE;
651     }
652 
653     return 0;
654   }
655 
656   return 0;
657 }
658 } // namespace
659 
660 namespace {
on_frame_send_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)661 int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
662                            void *user_data) {
663   if (get_config()->http2.upstream.debug.frame_debug) {
664     verbose_on_frame_send_callback(session, frame, user_data);
665   }
666   auto upstream = static_cast<Http2Upstream *>(user_data);
667   auto handler = upstream->get_client_handler();
668 
669   switch (frame->hd.type) {
670   case NGHTTP2_DATA:
671   case NGHTTP2_HEADERS: {
672     if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
673       return 0;
674     }
675     // RST_STREAM if request is still incomplete.
676     auto stream_id = frame->hd.stream_id;
677     auto downstream = static_cast<Downstream *>(
678       nghttp2_session_get_stream_user_data(session, stream_id));
679 
680     if (!downstream) {
681       return 0;
682     }
683 
684     // For tunneling, issue RST_STREAM to finish the stream.
685     if (downstream->get_upgraded() ||
686         nghttp2_session_get_stream_remote_close(session, stream_id) == 0) {
687       if (LOG_ENABLED(INFO)) {
688         ULOG(INFO, upstream)
689           << "Send RST_STREAM to "
690           << (downstream->get_upgraded() ? "tunneled " : "")
691           << "stream stream_id=" << downstream->get_stream_id()
692           << " to finish off incomplete request";
693       }
694 
695       upstream->rst_stream(downstream, NGHTTP2_NO_ERROR);
696     }
697 
698     return 0;
699   }
700   case NGHTTP2_SETTINGS:
701     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
702       upstream->start_settings_timer();
703     }
704     return 0;
705   case NGHTTP2_PUSH_PROMISE: {
706     auto promised_stream_id = frame->push_promise.promised_stream_id;
707 
708     if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) {
709       // In case of push from backend, downstream object was already
710       // created.
711       return 0;
712     }
713 
714     auto promised_downstream = std::make_unique<Downstream>(
715       upstream, handler->get_mcpool(), promised_stream_id);
716     auto &req = promised_downstream->request();
717 
718     // As long as we use nghttp2_session_mem_send2(), setting stream
719     // user data here should not fail.  This is because this callback
720     // is called just after frame was serialized.  So no worries about
721     // hanging Downstream.
722     nghttp2_session_set_stream_user_data(session, promised_stream_id,
723                                          promised_downstream.get());
724 
725     promised_downstream->set_assoc_stream_id(frame->hd.stream_id);
726     promised_downstream->disable_upstream_rtimer();
727 
728     req.http_major = 2;
729     req.http_minor = 0;
730 
731     req.fs.content_length = 0;
732     req.http2_expect_body = false;
733 
734     auto &promised_balloc = promised_downstream->get_block_allocator();
735 
736     for (size_t i = 0; i < frame->push_promise.nvlen; ++i) {
737       auto &nv = frame->push_promise.nva[i];
738 
739       auto name =
740         make_string_ref(promised_balloc, StringRef{nv.name, nv.namelen});
741       auto value =
742         make_string_ref(promised_balloc, StringRef{nv.value, nv.valuelen});
743 
744       auto token = http2::lookup_token(name);
745       switch (token) {
746       case http2::HD__METHOD:
747         req.method = http2::lookup_method_token(value);
748         break;
749       case http2::HD__SCHEME:
750         req.scheme = value;
751         break;
752       case http2::HD__AUTHORITY:
753         req.authority = value;
754         break;
755       case http2::HD__PATH:
756         req.path = http2::rewrite_clean_path(promised_balloc, value);
757         break;
758       }
759       req.fs.add_header_token(name, value, nv.flags & NGHTTP2_NV_FLAG_NO_INDEX,
760                               token);
761     }
762 
763     promised_downstream->inspect_http2_request();
764 
765     promised_downstream->set_request_state(DownstreamState::MSG_COMPLETE);
766 
767     // a bit weird but start_downstream() expects that given
768     // downstream is in pending queue.
769     auto ptr = promised_downstream.get();
770     upstream->add_pending_downstream(std::move(promised_downstream));
771 
772 #ifdef HAVE_MRUBY
773     auto worker = handler->get_worker();
774     auto mruby_ctx = worker->get_mruby_context();
775 
776     if (mruby_ctx->run_on_request_proc(ptr) != 0) {
777       if (upstream->error_reply(ptr, 500) != 0) {
778         upstream->rst_stream(ptr, NGHTTP2_INTERNAL_ERROR);
779         return 0;
780       }
781       return 0;
782     }
783 #endif // HAVE_MRUBY
784 
785     upstream->start_downstream(ptr);
786 
787     return 0;
788   }
789   case NGHTTP2_GOAWAY:
790     if (LOG_ENABLED(INFO)) {
791       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
792                                          frame->goaway.opaque_data_len);
793 
794       ULOG(INFO, upstream) << "Sending GOAWAY: last-stream-id="
795                            << frame->goaway.last_stream_id
796                            << ", error_code=" << frame->goaway.error_code
797                            << ", debug_data=" << debug_data;
798     }
799     return 0;
800   default:
801     return 0;
802   }
803 }
804 } // namespace
805 
806 namespace {
on_frame_not_send_callback(nghttp2_session * session,const nghttp2_frame * frame,int lib_error_code,void * user_data)807 int on_frame_not_send_callback(nghttp2_session *session,
808                                const nghttp2_frame *frame, int lib_error_code,
809                                void *user_data) {
810   auto upstream = static_cast<Http2Upstream *>(user_data);
811   if (LOG_ENABLED(INFO)) {
812     ULOG(INFO, upstream) << "Failed to send control frame type="
813                          << static_cast<uint32_t>(frame->hd.type)
814                          << ", lib_error_code=" << lib_error_code << ":"
815                          << nghttp2_strerror(lib_error_code);
816   }
817   if (frame->hd.type == NGHTTP2_HEADERS &&
818       lib_error_code != NGHTTP2_ERR_STREAM_CLOSED &&
819       lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) {
820     // To avoid stream hanging around, issue RST_STREAM.
821     auto downstream = static_cast<Downstream *>(
822       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
823     if (downstream) {
824       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
825     }
826   }
827   return 0;
828 }
829 } // namespace
830 
831 namespace {
// 256 zero bytes, used as the source of DATA frame padding.
constexpr std::array<uint8_t, 256> PADDING{};
833 } // namespace
834 
835 namespace {
send_data_callback(nghttp2_session * session,nghttp2_frame * frame,const uint8_t * framehd,size_t length,nghttp2_data_source * source,void * user_data)836 int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
837                        const uint8_t *framehd, size_t length,
838                        nghttp2_data_source *source, void *user_data) {
839   auto downstream = static_cast<Downstream *>(source->ptr);
840   auto upstream = static_cast<Http2Upstream *>(downstream->get_upstream());
841   auto body = downstream->get_response_buf();
842 
843   auto wb = upstream->get_response_buf();
844 
845   size_t padlen = 0;
846 
847   wb->append(framehd, 9);
848   if (frame->data.padlen > 0) {
849     padlen = frame->data.padlen - 1;
850     wb->append(static_cast<uint8_t>(padlen));
851   }
852 
853   body->remove(*wb, length);
854 
855   wb->append(PADDING.data(), padlen);
856 
857   if (body->rleft() == 0) {
858     downstream->disable_upstream_wtimer();
859   } else {
860     downstream->reset_upstream_wtimer();
861   }
862 
863   if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
864     return NGHTTP2_ERR_CALLBACK_FAILURE;
865   }
866 
867   // We have to add length here, so that we can log this amount of
868   // data transferred.
869   downstream->response_sent_body_length += length;
870 
871   auto max_buffer_size = upstream->get_max_buffer_size();
872 
873   return wb->rleft() >= max_buffer_size ? NGHTTP2_ERR_PAUSE : 0;
874 }
875 } // namespace
876 
877 namespace {
infer_upstream_rst_stream_error_code(uint32_t downstream_error_code)878 uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
879   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
880   // client to retry.
881   switch (downstream_error_code) {
882   case NGHTTP2_NO_ERROR:
883   case NGHTTP2_REFUSED_STREAM:
884     return downstream_error_code;
885   default:
886     return NGHTTP2_INTERNAL_ERROR;
887   }
888 }
889 } // namespace
890 
891 namespace {
settings_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)892 void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
893   auto upstream = static_cast<Http2Upstream *>(w->data);
894   auto handler = upstream->get_client_handler();
895   ULOG(INFO, upstream) << "SETTINGS timeout";
896   if (upstream->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
897     delete handler;
898     return;
899   }
900   handler->signal_write();
901 }
902 } // namespace
903 
904 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)905 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
906   auto upstream = static_cast<Http2Upstream *>(w->data);
907   auto handler = upstream->get_client_handler();
908   upstream->submit_goaway();
909   handler->signal_write();
910 }
911 } // namespace
912 
913 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revents)914 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
915   auto upstream = static_cast<Http2Upstream *>(w->data);
916   upstream->check_shutdown();
917 }
918 } // namespace
919 
submit_goaway()920 void Http2Upstream::submit_goaway() {
921   auto last_stream_id = nghttp2_session_get_last_proc_stream_id(session_);
922   nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, last_stream_id,
923                         NGHTTP2_NO_ERROR, nullptr, 0);
924 }
925 
check_shutdown()926 void Http2Upstream::check_shutdown() {
927   auto worker = handler_->get_worker();
928 
929   if (!worker->get_graceful_shutdown()) {
930     return;
931   }
932 
933   ev_prepare_stop(handler_->get_loop(), &prep_);
934 
935   start_graceful_shutdown();
936 }
937 
start_graceful_shutdown()938 void Http2Upstream::start_graceful_shutdown() {
939   int rv;
940   if (ev_is_active(&shutdown_timer_)) {
941     return;
942   }
943 
944   rv = nghttp2_submit_shutdown_notice(session_);
945   if (rv != 0) {
946     ULOG(FATAL, this) << "nghttp2_submit_shutdown_notice() failed: "
947                       << nghttp2_strerror(rv);
948     return;
949   }
950 
951   handler_->signal_write();
952 
953   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
954 }
955 
create_http2_upstream_callbacks()956 nghttp2_session_callbacks *create_http2_upstream_callbacks() {
957   int rv;
958   nghttp2_session_callbacks *callbacks;
959 
960   rv = nghttp2_session_callbacks_new(&callbacks);
961 
962   if (rv != 0) {
963     return nullptr;
964   }
965 
966   nghttp2_session_callbacks_set_on_stream_close_callback(
967     callbacks, on_stream_close_callback);
968 
969   nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
970                                                        on_frame_recv_callback);
971 
972   nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
973     callbacks, on_data_chunk_recv_callback);
974 
975   nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
976                                                        on_frame_send_callback);
977 
978   nghttp2_session_callbacks_set_on_frame_not_send_callback(
979     callbacks, on_frame_not_send_callback);
980 
981   nghttp2_session_callbacks_set_on_header_callback2(callbacks,
982                                                     on_header_callback2);
983 
984   nghttp2_session_callbacks_set_on_invalid_header_callback2(
985     callbacks, on_invalid_header_callback2);
986 
987   nghttp2_session_callbacks_set_on_begin_headers_callback(
988     callbacks, on_begin_headers_callback);
989 
990   nghttp2_session_callbacks_set_send_data_callback(callbacks,
991                                                    send_data_callback);
992 
993   auto config = get_config();
994 
995   if (config->padding) {
996     nghttp2_session_callbacks_set_select_padding_callback2(
997       callbacks, http::select_padding_callback);
998   }
999 
1000   if (config->http2.upstream.debug.frame_debug) {
1001     nghttp2_session_callbacks_set_error_callback2(callbacks,
1002                                                   verbose_error_callback);
1003   }
1004 
1005   return callbacks;
1006 }
1007 
1008 namespace {
downstream_queue_size(Worker * worker)1009 size_t downstream_queue_size(Worker *worker) {
1010   auto &downstreamconf = *worker->get_downstream_config();
1011 
1012   if (get_config()->http2_proxy) {
1013     return downstreamconf.connections_per_host;
1014   }
1015 
1016   return downstreamconf.connections_per_frontend;
1017 }
1018 } // namespace
1019 
Http2Upstream(ClientHandler * handler)1020 Http2Upstream::Http2Upstream(ClientHandler *handler)
1021   : wb_(handler->get_worker()->get_mcpool()),
1022     downstream_queue_(downstream_queue_size(handler->get_worker()),
1023                       !get_config()->http2_proxy),
1024     handler_(handler),
1025     session_(nullptr),
1026     max_buffer_size_(MAX_BUFFER_SIZE),
1027     num_requests_(0) {
1028   int rv;
1029 
1030   auto config = get_config();
1031   auto &http2conf = config->http2;
1032 
1033   auto faddr = handler_->get_upstream_addr();
1034 
1035   rv =
1036     nghttp2_session_server_new2(&session_, http2conf.upstream.callbacks, this,
1037                                 faddr->alt_mode != UpstreamAltMode::NONE
1038                                   ? http2conf.upstream.alt_mode_option
1039                                   : http2conf.upstream.option);
1040 
1041   assert(rv == 0);
1042 
1043   flow_control_ = true;
1044 
1045   // TODO Maybe call from outside?
1046   std::array<nghttp2_settings_entry, 5> entry;
1047   size_t nentry = 3;
1048 
1049   entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
1050   entry[0].value = http2conf.upstream.max_concurrent_streams;
1051 
1052   entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
1053   if (faddr->alt_mode != UpstreamAltMode::NONE) {
1054     entry[1].value = (1u << 31) - 1;
1055   } else {
1056     entry[1].value = http2conf.upstream.window_size;
1057   }
1058 
1059   entry[2].settings_id = NGHTTP2_SETTINGS_NO_RFC7540_PRIORITIES;
1060   entry[2].value = 1;
1061 
1062   if (!config->http2_proxy) {
1063     entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL;
1064     entry[nentry].value = 1;
1065     ++nentry;
1066   }
1067 
1068   if (http2conf.upstream.decoder_dynamic_table_size !=
1069       NGHTTP2_DEFAULT_HEADER_TABLE_SIZE) {
1070     entry[nentry].settings_id = NGHTTP2_SETTINGS_HEADER_TABLE_SIZE;
1071     entry[nentry].value = http2conf.upstream.decoder_dynamic_table_size;
1072     ++nentry;
1073   }
1074 
1075   rv =
1076     nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(), nentry);
1077   if (rv != 0) {
1078     ULOG(ERROR, this) << "nghttp2_submit_settings() returned error: "
1079                       << nghttp2_strerror(rv);
1080   }
1081 
1082   auto window_size = faddr->alt_mode != UpstreamAltMode::NONE
1083                        ? std::numeric_limits<int32_t>::max()
1084                      : http2conf.upstream.optimize_window_size
1085                        ? std::min(http2conf.upstream.connection_window_size,
1086                                   NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE)
1087                        : http2conf.upstream.connection_window_size;
1088 
1089   rv = nghttp2_session_set_local_window_size(session_, NGHTTP2_FLAG_NONE, 0,
1090                                              window_size);
1091 
1092   if (rv != 0) {
1093     ULOG(ERROR, this)
1094       << "nghttp2_session_set_local_window_size() returned error: "
1095       << nghttp2_strerror(rv);
1096   }
1097 
1098   // We wait for SETTINGS ACK at least 10 seconds.
1099   ev_timer_init(&settings_timer_, settings_timeout_cb,
1100                 http2conf.upstream.timeout.settings, 0.);
1101 
1102   settings_timer_.data = this;
1103 
1104   // timer for 2nd GOAWAY.  HTTP/2 spec recommend 1 RTT.  We wait for
1105   // 2 seconds.
1106   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 2., 0);
1107   shutdown_timer_.data = this;
1108 
1109   ev_prepare_init(&prep_, prepare_cb);
1110   prep_.data = this;
1111   ev_prepare_start(handler_->get_loop(), &prep_);
1112 
1113 #if defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1114   if (http2conf.upstream.optimize_write_buffer_size) {
1115     auto conn = handler_->get_connection();
1116     conn->tls_dyn_rec_warmup_threshold = 0;
1117 
1118     uint32_t pollout_thres = 1;
1119     rv = setsockopt(conn->fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &pollout_thres,
1120                     static_cast<socklen_t>(sizeof(pollout_thres)));
1121 
1122     if (rv != 0) {
1123       if (LOG_ENABLED(INFO)) {
1124         auto error = errno;
1125         LOG(INFO) << "setsockopt(TCP_NOTSENT_LOWAT, " << pollout_thres
1126                   << ") failed: errno=" << error;
1127       }
1128     }
1129   }
1130 #endif // defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1131 
1132   handler_->reset_upstream_read_timeout(
1133     config->conn.upstream.timeout.http2_idle);
1134 
1135   handler_->signal_write();
1136 }
1137 
~Http2Upstream()1138 Http2Upstream::~Http2Upstream() {
1139   nghttp2_session_del(session_);
1140   ev_prepare_stop(handler_->get_loop(), &prep_);
1141   ev_timer_stop(handler_->get_loop(), &shutdown_timer_);
1142   ev_timer_stop(handler_->get_loop(), &settings_timer_);
1143 }
1144 
on_read()1145 int Http2Upstream::on_read() {
1146   auto rb = handler_->get_rb();
1147   auto rlimit = handler_->get_rlimit();
1148 
1149   if (rb->rleft()) {
1150     auto rv = nghttp2_session_mem_recv2(session_, rb->pos(), rb->rleft());
1151     if (rv < 0) {
1152       if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
1153         ULOG(ERROR, this) << "nghttp2_session_mem_recv2() returned error: "
1154                           << nghttp2_strerror(rv);
1155       }
1156       return -1;
1157     }
1158 
1159     // nghttp2_session_mem_recv2 should consume all input bytes on
1160     // success.
1161     assert(static_cast<size_t>(rv) == rb->rleft());
1162     rb->reset();
1163     rlimit->startw();
1164   }
1165 
1166   if (nghttp2_session_want_read(session_) == 0 &&
1167       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1168     if (LOG_ENABLED(INFO)) {
1169       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1170     }
1171     return -1;
1172   }
1173 
1174   handler_->signal_write();
1175   return 0;
1176 }
1177 
1178 // After this function call, downstream may be deleted.
on_write()1179 int Http2Upstream::on_write() {
1180   int rv;
1181   auto config = get_config();
1182   auto &http2conf = config->http2;
1183 
1184   if ((http2conf.upstream.optimize_write_buffer_size ||
1185        http2conf.upstream.optimize_window_size) &&
1186       handler_->get_ssl()) {
1187     auto conn = handler_->get_connection();
1188     TCPHint hint;
1189     rv = conn->get_tcp_hint(&hint);
1190     if (rv == 0) {
1191       if (http2conf.upstream.optimize_write_buffer_size) {
1192         max_buffer_size_ = std::min(MAX_BUFFER_SIZE, hint.write_buffer_size);
1193       }
1194 
1195       if (http2conf.upstream.optimize_window_size) {
1196         auto faddr = handler_->get_upstream_addr();
1197         if (faddr->alt_mode == UpstreamAltMode::NONE) {
1198           auto window_size = std::min(http2conf.upstream.connection_window_size,
1199                                       static_cast<int32_t>(hint.rwin * 2));
1200 
1201           rv = nghttp2_session_set_local_window_size(
1202             session_, NGHTTP2_FLAG_NONE, 0, window_size);
1203           if (rv != 0) {
1204             if (LOG_ENABLED(INFO)) {
1205               ULOG(INFO, this)
1206                 << "nghttp2_session_set_local_window_size() with window_size="
1207                 << window_size << " failed: " << nghttp2_strerror(rv);
1208             }
1209           }
1210         }
1211       }
1212     }
1213   }
1214 
1215   for (;;) {
1216     if (wb_.rleft() >= max_buffer_size_) {
1217       return 0;
1218     }
1219 
1220     const uint8_t *data;
1221     auto datalen = nghttp2_session_mem_send2(session_, &data);
1222 
1223     if (datalen < 0) {
1224       ULOG(ERROR, this) << "nghttp2_session_mem_send2() returned error: "
1225                         << nghttp2_strerror(datalen);
1226       return -1;
1227     }
1228     if (datalen == 0) {
1229       break;
1230     }
1231     wb_.append(data, datalen);
1232   }
1233 
1234   if (nghttp2_session_want_read(session_) == 0 &&
1235       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1236     if (LOG_ENABLED(INFO)) {
1237       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1238     }
1239     return -1;
1240   }
1241 
1242   return 0;
1243 }
1244 
get_client_handler() const1245 ClientHandler *Http2Upstream::get_client_handler() const { return handler_; }
1246 
downstream_read(DownstreamConnection * dconn)1247 int Http2Upstream::downstream_read(DownstreamConnection *dconn) {
1248   auto downstream = dconn->get_downstream();
1249 
1250   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1251     // The downstream stream was reset (canceled). In this case,
1252     // RST_STREAM to the upstream and delete downstream connection
1253     // here. Deleting downstream will be taken place at
1254     // on_stream_close_callback.
1255     rst_stream(downstream, infer_upstream_rst_stream_error_code(
1256                              downstream->get_response_rst_stream_error_code()));
1257     downstream->pop_downstream_connection();
1258     // dconn was deleted
1259     dconn = nullptr;
1260   } else if (downstream->get_response_state() ==
1261              DownstreamState::MSG_BAD_HEADER) {
1262     if (error_reply(downstream, 502) != 0) {
1263       return -1;
1264     }
1265     downstream->pop_downstream_connection();
1266     // dconn was deleted
1267     dconn = nullptr;
1268   } else {
1269     auto rv = downstream->on_read();
1270     if (rv == SHRPX_ERR_EOF) {
1271       if (downstream->get_request_header_sent()) {
1272         return downstream_eof(dconn);
1273       }
1274       return SHRPX_ERR_RETRY;
1275     }
1276     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1277       downstream->pop_downstream_connection();
1278       handler_->signal_write();
1279       return 0;
1280     }
1281     if (rv != 0) {
1282       if (rv != SHRPX_ERR_NETWORK) {
1283         if (LOG_ENABLED(INFO)) {
1284           DCLOG(INFO, dconn) << "HTTP parser failure";
1285         }
1286       }
1287       return downstream_error(dconn, Downstream::EVENT_ERROR);
1288     }
1289 
1290     if (downstream->can_detach_downstream_connection()) {
1291       // Keep-alive
1292       downstream->detach_downstream_connection();
1293     }
1294   }
1295 
1296   handler_->signal_write();
1297 
1298   // At this point, downstream may be deleted.
1299 
1300   return 0;
1301 }
1302 
downstream_write(DownstreamConnection * dconn)1303 int Http2Upstream::downstream_write(DownstreamConnection *dconn) {
1304   int rv;
1305   rv = dconn->on_write();
1306   if (rv == SHRPX_ERR_NETWORK) {
1307     return downstream_error(dconn, Downstream::EVENT_ERROR);
1308   }
1309   if (rv != 0) {
1310     return rv;
1311   }
1312   return 0;
1313 }
1314 
downstream_eof(DownstreamConnection * dconn)1315 int Http2Upstream::downstream_eof(DownstreamConnection *dconn) {
1316   auto downstream = dconn->get_downstream();
1317 
1318   if (LOG_ENABLED(INFO)) {
1319     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1320   }
1321 
1322   // Delete downstream connection. If we don't delete it here, it will
1323   // be pooled in on_stream_close_callback.
1324   downstream->pop_downstream_connection();
1325   // dconn was deleted
1326   dconn = nullptr;
1327   // downstream will be deleted in on_stream_close_callback.
1328   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1329     // Server may indicate the end of the request by EOF
1330     if (LOG_ENABLED(INFO)) {
1331       ULOG(INFO, this) << "Downstream body was ended by EOF";
1332     }
1333     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1334 
1335     // For tunneled connection, MSG_COMPLETE signals
1336     // downstream_data_read_callback to send RST_STREAM after pending
1337     // response body is sent. This is needed to ensure that RST_STREAM
1338     // is sent after all pending data are sent.
1339     on_downstream_body_complete(downstream);
1340   } else if (downstream->get_response_state() !=
1341              DownstreamState::MSG_COMPLETE) {
1342     // If stream was not closed, then we set MSG_COMPLETE and let
1343     // on_stream_close_callback delete downstream.
1344     if (error_reply(downstream, 502) != 0) {
1345       return -1;
1346     }
1347   }
1348   handler_->signal_write();
1349   // At this point, downstream may be deleted.
1350   return 0;
1351 }
1352 
downstream_error(DownstreamConnection * dconn,int events)1353 int Http2Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1354   auto downstream = dconn->get_downstream();
1355 
1356   if (LOG_ENABLED(INFO)) {
1357     if (events & Downstream::EVENT_ERROR) {
1358       DCLOG(INFO, dconn) << "Downstream network/general error";
1359     } else {
1360       DCLOG(INFO, dconn) << "Timeout";
1361     }
1362     if (downstream->get_upgraded()) {
1363       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1364     }
1365   }
1366 
1367   // Delete downstream connection. If we don't delete it here, it will
1368   // be pooled in on_stream_close_callback.
1369   downstream->pop_downstream_connection();
1370   // dconn was deleted
1371   dconn = nullptr;
1372 
1373   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1374     // For SSL tunneling, we issue RST_STREAM. For other types of
1375     // stream, we don't have to do anything since response was
1376     // complete.
1377     if (downstream->get_upgraded()) {
1378       rst_stream(downstream, NGHTTP2_NO_ERROR);
1379     }
1380   } else {
1381     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1382       if (downstream->get_upgraded()) {
1383         on_downstream_body_complete(downstream);
1384       } else {
1385         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
1386       }
1387     } else {
1388       unsigned int status;
1389       if (events & Downstream::EVENT_TIMEOUT) {
1390         if (downstream->get_request_header_sent()) {
1391           status = 504;
1392         } else {
1393           status = 408;
1394         }
1395       } else {
1396         status = 502;
1397       }
1398       if (error_reply(downstream, status) != 0) {
1399         return -1;
1400       }
1401     }
1402     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1403   }
1404   handler_->signal_write();
1405   // At this point, downstream may be deleted.
1406   return 0;
1407 }
1408 
rst_stream(Downstream * downstream,uint32_t error_code)1409 int Http2Upstream::rst_stream(Downstream *downstream, uint32_t error_code) {
1410   if (LOG_ENABLED(INFO)) {
1411     ULOG(INFO, this) << "RST_STREAM stream_id=" << downstream->get_stream_id()
1412                      << " with error_code=" << error_code;
1413   }
1414   int rv;
1415   rv = nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE,
1416                                  downstream->get_stream_id(), error_code);
1417   if (rv < NGHTTP2_ERR_FATAL) {
1418     ULOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
1419                       << nghttp2_strerror(rv);
1420     return -1;
1421   }
1422   return 0;
1423 }
1424 
terminate_session(uint32_t error_code)1425 int Http2Upstream::terminate_session(uint32_t error_code) {
1426   int rv;
1427   rv = nghttp2_session_terminate_session(session_, error_code);
1428   if (rv != 0) {
1429     return -1;
1430   }
1431   return 0;
1432 }
1433 
1434 namespace {
downstream_data_read_callback(nghttp2_session * session,int32_t stream_id,uint8_t * buf,size_t length,uint32_t * data_flags,nghttp2_data_source * source,void * user_data)1435 nghttp2_ssize downstream_data_read_callback(nghttp2_session *session,
1436                                             int32_t stream_id, uint8_t *buf,
1437                                             size_t length, uint32_t *data_flags,
1438                                             nghttp2_data_source *source,
1439                                             void *user_data) {
1440   int rv;
1441   auto downstream = static_cast<Downstream *>(source->ptr);
1442   auto body = downstream->get_response_buf();
1443   assert(body);
1444   auto upstream = static_cast<Http2Upstream *>(user_data);
1445 
1446   const auto &resp = downstream->response();
1447 
1448   auto nread = std::min(body->rleft(), length);
1449 
1450   auto max_buffer_size = upstream->get_max_buffer_size();
1451 
1452   auto buffer = upstream->get_response_buf();
1453 
1454   if (max_buffer_size <
1455       std::min(nread, static_cast<size_t>(256)) + 9 + buffer->rleft()) {
1456     if (LOG_ENABLED(INFO)) {
1457       ULOG(INFO, upstream) << "Buffer is almost full.  Skip write DATA";
1458     }
1459     return NGHTTP2_ERR_PAUSE;
1460   }
1461 
1462   nread = std::min(nread, max_buffer_size - 9 - buffer->rleft());
1463 
1464   auto body_empty = body->rleft() == nread;
1465 
1466   *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
1467 
1468   if (body_empty &&
1469       downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1470     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
1471 
1472     if (!downstream->get_upgraded()) {
1473       const auto &trailers = resp.fs.trailers();
1474       if (!trailers.empty()) {
1475         std::vector<nghttp2_nv> nva;
1476         nva.reserve(trailers.size());
1477         http2::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1478         if (!nva.empty()) {
1479           rv =
1480             nghttp2_submit_trailer(session, stream_id, nva.data(), nva.size());
1481           if (rv != 0) {
1482             if (nghttp2_is_fatal(rv)) {
1483               return NGHTTP2_ERR_CALLBACK_FAILURE;
1484             }
1485           } else {
1486             *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
1487           }
1488         }
1489       }
1490     }
1491   }
1492 
1493   if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
1494     downstream->disable_upstream_wtimer();
1495     return NGHTTP2_ERR_DEFERRED;
1496   }
1497 
1498   return nread;
1499 }
1500 } // namespace
1501 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1502 int Http2Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1503                               size_t bodylen) {
1504   int rv;
1505 
1506   nghttp2_data_provider2 data_prd, *data_prd_ptr = nullptr;
1507 
1508   const auto &req = downstream->request();
1509 
1510   if (req.method != HTTP_HEAD && bodylen) {
1511     data_prd.source.ptr = downstream;
1512     data_prd.read_callback = downstream_data_read_callback;
1513     data_prd_ptr = &data_prd;
1514 
1515     auto buf = downstream->get_response_buf();
1516 
1517     buf->append(body, bodylen);
1518   }
1519 
1520   const auto &resp = downstream->response();
1521   auto config = get_config();
1522   auto &httpconf = config->http;
1523 
1524   auto &balloc = downstream->get_block_allocator();
1525 
1526   const auto &headers = resp.fs.headers();
1527   auto nva = std::vector<nghttp2_nv>();
1528   // 2 for :status and server
1529   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1530 
1531   auto response_status = http2::stringify_status(balloc, resp.http_status);
1532 
1533   nva.push_back(http2::make_field(":status"_sr, response_status));
1534 
1535   for (auto &kv : headers) {
1536     if (kv.name.empty() || kv.name[0] == ':') {
1537       continue;
1538     }
1539     switch (kv.token) {
1540     case http2::HD_CONNECTION:
1541     case http2::HD_KEEP_ALIVE:
1542     case http2::HD_PROXY_CONNECTION:
1543     case http2::HD_TE:
1544     case http2::HD_TRANSFER_ENCODING:
1545     case http2::HD_UPGRADE:
1546       continue;
1547     }
1548     nva.push_back(
1549       http2::make_field(kv.name, kv.value, http2::no_index(kv.no_index)));
1550   }
1551 
1552   if (!resp.fs.header(http2::HD_SERVER)) {
1553     nva.push_back(http2::make_field("server"_sr, config->http.server_name));
1554   }
1555 
1556   for (auto &p : httpconf.add_response_headers) {
1557     nva.push_back(http2::make_field(p.name, p.value));
1558   }
1559 
1560   rv = nghttp2_submit_response2(session_, downstream->get_stream_id(),
1561                                 nva.data(), nva.size(), data_prd_ptr);
1562   if (nghttp2_is_fatal(rv)) {
1563     ULOG(FATAL, this) << "nghttp2_submit_response2() failed: "
1564                       << nghttp2_strerror(rv);
1565     return -1;
1566   }
1567 
1568   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1569 
1570   if (data_prd_ptr) {
1571     downstream->reset_upstream_wtimer();
1572   }
1573 
1574   return 0;
1575 }
1576 
error_reply(Downstream * downstream,unsigned int status_code)1577 int Http2Upstream::error_reply(Downstream *downstream,
1578                                unsigned int status_code) {
1579   int rv;
1580   auto &resp = downstream->response();
1581 
1582   auto &balloc = downstream->get_block_allocator();
1583 
1584   auto html = http::create_error_html(balloc, status_code);
1585   resp.http_status = status_code;
1586 
1587   nghttp2_data_provider2 data_prd, *data_prd_ptr = nullptr;
1588 
1589   const auto &req = downstream->request();
1590 
1591   if (req.method != HTTP_HEAD) {
1592     data_prd.source.ptr = downstream;
1593     data_prd.read_callback = downstream_data_read_callback;
1594     data_prd_ptr = &data_prd;
1595 
1596     auto body = downstream->get_response_buf();
1597 
1598     body->append(html);
1599   }
1600 
1601   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1602 
1603   auto lgconf = log_config();
1604   lgconf->update_tstamp(std::chrono::system_clock::now());
1605 
1606   auto response_status = http2::stringify_status(balloc, status_code);
1607   auto content_length = util::make_string_ref_uint(balloc, html.size());
1608   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
1609 
1610   auto nva = std::to_array(
1611     {http2::make_field(":status"_sr, response_status),
1612      http2::make_field("content-type"_sr, "text/html; charset=UTF-8"_sr),
1613      http2::make_field("server"_sr, get_config()->http.server_name),
1614      http2::make_field("content-length"_sr, content_length),
1615      http2::make_field("date"_sr, date)});
1616 
1617   rv = nghttp2_submit_response2(session_, downstream->get_stream_id(),
1618                                 nva.data(), nva.size(), data_prd_ptr);
1619   if (rv < NGHTTP2_ERR_FATAL) {
1620     ULOG(FATAL, this) << "nghttp2_submit_response2() failed: "
1621                       << nghttp2_strerror(rv);
1622     return -1;
1623   }
1624 
1625   downstream->reset_upstream_wtimer();
1626 
1627   return 0;
1628 }
1629 
add_pending_downstream(std::unique_ptr<Downstream> downstream)1630 void Http2Upstream::add_pending_downstream(
1631   std::unique_ptr<Downstream> downstream) {
1632   downstream_queue_.add_pending(std::move(downstream));
1633 }
1634 
remove_downstream(Downstream * downstream)1635 void Http2Upstream::remove_downstream(Downstream *downstream) {
1636   if (downstream->accesslog_ready()) {
1637     handler_->write_accesslog(downstream);
1638   }
1639 
1640   nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
1641                                        nullptr);
1642 
1643   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
1644 
1645   if (next_downstream) {
1646     initiate_downstream(next_downstream);
1647   }
1648 
1649   if (downstream_queue_.get_downstreams() == nullptr) {
1650     // There is no downstream at the moment.  Start idle timer now.
1651     auto config = get_config();
1652     auto &upstreamconf = config->conn.upstream;
1653 
1654     handler_->reset_upstream_read_timeout(upstreamconf.timeout.http2_idle);
1655   }
1656 }
1657 
1658 // WARNING: Never call directly or indirectly nghttp2_session_send or
1659 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_header_complete(Downstream * downstream)1660 int Http2Upstream::on_downstream_header_complete(Downstream *downstream) {
1661   int rv;
1662 
1663   const auto &req = downstream->request();
1664   auto &resp = downstream->response();
1665 
1666   auto &balloc = downstream->get_block_allocator();
1667 
1668   if (LOG_ENABLED(INFO)) {
1669     if (downstream->get_non_final_response()) {
1670       DLOG(INFO, downstream) << "HTTP non-final response header";
1671     } else {
1672       DLOG(INFO, downstream) << "HTTP response header completed";
1673     }
1674   }
1675 
1676   auto config = get_config();
1677   auto &httpconf = config->http;
1678 
1679   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1680     downstream->rewrite_location_response_header(req.scheme);
1681   }
1682 
1683 #ifdef HAVE_MRUBY
1684   if (!downstream->get_non_final_response()) {
1685     auto dconn = downstream->get_downstream_connection();
1686     const auto &group = dconn->get_downstream_addr_group();
1687     if (group) {
1688       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1689 
1690       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1691         if (error_reply(downstream, 500) != 0) {
1692           return -1;
1693         }
1694         // Returning -1 will signal deletion of dconn.
1695         return -1;
1696       }
1697 
1698       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1699         return -1;
1700       }
1701     }
1702 
1703     auto worker = handler_->get_worker();
1704     auto mruby_ctx = worker->get_mruby_context();
1705 
1706     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1707       if (error_reply(downstream, 500) != 0) {
1708         return -1;
1709       }
1710       // Returning -1 will signal deletion of dconn.
1711       return -1;
1712     }
1713 
1714     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1715       return -1;
1716     }
1717   }
1718 #endif // HAVE_MRUBY
1719 
1720   auto &http2conf = config->http2;
1721 
1722   // We need some conditions that must be fulfilled to initiate server
1723   // push.
1724   //
1725   // * Server push is disabled for http2 proxy or client proxy, since
1726   //   incoming headers are mixed origins.  We don't know how to
1727   //   reliably determine the authority yet.
1728   //
1729   // * We need non-final response or 200 response code for associated
1730   //   resource.  This is too restrictive, we will review this later.
1731   //
1732   // * We requires GET or POST for associated resource.  Probably we
1733   //   don't want to push for HEAD request.  Not sure other methods
1734   //   are also eligible for push.
1735   if (!http2conf.no_server_push &&
1736       nghttp2_session_get_remote_settings(session_,
1737                                           NGHTTP2_SETTINGS_ENABLE_PUSH) == 1 &&
1738       !config->http2_proxy && (downstream->get_stream_id() % 2) &&
1739       resp.fs.header(http2::HD_LINK) &&
1740       (downstream->get_non_final_response() || resp.http_status == 200) &&
1741       (req.method == HTTP_GET || req.method == HTTP_POST)) {
1742     if (prepare_push_promise(downstream) != 0) {
1743       // Continue to send response even if push was failed.
1744     }
1745   }
1746 
1747   auto nva = std::vector<nghttp2_nv>();
1748   // 6 means :status and possible server, via, x-http2-push, alt-svc,
1749   // and set-cookie (for affinity cookie) header field.
1750   nva.reserve(resp.fs.headers().size() + 6 +
1751               httpconf.add_response_headers.size());
1752 
1753   if (downstream->get_non_final_response()) {
1754     auto response_status = http2::stringify_status(balloc, resp.http_status);
1755 
1756     nva.push_back(http2::make_field(":status"_sr, response_status));
1757 
1758     http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1759                                       http2::HDOP_STRIP_ALL);
1760 
1761     if (LOG_ENABLED(INFO)) {
1762       log_response_headers(downstream, nva);
1763     }
1764 
1765     rv = nghttp2_submit_headers(session_, NGHTTP2_FLAG_NONE,
1766                                 downstream->get_stream_id(), nullptr,
1767                                 nva.data(), nva.size(), nullptr);
1768 
1769     resp.fs.clear_headers();
1770 
1771     if (rv != 0) {
1772       ULOG(FATAL, this) << "nghttp2_submit_headers() failed";
1773       return -1;
1774     }
1775 
1776     return 0;
1777   }
1778 
1779   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1780   StringRef response_status;
1781 
1782   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1783     response_status = http2::stringify_status(balloc, 200);
1784     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1785   } else {
1786     response_status = http2::stringify_status(balloc, resp.http_status);
1787   }
1788 
1789   nva.push_back(http2::make_field(":status"_sr, response_status));
1790 
1791   http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1792 
1793   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1794     nva.push_back(http2::make_field("server"_sr, httpconf.server_name));
1795   } else {
1796     auto server = resp.fs.header(http2::HD_SERVER);
1797     if (server) {
1798       nva.push_back(http2::make_field("server"_sr, (*server).value));
1799     }
1800   }
1801 
1802   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1803     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1804     if (affinity_cookie) {
1805       auto dconn = downstream->get_downstream_connection();
1806       assert(dconn);
1807       auto &group = dconn->get_downstream_addr_group();
1808       auto &shared_addr = group->shared_addr;
1809       auto &cookieconf = shared_addr->affinity.cookie;
1810       auto secure =
1811         http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1812       auto cookie_str = http::create_affinity_cookie(
1813         balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1814       nva.push_back(http2::make_field("set-cookie"_sr, cookie_str));
1815     }
1816   }
1817 
1818   if (!resp.fs.header(http2::HD_ALT_SVC)) {
1819     // We won't change or alter alt-svc from backend for now
1820     if (!httpconf.http2_altsvc_header_value.empty()) {
1821       nva.push_back(
1822         http2::make_field("alt-svc"_sr, httpconf.http2_altsvc_header_value));
1823     }
1824   }
1825 
1826   auto via = resp.fs.header(http2::HD_VIA);
1827   if (httpconf.no_via) {
1828     if (via) {
1829       nva.push_back(http2::make_field("via"_sr, (*via).value));
1830     }
1831   } else {
1832     // we don't create more than 16 bytes in
1833     // http::create_via_header_value.
1834     size_t len = 16;
1835     if (via) {
1836       len += via->value.size() + 2;
1837     }
1838 
1839     auto iov = make_byte_ref(balloc, len + 1);
1840     auto p = std::begin(iov);
1841     if (via) {
1842       p = std::copy(std::begin(via->value), std::end(via->value), p);
1843       p = util::copy_lit(p, ", ");
1844     }
1845     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1846     *p = '\0';
1847 
1848     nva.push_back(
1849       http2::make_field("via"_sr, StringRef{std::span{std::begin(iov), p}}));
1850   }
1851 
1852   for (auto &p : httpconf.add_response_headers) {
1853     nva.push_back(http2::make_field(p.name, p.value));
1854   }
1855 
1856   if (downstream->get_stream_id() % 2 == 0) {
1857     // This header field is basically for human on client side to
1858     // figure out that the resource is pushed.
1859     nva.push_back(http2::make_field("x-http2-push"_sr, "1"_sr));
1860   }
1861 
1862   if (LOG_ENABLED(INFO)) {
1863     log_response_headers(downstream, nva);
1864   }
1865 
1866   if (http2conf.upstream.debug.dump.response_header) {
1867     http2::dump_nv(http2conf.upstream.debug.dump.response_header, nva.data(),
1868                    nva.size());
1869   }
1870 
1871   auto priority = resp.fs.header(http2::HD_PRIORITY);
1872   if (priority) {
1873     nghttp2_extpri extpri;
1874 
1875     if (nghttp2_session_get_extpri_stream_priority(
1876           session_, &extpri, downstream->get_stream_id()) == 0 &&
1877         nghttp2_extpri_parse_priority(&extpri, priority->value.byte(),
1878                                       priority->value.size()) == 0) {
1879       rv = nghttp2_session_change_extpri_stream_priority(
1880         session_, downstream->get_stream_id(), &extpri,
1881         /* ignore_client_signal = */ 1);
1882       if (rv != 0) {
1883         ULOG(ERROR, this) << "nghttp2_session_change_extpri_stream_priority: "
1884                           << nghttp2_strerror(rv);
1885       }
1886     }
1887   }
1888 
1889   nghttp2_data_provider2 data_prd;
1890   data_prd.source.ptr = downstream;
1891   data_prd.read_callback = downstream_data_read_callback;
1892 
1893   nghttp2_data_provider2 *data_prdptr;
1894 
1895   if (downstream->expect_response_body() ||
1896       downstream->expect_response_trailer()) {
1897     data_prdptr = &data_prd;
1898   } else {
1899     data_prdptr = nullptr;
1900   }
1901 
1902   rv = nghttp2_submit_response2(session_, downstream->get_stream_id(),
1903                                 nva.data(), nva.size(), data_prdptr);
1904   if (rv != 0) {
1905     ULOG(FATAL, this) << "nghttp2_submit_response2() failed";
1906     return -1;
1907   }
1908 
1909   if (data_prdptr) {
1910     downstream->reset_upstream_wtimer();
1911   }
1912 
1913   return 0;
1914 }
1915 
1916 // WARNING: Never call directly or indirectly nghttp2_session_send or
1917 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1918 int Http2Upstream::on_downstream_body(Downstream *downstream,
1919                                       const uint8_t *data, size_t len,
1920                                       bool flush) {
1921   auto body = downstream->get_response_buf();
1922   body->append(data, len);
1923 
1924   if (flush) {
1925     nghttp2_session_resume_data(session_, downstream->get_stream_id());
1926 
1927     downstream->ensure_upstream_wtimer();
1928   }
1929 
1930   return 0;
1931 }
1932 
1933 // WARNING: Never call directly or indirectly nghttp2_session_send or
1934 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body_complete(Downstream * downstream)1935 int Http2Upstream::on_downstream_body_complete(Downstream *downstream) {
1936   if (LOG_ENABLED(INFO)) {
1937     DLOG(INFO, downstream) << "HTTP response completed";
1938   }
1939 
1940   auto &resp = downstream->response();
1941 
1942   if (!downstream->validate_response_recv_body_length()) {
1943     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
1944     resp.connection_close = true;
1945     return 0;
1946   }
1947 
1948   nghttp2_session_resume_data(session_, downstream->get_stream_id());
1949   downstream->ensure_upstream_wtimer();
1950 
1951   return 0;
1952 }
1953 
get_flow_control() const1954 bool Http2Upstream::get_flow_control() const { return flow_control_; }
1955 
pause_read(IOCtrlReason reason)1956 void Http2Upstream::pause_read(IOCtrlReason reason) {}
1957 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1958 int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1959                                size_t consumed) {
1960   if (get_flow_control()) {
1961     if (consume(downstream->get_stream_id(), consumed) != 0) {
1962       return -1;
1963     }
1964 
1965     auto &req = downstream->request();
1966 
1967     req.consume(consumed);
1968   }
1969 
1970   handler_->signal_write();
1971   return 0;
1972 }
1973 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)1974 int Http2Upstream::on_downstream_abort_request(Downstream *downstream,
1975                                                unsigned int status_code) {
1976   int rv;
1977 
1978   rv = error_reply(downstream, status_code);
1979 
1980   if (rv != 0) {
1981     return -1;
1982   }
1983 
1984   handler_->signal_write();
1985   return 0;
1986 }
1987 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1988 int Http2Upstream::on_downstream_abort_request_with_https_redirect(
1989   Downstream *downstream) {
1990   int rv;
1991 
1992   rv = redirect_to_https(downstream);
1993   if (rv != 0) {
1994     return -1;
1995   }
1996 
1997   handler_->signal_write();
1998   return 0;
1999 }
2000 
redirect_to_https(Downstream * downstream)2001 int Http2Upstream::redirect_to_https(Downstream *downstream) {
2002   auto &req = downstream->request();
2003   if (req.regular_connect_method() || req.scheme != "http"_sr) {
2004     return error_reply(downstream, 400);
2005   }
2006 
2007   auto authority = util::extract_host(req.authority);
2008   if (authority.empty()) {
2009     return error_reply(downstream, 400);
2010   }
2011 
2012   auto &balloc = downstream->get_block_allocator();
2013   auto config = get_config();
2014   auto &httpconf = config->http;
2015 
2016   StringRef loc;
2017   if (httpconf.redirect_https_port == "443"_sr) {
2018     loc = concat_string_ref(balloc, "https://"_sr, authority, req.path);
2019   } else {
2020     loc = concat_string_ref(balloc, "https://"_sr, authority, ":"_sr,
2021                             httpconf.redirect_https_port, req.path);
2022   }
2023 
2024   auto &resp = downstream->response();
2025   resp.http_status = 308;
2026   resp.fs.add_header_token("location"_sr, loc, false, http2::HD_LOCATION);
2027 
2028   return send_reply(downstream, nullptr, 0);
2029 }
2030 
consume(int32_t stream_id,size_t len)2031 int Http2Upstream::consume(int32_t stream_id, size_t len) {
2032   int rv;
2033 
2034   auto faddr = handler_->get_upstream_addr();
2035 
2036   if (faddr->alt_mode != UpstreamAltMode::NONE) {
2037     return 0;
2038   }
2039 
2040   rv = nghttp2_session_consume(session_, stream_id, len);
2041 
2042   if (rv != 0) {
2043     ULOG(WARN, this) << "nghttp2_session_consume() returned error: "
2044                      << nghttp2_strerror(rv);
2045     return -1;
2046   }
2047 
2048   return 0;
2049 }
2050 
log_response_headers(Downstream * downstream,const std::vector<nghttp2_nv> & nva) const2051 void Http2Upstream::log_response_headers(
2052   Downstream *downstream, const std::vector<nghttp2_nv> &nva) const {
2053   std::stringstream ss;
2054   for (auto &nv : nva) {
2055     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2056        << StringRef{nv.value, nv.valuelen} << "\n";
2057   }
2058   ULOG(INFO, this) << "HTTP response headers. stream_id="
2059                    << downstream->get_stream_id() << "\n"
2060                    << ss.str();
2061 }
2062 
on_timeout(Downstream * downstream)2063 int Http2Upstream::on_timeout(Downstream *downstream) {
2064   if (LOG_ENABLED(INFO)) {
2065     ULOG(INFO, this) << "Stream timeout stream_id="
2066                      << downstream->get_stream_id();
2067   }
2068 
2069   rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2070   handler_->signal_write();
2071 
2072   return 0;
2073 }
2074 
on_handler_delete()2075 void Http2Upstream::on_handler_delete() {
2076   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
2077     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
2078         d->accesslog_ready()) {
2079       handler_->write_accesslog(d);
2080     }
2081   }
2082 }
2083 
on_downstream_reset(Downstream * downstream,bool no_retry)2084 int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
2085   int rv;
2086 
2087   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
2088     // This is error condition when we failed push_request_headers()
2089     // in initiate_downstream().  Otherwise, we have
2090     // DispatchState::ACTIVE state, or we did not set
2091     // DownstreamConnection.
2092     downstream->pop_downstream_connection();
2093     handler_->signal_write();
2094 
2095     return 0;
2096   }
2097 
2098   if (!downstream->request_submission_ready()) {
2099     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2100       // We have got all response body already.  Send it off.
2101       downstream->pop_downstream_connection();
2102       return 0;
2103     }
2104     // pushed stream is handled here
2105     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2106     downstream->pop_downstream_connection();
2107 
2108     handler_->signal_write();
2109 
2110     return 0;
2111   }
2112 
2113   downstream->pop_downstream_connection();
2114 
2115   downstream->add_retry();
2116 
2117   std::unique_ptr<DownstreamConnection> dconn;
2118 
2119   rv = 0;
2120 
2121   if (no_retry || downstream->no_more_retry()) {
2122     goto fail;
2123   }
2124 
2125   // downstream connection is clean; we can retry with new
2126   // downstream connection.
2127 
2128   for (;;) {
2129     auto dconn = handler_->get_downstream_connection(rv, downstream);
2130     if (!dconn) {
2131       goto fail;
2132     }
2133 
2134     rv = downstream->attach_downstream_connection(std::move(dconn));
2135     if (rv == 0) {
2136       break;
2137     }
2138   }
2139 
2140   rv = downstream->push_request_headers();
2141   if (rv != 0) {
2142     goto fail;
2143   }
2144 
2145   return 0;
2146 
2147 fail:
2148   if (rv == SHRPX_ERR_TLS_REQUIRED) {
2149     rv = on_downstream_abort_request_with_https_redirect(downstream);
2150   } else {
2151     rv = on_downstream_abort_request(downstream, 502);
2152   }
2153   if (rv != 0) {
2154     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2155   }
2156   downstream->pop_downstream_connection();
2157 
2158   handler_->signal_write();
2159 
2160   return 0;
2161 }
2162 
prepare_push_promise(Downstream * downstream)2163 int Http2Upstream::prepare_push_promise(Downstream *downstream) {
2164   int rv;
2165 
2166   const auto &req = downstream->request();
2167   auto &resp = downstream->response();
2168 
2169   auto base = http2::get_pure_path_component(req.path);
2170   if (base.empty()) {
2171     return 0;
2172   }
2173 
2174   auto &balloc = downstream->get_block_allocator();
2175 
2176   for (auto &kv : resp.fs.headers()) {
2177     if (kv.token != http2::HD_LINK) {
2178       continue;
2179     }
2180     for (auto &link : http2::parse_link_header(kv.value)) {
2181       StringRef scheme, authority, path;
2182 
2183       rv = http2::construct_push_component(balloc, scheme, authority, path,
2184                                            base, link.uri);
2185       if (rv != 0) {
2186         continue;
2187       }
2188 
2189       if (scheme.empty()) {
2190         scheme = req.scheme;
2191       }
2192 
2193       if (authority.empty()) {
2194         authority = req.authority;
2195       }
2196 
2197       if (resp.is_resource_pushed(scheme, authority, path)) {
2198         continue;
2199       }
2200 
2201       rv = submit_push_promise(scheme, authority, path, downstream);
2202       if (rv != 0) {
2203         return -1;
2204       }
2205 
2206       resp.resource_pushed(scheme, authority, path);
2207     }
2208   }
2209   return 0;
2210 }
2211 
submit_push_promise(const StringRef & scheme,const StringRef & authority,const StringRef & path,Downstream * downstream)2212 int Http2Upstream::submit_push_promise(const StringRef &scheme,
2213                                        const StringRef &authority,
2214                                        const StringRef &path,
2215                                        Downstream *downstream) {
2216   const auto &req = downstream->request();
2217 
2218   std::vector<nghttp2_nv> nva;
2219   // 4 for :method, :scheme, :path and :authority
2220   nva.reserve(4 + req.fs.headers().size());
2221 
2222   // just use "GET" for now
2223   nva.push_back(http2::make_field(":method"_sr, "GET"_sr));
2224   nva.push_back(http2::make_field(":scheme"_sr, scheme));
2225   nva.push_back(http2::make_field(":path"_sr, path));
2226   nva.push_back(http2::make_field(":authority"_sr, authority));
2227 
2228   for (auto &kv : req.fs.headers()) {
2229     switch (kv.token) {
2230     // TODO generate referer
2231     case http2::HD__AUTHORITY:
2232     case http2::HD__SCHEME:
2233     case http2::HD__METHOD:
2234     case http2::HD__PATH:
2235       continue;
2236     case http2::HD_ACCEPT_ENCODING:
2237     case http2::HD_ACCEPT_LANGUAGE:
2238     case http2::HD_CACHE_CONTROL:
2239     case http2::HD_HOST:
2240     case http2::HD_USER_AGENT:
2241       nva.push_back(
2242         http2::make_field(kv.name, kv.value, http2::no_index(kv.no_index)));
2243       break;
2244     }
2245   }
2246 
2247   auto promised_stream_id = nghttp2_submit_push_promise(
2248     session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2249     nva.size(), nullptr);
2250 
2251   if (promised_stream_id < 0) {
2252     if (LOG_ENABLED(INFO)) {
2253       ULOG(INFO, this) << "nghttp2_submit_push_promise() failed: "
2254                        << nghttp2_strerror(promised_stream_id);
2255     }
2256     if (nghttp2_is_fatal(promised_stream_id)) {
2257       return -1;
2258     }
2259     return 0;
2260   }
2261 
2262   if (LOG_ENABLED(INFO)) {
2263     std::stringstream ss;
2264     for (auto &nv : nva) {
2265       ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2266          << StringRef{nv.value, nv.valuelen} << "\n";
2267     }
2268     ULOG(INFO, this) << "HTTP push request headers. promised_stream_id="
2269                      << promised_stream_id << "\n"
2270                      << ss.str();
2271   }
2272 
2273   return 0;
2274 }
2275 
push_enabled() const2276 bool Http2Upstream::push_enabled() const {
2277   auto config = get_config();
2278   return !(config->http2.no_server_push ||
2279            nghttp2_session_get_remote_settings(
2280              session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
2281            config->http2_proxy);
2282 }
2283 
initiate_push(Downstream * downstream,const StringRef & uri)2284 int Http2Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
2285   int rv;
2286 
2287   if (uri.empty() || !push_enabled() ||
2288       (downstream->get_stream_id() % 2) == 0) {
2289     return 0;
2290   }
2291 
2292   const auto &req = downstream->request();
2293 
2294   auto base = http2::get_pure_path_component(req.path);
2295   if (base.empty()) {
2296     return -1;
2297   }
2298 
2299   auto &balloc = downstream->get_block_allocator();
2300 
2301   StringRef scheme, authority, path;
2302 
2303   rv =
2304     http2::construct_push_component(balloc, scheme, authority, path, base, uri);
2305   if (rv != 0) {
2306     return -1;
2307   }
2308 
2309   if (scheme.empty()) {
2310     scheme = req.scheme;
2311   }
2312 
2313   if (authority.empty()) {
2314     authority = req.authority;
2315   }
2316 
2317   auto &resp = downstream->response();
2318 
2319   if (resp.is_resource_pushed(scheme, authority, path)) {
2320     return 0;
2321   }
2322 
2323   rv = submit_push_promise(scheme, authority, path, downstream);
2324 
2325   if (rv != 0) {
2326     return -1;
2327   }
2328 
2329   resp.resource_pushed(scheme, authority, path);
2330 
2331   return 0;
2332 }
2333 
response_riovec(struct iovec * iov,int iovcnt) const2334 int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
2335   if (iovcnt == 0 || wb_.rleft() == 0) {
2336     return 0;
2337   }
2338 
2339   return wb_.riovec(iov, iovcnt);
2340 }
2341 
response_drain(size_t n)2342 void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
2343 
response_empty() const2344 bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
2345 
get_response_buf()2346 DefaultMemchunks *Http2Upstream::get_response_buf() { return &wb_; }
2347 
2348 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)2349 Http2Upstream::on_downstream_push_promise(Downstream *downstream,
2350                                           int32_t promised_stream_id) {
2351   // promised_stream_id is for backend HTTP/2 session, not for
2352   // frontend.
2353   auto promised_downstream =
2354     std::make_unique<Downstream>(this, handler_->get_mcpool(), 0);
2355   auto &promised_req = promised_downstream->request();
2356 
2357   promised_downstream->set_downstream_stream_id(promised_stream_id);
2358   // Set associated stream in frontend
2359   promised_downstream->set_assoc_stream_id(downstream->get_stream_id());
2360 
2361   promised_downstream->disable_upstream_rtimer();
2362 
2363   promised_req.http_major = 2;
2364   promised_req.http_minor = 0;
2365 
2366   promised_req.fs.content_length = 0;
2367   promised_req.http2_expect_body = false;
2368 
2369   auto ptr = promised_downstream.get();
2370   add_pending_downstream(std::move(promised_downstream));
2371   downstream_queue_.mark_active(ptr);
2372 
2373   return ptr;
2374 }
2375 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)2376 int Http2Upstream::on_downstream_push_promise_complete(
2377   Downstream *downstream, Downstream *promised_downstream) {
2378   std::vector<nghttp2_nv> nva;
2379 
2380   const auto &promised_req = promised_downstream->request();
2381   const auto &headers = promised_req.fs.headers();
2382 
2383   nva.reserve(headers.size());
2384 
2385   for (auto &kv : headers) {
2386     nva.push_back(
2387       http2::make_field_nv(kv.name, kv.value, http2::no_index(kv.no_index)));
2388   }
2389 
2390   auto promised_stream_id = nghttp2_submit_push_promise(
2391     session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2392     nva.size(), promised_downstream);
2393   if (promised_stream_id < 0) {
2394     return -1;
2395   }
2396 
2397   promised_downstream->set_stream_id(promised_stream_id);
2398 
2399   return 0;
2400 }
2401 
cancel_premature_downstream(Downstream * promised_downstream)2402 void Http2Upstream::cancel_premature_downstream(
2403   Downstream *promised_downstream) {
2404   if (LOG_ENABLED(INFO)) {
2405     ULOG(INFO, this) << "Remove premature promised stream "
2406                      << promised_downstream;
2407   }
2408   downstream_queue_.remove_and_get_blocked(promised_downstream, false);
2409 }
2410 
get_max_buffer_size() const2411 size_t Http2Upstream::get_max_buffer_size() const { return max_buffer_size_; }
2412 
2413 } // namespace shrpx
2414