• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http2_upstream.h"
26 
27 #include <netinet/tcp.h>
28 #include <assert.h>
29 #include <cerrno>
30 #include <sstream>
31 
32 #include "shrpx_client_handler.h"
33 #include "shrpx_https_upstream.h"
34 #include "shrpx_downstream.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_config.h"
37 #include "shrpx_http.h"
38 #include "shrpx_worker.h"
39 #include "shrpx_http2_session.h"
40 #include "shrpx_log.h"
41 #ifdef HAVE_MRUBY
42 #  include "shrpx_mruby.h"
43 #endif // HAVE_MRUBY
44 #include "http2.h"
45 #include "util.h"
46 #include "base64.h"
47 #include "app_helper.h"
48 #include "template.h"
49 
50 using namespace nghttp2;
51 
52 namespace shrpx {
53 
54 namespace {
55 constexpr size_t MAX_BUFFER_SIZE = 32_k;
56 } // namespace
57 
58 namespace {
on_stream_close_callback(nghttp2_session * session,int32_t stream_id,uint32_t error_code,void * user_data)59 int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
60                              uint32_t error_code, void *user_data) {
61   auto upstream = static_cast<Http2Upstream *>(user_data);
62   if (LOG_ENABLED(INFO)) {
63     ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
64                          << " is being closed";
65   }
66 
67   auto downstream = static_cast<Downstream *>(
68       nghttp2_session_get_stream_user_data(session, stream_id));
69 
70   if (!downstream) {
71     return 0;
72   }
73 
74   auto &req = downstream->request();
75 
76   upstream->consume(stream_id, req.unconsumed_body_length);
77 
78   req.unconsumed_body_length = 0;
79 
80   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
81     upstream->remove_downstream(downstream);
82     // downstream was deleted
83 
84     return 0;
85   }
86 
87   if (downstream->can_detach_downstream_connection()) {
88     // Keep-alive
89     downstream->detach_downstream_connection();
90   }
91 
92   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
93 
94   // At this point, downstream read may be paused.
95 
96   // If shrpx_downstream::push_request_headers() failed, the
97   // error is handled here.
98   upstream->remove_downstream(downstream);
99   // downstream was deleted
100 
101   // How to test this case? Request sufficient large download
102   // and make client send RST_STREAM after it gets first DATA
103   // frame chunk.
104 
105   return 0;
106 }
107 } // namespace
108 
upgrade_upstream(HttpsUpstream * http)109 int Http2Upstream::upgrade_upstream(HttpsUpstream *http) {
110   int rv;
111 
112   auto &balloc = http->get_downstream()->get_block_allocator();
113 
114   auto http2_settings = http->get_downstream()->get_http2_settings();
115   http2_settings = util::to_base64(balloc, http2_settings);
116 
117   auto settings_payload = base64::decode(balloc, std::begin(http2_settings),
118                                          std::end(http2_settings));
119 
120   rv = nghttp2_session_upgrade2(
121       session_, settings_payload.byte(), settings_payload.size(),
122       http->get_downstream()->request().method == HTTP_HEAD, nullptr);
123   if (rv != 0) {
124     if (LOG_ENABLED(INFO)) {
125       ULOG(INFO, this) << "nghttp2_session_upgrade() returned error: "
126                        << nghttp2_strerror(rv);
127     }
128     return -1;
129   }
130   pre_upstream_.reset(http);
131   auto downstream = http->pop_downstream();
132   downstream->reset_upstream(this);
133   downstream->set_stream_id(1);
134   downstream->reset_upstream_rtimer();
135   downstream->set_stream_id(1);
136 
137   auto ptr = downstream.get();
138 
139   nghttp2_session_set_stream_user_data(session_, 1, ptr);
140   downstream_queue_.add_pending(std::move(downstream));
141   downstream_queue_.mark_active(ptr);
142 
143   // TODO This might not be necessary
144   handler_->stop_read_timer();
145 
146   if (LOG_ENABLED(INFO)) {
147     ULOG(INFO, this) << "Connection upgraded to HTTP/2";
148   }
149 
150   return 0;
151 }
152 
start_settings_timer()153 void Http2Upstream::start_settings_timer() {
154   ev_timer_start(handler_->get_loop(), &settings_timer_);
155 }
156 
stop_settings_timer()157 void Http2Upstream::stop_settings_timer() {
158   ev_timer_stop(handler_->get_loop(), &settings_timer_);
159 }
160 
161 namespace {
on_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)162 int on_header_callback2(nghttp2_session *session, const nghttp2_frame *frame,
163                         nghttp2_rcbuf *name, nghttp2_rcbuf *value,
164                         uint8_t flags, void *user_data) {
165   auto namebuf = nghttp2_rcbuf_get_buf(name);
166   auto valuebuf = nghttp2_rcbuf_get_buf(value);
167   auto config = get_config();
168 
169   if (config->http2.upstream.debug.frame_debug) {
170     verbose_on_header_callback(session, frame, namebuf.base, namebuf.len,
171                                valuebuf.base, valuebuf.len, flags, user_data);
172   }
173   if (frame->hd.type != NGHTTP2_HEADERS) {
174     return 0;
175   }
176   auto upstream = static_cast<Http2Upstream *>(user_data);
177   auto downstream = static_cast<Downstream *>(
178       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
179   if (!downstream) {
180     return 0;
181   }
182 
183   auto &req = downstream->request();
184 
185   auto &httpconf = config->http;
186 
187   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
188           httpconf.request_header_field_buffer ||
189       req.fs.num_fields() >= httpconf.max_request_header_fields) {
190     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
191       return 0;
192     }
193 
194     if (LOG_ENABLED(INFO)) {
195       ULOG(INFO, upstream) << "Too large or many header field size="
196                            << req.fs.buffer_size() + namebuf.len + valuebuf.len
197                            << ", num=" << req.fs.num_fields() + 1;
198     }
199 
200     // just ignore header fields if this is trailer part.
201     if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
202       return 0;
203     }
204 
205     if (upstream->error_reply(downstream, 431) != 0) {
206       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
207     }
208 
209     return 0;
210   }
211 
212   auto token = http2::lookup_token(namebuf.base, namebuf.len);
213   auto no_index = flags & NGHTTP2_NV_FLAG_NO_INDEX;
214 
215   downstream->add_rcbuf(name);
216   downstream->add_rcbuf(value);
217 
218   if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
219     // just store header fields for trailer part
220     req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
221                              StringRef{valuebuf.base, valuebuf.len}, no_index,
222                              token);
223     return 0;
224   }
225 
226   req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
227                           StringRef{valuebuf.base, valuebuf.len}, no_index,
228                           token);
229   return 0;
230 }
231 } // namespace
232 
233 namespace {
on_invalid_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)234 int on_invalid_header_callback2(nghttp2_session *session,
235                                 const nghttp2_frame *frame, nghttp2_rcbuf *name,
236                                 nghttp2_rcbuf *value, uint8_t flags,
237                                 void *user_data) {
238   auto upstream = static_cast<Http2Upstream *>(user_data);
239   auto downstream = static_cast<Downstream *>(
240       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
241   if (!downstream) {
242     return 0;
243   }
244 
245   if (LOG_ENABLED(INFO)) {
246     auto namebuf = nghttp2_rcbuf_get_buf(name);
247     auto valuebuf = nghttp2_rcbuf_get_buf(value);
248 
249     ULOG(INFO, upstream) << "Invalid header field for stream_id="
250                          << frame->hd.stream_id << ": name=["
251                          << StringRef{namebuf.base, namebuf.len} << "], value=["
252                          << StringRef{valuebuf.base, valuebuf.len} << "]";
253   }
254 
255   upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
256 
257   return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
258 }
259 } // namespace
260 
261 namespace {
on_begin_headers_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)262 int on_begin_headers_callback(nghttp2_session *session,
263                               const nghttp2_frame *frame, void *user_data) {
264   auto upstream = static_cast<Http2Upstream *>(user_data);
265 
266   if (frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
267     return 0;
268   }
269   if (LOG_ENABLED(INFO)) {
270     ULOG(INFO, upstream) << "Received upstream request HEADERS stream_id="
271                          << frame->hd.stream_id;
272   }
273 
274   upstream->on_start_request(frame);
275 
276   return 0;
277 }
278 } // namespace
279 
on_start_request(const nghttp2_frame * frame)280 void Http2Upstream::on_start_request(const nghttp2_frame *frame) {
281   auto downstream = std::make_unique<Downstream>(this, handler_->get_mcpool(),
282                                                  frame->hd.stream_id);
283   nghttp2_session_set_stream_user_data(session_, frame->hd.stream_id,
284                                        downstream.get());
285 
286   downstream->reset_upstream_rtimer();
287 
288   handler_->repeat_read_timer();
289 
290   auto &req = downstream->request();
291 
292   // Although, we deprecated minor version from HTTP/2, we supply
293   // minor version 0 to use via header field in a conventional way.
294   req.http_major = 2;
295   req.http_minor = 0;
296 
297   add_pending_downstream(std::move(downstream));
298 
299   ++num_requests_;
300 
301   auto config = get_config();
302   auto &httpconf = config->http;
303   if (httpconf.max_requests <= num_requests_) {
304     start_graceful_shutdown();
305   }
306 }
307 
on_request_headers(Downstream * downstream,const nghttp2_frame * frame)308 int Http2Upstream::on_request_headers(Downstream *downstream,
309                                       const nghttp2_frame *frame) {
310   auto lgconf = log_config();
311   lgconf->update_tstamp(std::chrono::system_clock::now());
312   auto &req = downstream->request();
313   req.tstamp = lgconf->tstamp;
314 
315   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
316     return 0;
317   }
318 
319   auto &nva = req.fs.headers();
320 
321   if (LOG_ENABLED(INFO)) {
322     std::stringstream ss;
323     for (auto &nv : nva) {
324       if (nv.name == "authorization") {
325         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
326         continue;
327       }
328       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
329     }
330     ULOG(INFO, this) << "HTTP request headers. stream_id="
331                      << downstream->get_stream_id() << "\n"
332                      << ss.str();
333   }
334 
335   auto config = get_config();
336   auto &dump = config->http2.upstream.debug.dump;
337 
338   if (dump.request_header) {
339     http2::dump_nv(dump.request_header, nva);
340   }
341 
342   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
343   if (content_length) {
344     // libnghttp2 guarantees this can be parsed
345     req.fs.content_length = util::parse_uint(content_length->value);
346   }
347 
348   // presence of mandatory header fields are guaranteed by libnghttp2.
349   auto authority = req.fs.header(http2::HD__AUTHORITY);
350   auto path = req.fs.header(http2::HD__PATH);
351   auto method = req.fs.header(http2::HD__METHOD);
352   auto scheme = req.fs.header(http2::HD__SCHEME);
353 
354   auto method_token = http2::lookup_method_token(method->value);
355   if (method_token == -1) {
356     if (error_reply(downstream, 501) != 0) {
357       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
358     }
359     return 0;
360   }
361 
362   auto faddr = handler_->get_upstream_addr();
363 
364   // For HTTP/2 proxy, we require :authority.
365   if (method_token != HTTP_CONNECT && config->http2_proxy &&
366       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
367     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
368     return 0;
369   }
370 
371   req.method = method_token;
372   if (scheme) {
373     req.scheme = scheme->value;
374   }
375 
376   // nghttp2 library guarantees either :authority or host exist
377   if (!authority) {
378     req.no_authority = true;
379     authority = req.fs.header(http2::HD_HOST);
380   }
381 
382   if (authority) {
383     req.authority = authority->value;
384   }
385 
386   if (path) {
387     if (method_token == HTTP_OPTIONS &&
388         path->value == StringRef::from_lit("*")) {
389       // Server-wide OPTIONS request.  Path is empty.
390     } else if (config->http2_proxy &&
391                faddr->alt_mode == UpstreamAltMode::NONE) {
392       req.path = path->value;
393     } else {
394       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
395                                            path->value);
396     }
397   }
398 
399   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
400   if (connect_proto) {
401     if (connect_proto->value != "websocket") {
402       if (error_reply(downstream, 400) != 0) {
403         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
404       }
405       return 0;
406     }
407     req.connect_proto = ConnectProto::WEBSOCKET;
408   }
409 
410   if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
411     req.http2_expect_body = true;
412   } else if (req.fs.content_length == -1) {
413     // If END_STREAM flag is set to HEADERS frame, we are sure that
414     // content-length is 0.
415     req.fs.content_length = 0;
416   }
417 
418   downstream->inspect_http2_request();
419 
420   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
421 
422   if (config->http.require_http_scheme &&
423       !http::check_http_scheme(req.scheme, handler_->get_ssl() != nullptr)) {
424     if (error_reply(downstream, 400) != 0) {
425       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
426     }
427     return 0;
428   }
429 
430 #ifdef HAVE_MRUBY
431   auto worker = handler_->get_worker();
432   auto mruby_ctx = worker->get_mruby_context();
433 
434   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
435     if (error_reply(downstream, 500) != 0) {
436       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
437     }
438     return 0;
439   }
440 #endif // HAVE_MRUBY
441 
442   if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
443     downstream->disable_upstream_rtimer();
444 
445     downstream->set_request_state(DownstreamState::MSG_COMPLETE);
446   }
447 
448   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
449     return 0;
450   }
451 
452   start_downstream(downstream);
453 
454   return 0;
455 }
456 
start_downstream(Downstream * downstream)457 void Http2Upstream::start_downstream(Downstream *downstream) {
458   if (downstream_queue_.can_activate(downstream->request().authority)) {
459     initiate_downstream(downstream);
460     return;
461   }
462 
463   downstream_queue_.mark_blocked(downstream);
464 }
465 
initiate_downstream(Downstream * downstream)466 void Http2Upstream::initiate_downstream(Downstream *downstream) {
467   int rv;
468 
469 #ifdef HAVE_MRUBY
470   DownstreamConnection *dconn_ptr;
471 #endif // HAVE_MRUBY
472 
473   for (;;) {
474     auto dconn = handler_->get_downstream_connection(rv, downstream);
475     if (!dconn) {
476       if (rv == SHRPX_ERR_TLS_REQUIRED) {
477         rv = redirect_to_https(downstream);
478       } else {
479         rv = error_reply(downstream, 502);
480       }
481       if (rv != 0) {
482         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
483       }
484 
485       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
486       downstream_queue_.mark_failure(downstream);
487 
488       return;
489     }
490 
491 #ifdef HAVE_MRUBY
492     dconn_ptr = dconn.get();
493 #endif // HAVE_MRUBY
494     rv = downstream->attach_downstream_connection(std::move(dconn));
495     if (rv == 0) {
496       break;
497     }
498   }
499 
500 #ifdef HAVE_MRUBY
501   const auto &group = dconn_ptr->get_downstream_addr_group();
502   if (group) {
503     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
504     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
505       if (error_reply(downstream, 500) != 0) {
506         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
507       }
508 
509       downstream_queue_.mark_failure(downstream);
510 
511       return;
512     }
513 
514     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
515       return;
516     }
517   }
518 #endif // HAVE_MRUBY
519 
520   rv = downstream->push_request_headers();
521   if (rv != 0) {
522 
523     if (error_reply(downstream, 502) != 0) {
524       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
525     }
526 
527     downstream_queue_.mark_failure(downstream);
528 
529     return;
530   }
531 
532   downstream_queue_.mark_active(downstream);
533 
534   auto &req = downstream->request();
535   if (!req.http2_expect_body) {
536     rv = downstream->end_upload_data();
537     if (rv != 0) {
538       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
539     }
540   }
541 
542   return;
543 }
544 
545 namespace {
on_frame_recv_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)546 int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
547                            void *user_data) {
548   if (get_config()->http2.upstream.debug.frame_debug) {
549     verbose_on_frame_recv_callback(session, frame, user_data);
550   }
551   auto upstream = static_cast<Http2Upstream *>(user_data);
552   auto handler = upstream->get_client_handler();
553 
554   switch (frame->hd.type) {
555   case NGHTTP2_DATA: {
556     auto downstream = static_cast<Downstream *>(
557         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
558     if (!downstream) {
559       return 0;
560     }
561 
562     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
563       downstream->disable_upstream_rtimer();
564 
565       if (downstream->end_upload_data() != 0) {
566         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
567           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
568         }
569       }
570 
571       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
572     }
573 
574     return 0;
575   }
576   case NGHTTP2_HEADERS: {
577     auto downstream = static_cast<Downstream *>(
578         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
579     if (!downstream) {
580       return 0;
581     }
582 
583     if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
584       downstream->reset_upstream_rtimer();
585 
586       handler->stop_read_timer();
587 
588       return upstream->on_request_headers(downstream, frame);
589     }
590 
591     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
592       downstream->disable_upstream_rtimer();
593 
594       if (downstream->end_upload_data() != 0) {
595         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
596           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
597         }
598       }
599 
600       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
601     }
602 
603     return 0;
604   }
605   case NGHTTP2_SETTINGS:
606     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
607       return 0;
608     }
609     upstream->stop_settings_timer();
610     return 0;
611   case NGHTTP2_GOAWAY:
612     if (LOG_ENABLED(INFO)) {
613       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
614                                          frame->goaway.opaque_data_len);
615 
616       ULOG(INFO, upstream) << "GOAWAY received: last-stream-id="
617                            << frame->goaway.last_stream_id
618                            << ", error_code=" << frame->goaway.error_code
619                            << ", debug_data=" << debug_data;
620     }
621     return 0;
622   default:
623     return 0;
624   }
625 }
626 } // namespace
627 
628 namespace {
on_data_chunk_recv_callback(nghttp2_session * session,uint8_t flags,int32_t stream_id,const uint8_t * data,size_t len,void * user_data)629 int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
630                                 int32_t stream_id, const uint8_t *data,
631                                 size_t len, void *user_data) {
632   auto upstream = static_cast<Http2Upstream *>(user_data);
633   auto downstream = static_cast<Downstream *>(
634       nghttp2_session_get_stream_user_data(session, stream_id));
635 
636   if (!downstream) {
637     if (upstream->consume(stream_id, len) != 0) {
638       return NGHTTP2_ERR_CALLBACK_FAILURE;
639     }
640 
641     return 0;
642   }
643 
644   downstream->reset_upstream_rtimer();
645 
646   if (downstream->push_upload_data_chunk(data, len) != 0) {
647     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
648       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
649     }
650 
651     if (upstream->consume(stream_id, len) != 0) {
652       return NGHTTP2_ERR_CALLBACK_FAILURE;
653     }
654 
655     return 0;
656   }
657 
658   return 0;
659 }
660 } // namespace
661 
662 namespace {
on_frame_send_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)663 int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
664                            void *user_data) {
665   if (get_config()->http2.upstream.debug.frame_debug) {
666     verbose_on_frame_send_callback(session, frame, user_data);
667   }
668   auto upstream = static_cast<Http2Upstream *>(user_data);
669   auto handler = upstream->get_client_handler();
670 
671   switch (frame->hd.type) {
672   case NGHTTP2_DATA:
673   case NGHTTP2_HEADERS: {
674     if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
675       return 0;
676     }
677     // RST_STREAM if request is still incomplete.
678     auto stream_id = frame->hd.stream_id;
679     auto downstream = static_cast<Downstream *>(
680         nghttp2_session_get_stream_user_data(session, stream_id));
681 
682     if (!downstream) {
683       return 0;
684     }
685 
686     // For tunneling, issue RST_STREAM to finish the stream.
687     if (downstream->get_upgraded() ||
688         nghttp2_session_get_stream_remote_close(session, stream_id) == 0) {
689       if (LOG_ENABLED(INFO)) {
690         ULOG(INFO, upstream)
691             << "Send RST_STREAM to "
692             << (downstream->get_upgraded() ? "tunneled " : "")
693             << "stream stream_id=" << downstream->get_stream_id()
694             << " to finish off incomplete request";
695       }
696 
697       upstream->rst_stream(downstream, NGHTTP2_NO_ERROR);
698     }
699 
700     return 0;
701   }
702   case NGHTTP2_SETTINGS:
703     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
704       upstream->start_settings_timer();
705     }
706     return 0;
707   case NGHTTP2_PUSH_PROMISE: {
708     auto promised_stream_id = frame->push_promise.promised_stream_id;
709 
710     if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) {
711       // In case of push from backend, downstream object was already
712       // created.
713       return 0;
714     }
715 
716     auto promised_downstream = std::make_unique<Downstream>(
717         upstream, handler->get_mcpool(), promised_stream_id);
718     auto &req = promised_downstream->request();
719 
720     // As long as we use nghttp2_session_mem_send(), setting stream
721     // user data here should not fail.  This is because this callback
722     // is called just after frame was serialized.  So no worries about
723     // hanging Downstream.
724     nghttp2_session_set_stream_user_data(session, promised_stream_id,
725                                          promised_downstream.get());
726 
727     promised_downstream->set_assoc_stream_id(frame->hd.stream_id);
728     promised_downstream->disable_upstream_rtimer();
729 
730     req.http_major = 2;
731     req.http_minor = 0;
732 
733     req.fs.content_length = 0;
734     req.http2_expect_body = false;
735 
736     auto &promised_balloc = promised_downstream->get_block_allocator();
737 
738     for (size_t i = 0; i < frame->push_promise.nvlen; ++i) {
739       auto &nv = frame->push_promise.nva[i];
740 
741       auto name =
742           make_string_ref(promised_balloc, StringRef{nv.name, nv.namelen});
743       auto value =
744           make_string_ref(promised_balloc, StringRef{nv.value, nv.valuelen});
745 
746       auto token = http2::lookup_token(nv.name, nv.namelen);
747       switch (token) {
748       case http2::HD__METHOD:
749         req.method = http2::lookup_method_token(value);
750         break;
751       case http2::HD__SCHEME:
752         req.scheme = value;
753         break;
754       case http2::HD__AUTHORITY:
755         req.authority = value;
756         break;
757       case http2::HD__PATH:
758         req.path = http2::rewrite_clean_path(promised_balloc, value);
759         break;
760       }
761       req.fs.add_header_token(name, value, nv.flags & NGHTTP2_NV_FLAG_NO_INDEX,
762                               token);
763     }
764 
765     promised_downstream->inspect_http2_request();
766 
767     promised_downstream->set_request_state(DownstreamState::MSG_COMPLETE);
768 
769     // a bit weird but start_downstream() expects that given
770     // downstream is in pending queue.
771     auto ptr = promised_downstream.get();
772     upstream->add_pending_downstream(std::move(promised_downstream));
773 
774 #ifdef HAVE_MRUBY
775     auto worker = handler->get_worker();
776     auto mruby_ctx = worker->get_mruby_context();
777 
778     if (mruby_ctx->run_on_request_proc(ptr) != 0) {
779       if (upstream->error_reply(ptr, 500) != 0) {
780         upstream->rst_stream(ptr, NGHTTP2_INTERNAL_ERROR);
781         return 0;
782       }
783       return 0;
784     }
785 #endif // HAVE_MRUBY
786 
787     upstream->start_downstream(ptr);
788 
789     return 0;
790   }
791   case NGHTTP2_GOAWAY:
792     if (LOG_ENABLED(INFO)) {
793       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
794                                          frame->goaway.opaque_data_len);
795 
796       ULOG(INFO, upstream) << "Sending GOAWAY: last-stream-id="
797                            << frame->goaway.last_stream_id
798                            << ", error_code=" << frame->goaway.error_code
799                            << ", debug_data=" << debug_data;
800     }
801     return 0;
802   default:
803     return 0;
804   }
805 }
806 } // namespace
807 
808 namespace {
on_frame_not_send_callback(nghttp2_session * session,const nghttp2_frame * frame,int lib_error_code,void * user_data)809 int on_frame_not_send_callback(nghttp2_session *session,
810                                const nghttp2_frame *frame, int lib_error_code,
811                                void *user_data) {
812   auto upstream = static_cast<Http2Upstream *>(user_data);
813   if (LOG_ENABLED(INFO)) {
814     ULOG(INFO, upstream) << "Failed to send control frame type="
815                          << static_cast<uint32_t>(frame->hd.type)
816                          << ", lib_error_code=" << lib_error_code << ":"
817                          << nghttp2_strerror(lib_error_code);
818   }
819   if (frame->hd.type == NGHTTP2_HEADERS &&
820       lib_error_code != NGHTTP2_ERR_STREAM_CLOSED &&
821       lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) {
822     // To avoid stream hanging around, issue RST_STREAM.
823     auto downstream = static_cast<Downstream *>(
824         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
825     if (downstream) {
826       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
827     }
828   }
829   return 0;
830 }
831 } // namespace
832 
833 namespace {
// 256 zero bytes; the source of the padding bytes appended to DATA
// frames.
constexpr std::array<uint8_t, 256> PADDING{};
835 } // namespace
836 
837 namespace {
send_data_callback(nghttp2_session * session,nghttp2_frame * frame,const uint8_t * framehd,size_t length,nghttp2_data_source * source,void * user_data)838 int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
839                        const uint8_t *framehd, size_t length,
840                        nghttp2_data_source *source, void *user_data) {
841   auto downstream = static_cast<Downstream *>(source->ptr);
842   auto upstream = static_cast<Http2Upstream *>(downstream->get_upstream());
843   auto body = downstream->get_response_buf();
844 
845   auto wb = upstream->get_response_buf();
846 
847   size_t padlen = 0;
848 
849   wb->append(framehd, 9);
850   if (frame->data.padlen > 0) {
851     padlen = frame->data.padlen - 1;
852     wb->append(static_cast<uint8_t>(padlen));
853   }
854 
855   body->remove(*wb, length);
856 
857   wb->append(PADDING.data(), padlen);
858 
859   if (body->rleft() == 0) {
860     downstream->disable_upstream_wtimer();
861   } else {
862     downstream->reset_upstream_wtimer();
863   }
864 
865   if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
866     return NGHTTP2_ERR_CALLBACK_FAILURE;
867   }
868 
869   // We have to add length here, so that we can log this amount of
870   // data transferred.
871   downstream->response_sent_body_length += length;
872 
873   auto max_buffer_size = upstream->get_max_buffer_size();
874 
875   return wb->rleft() >= max_buffer_size ? NGHTTP2_ERR_PAUSE : 0;
876 }
877 } // namespace
878 
879 namespace {
infer_upstream_rst_stream_error_code(uint32_t downstream_error_code)880 uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
881   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
882   // client to retry.
883   switch (downstream_error_code) {
884   case NGHTTP2_NO_ERROR:
885   case NGHTTP2_REFUSED_STREAM:
886     return downstream_error_code;
887   default:
888     return NGHTTP2_INTERNAL_ERROR;
889   }
890 }
891 } // namespace
892 
893 namespace {
settings_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)894 void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
895   auto upstream = static_cast<Http2Upstream *>(w->data);
896   auto handler = upstream->get_client_handler();
897   ULOG(INFO, upstream) << "SETTINGS timeout";
898   if (upstream->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
899     delete handler;
900     return;
901   }
902   handler->signal_write();
903 }
904 } // namespace
905 
906 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)907 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
908   auto upstream = static_cast<Http2Upstream *>(w->data);
909   auto handler = upstream->get_client_handler();
910   upstream->submit_goaway();
911   handler->signal_write();
912 }
913 } // namespace
914 
915 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revents)916 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
917   auto upstream = static_cast<Http2Upstream *>(w->data);
918   upstream->check_shutdown();
919 }
920 } // namespace
921 
submit_goaway()922 void Http2Upstream::submit_goaway() {
923   auto last_stream_id = nghttp2_session_get_last_proc_stream_id(session_);
924   nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, last_stream_id,
925                         NGHTTP2_NO_ERROR, nullptr, 0);
926 }
927 
check_shutdown()928 void Http2Upstream::check_shutdown() {
929   auto worker = handler_->get_worker();
930 
931   if (!worker->get_graceful_shutdown()) {
932     return;
933   }
934 
935   ev_prepare_stop(handler_->get_loop(), &prep_);
936 
937   start_graceful_shutdown();
938 }
939 
start_graceful_shutdown()940 void Http2Upstream::start_graceful_shutdown() {
941   int rv;
942   if (ev_is_active(&shutdown_timer_)) {
943     return;
944   }
945 
946   rv = nghttp2_submit_shutdown_notice(session_);
947   if (rv != 0) {
948     ULOG(FATAL, this) << "nghttp2_submit_shutdown_notice() failed: "
949                       << nghttp2_strerror(rv);
950     return;
951   }
952 
953   handler_->signal_write();
954 
955   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
956 }
957 
create_http2_upstream_callbacks()958 nghttp2_session_callbacks *create_http2_upstream_callbacks() {
959   int rv;
960   nghttp2_session_callbacks *callbacks;
961 
962   rv = nghttp2_session_callbacks_new(&callbacks);
963 
964   if (rv != 0) {
965     return nullptr;
966   }
967 
968   nghttp2_session_callbacks_set_on_stream_close_callback(
969       callbacks, on_stream_close_callback);
970 
971   nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
972                                                        on_frame_recv_callback);
973 
974   nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
975       callbacks, on_data_chunk_recv_callback);
976 
977   nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
978                                                        on_frame_send_callback);
979 
980   nghttp2_session_callbacks_set_on_frame_not_send_callback(
981       callbacks, on_frame_not_send_callback);
982 
983   nghttp2_session_callbacks_set_on_header_callback2(callbacks,
984                                                     on_header_callback2);
985 
986   nghttp2_session_callbacks_set_on_invalid_header_callback2(
987       callbacks, on_invalid_header_callback2);
988 
989   nghttp2_session_callbacks_set_on_begin_headers_callback(
990       callbacks, on_begin_headers_callback);
991 
992   nghttp2_session_callbacks_set_send_data_callback(callbacks,
993                                                    send_data_callback);
994 
995   auto config = get_config();
996 
997   if (config->padding) {
998     nghttp2_session_callbacks_set_select_padding_callback(
999         callbacks, http::select_padding_callback);
1000   }
1001 
1002   if (config->http2.upstream.debug.frame_debug) {
1003     nghttp2_session_callbacks_set_error_callback2(callbacks,
1004                                                   verbose_error_callback);
1005   }
1006 
1007   return callbacks;
1008 }
1009 
1010 namespace {
downstream_queue_size(Worker * worker)1011 size_t downstream_queue_size(Worker *worker) {
1012   auto &downstreamconf = *worker->get_downstream_config();
1013 
1014   if (get_config()->http2_proxy) {
1015     return downstreamconf.connections_per_host;
1016   }
1017 
1018   return downstreamconf.connections_per_frontend;
1019 }
1020 } // namespace
1021 
Http2Upstream(ClientHandler * handler)1022 Http2Upstream::Http2Upstream(ClientHandler *handler)
1023     : wb_(handler->get_worker()->get_mcpool()),
1024       downstream_queue_(downstream_queue_size(handler->get_worker()),
1025                         !get_config()->http2_proxy),
1026       handler_(handler),
1027       session_(nullptr),
1028       max_buffer_size_(MAX_BUFFER_SIZE),
1029       num_requests_(0) {
1030   int rv;
1031 
1032   auto config = get_config();
1033   auto &http2conf = config->http2;
1034 
1035   auto faddr = handler_->get_upstream_addr();
1036 
1037   rv =
1038       nghttp2_session_server_new2(&session_, http2conf.upstream.callbacks, this,
1039                                   faddr->alt_mode != UpstreamAltMode::NONE
1040                                       ? http2conf.upstream.alt_mode_option
1041                                       : http2conf.upstream.option);
1042 
1043   assert(rv == 0);
1044 
1045   flow_control_ = true;
1046 
1047   // TODO Maybe call from outside?
1048   std::array<nghttp2_settings_entry, 5> entry;
1049   size_t nentry = 3;
1050 
1051   entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
1052   entry[0].value = http2conf.upstream.max_concurrent_streams;
1053 
1054   entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
1055   if (faddr->alt_mode != UpstreamAltMode::NONE) {
1056     entry[1].value = (1u << 31) - 1;
1057   } else {
1058     entry[1].value = http2conf.upstream.window_size;
1059   }
1060 
1061   entry[2].settings_id = NGHTTP2_SETTINGS_NO_RFC7540_PRIORITIES;
1062   entry[2].value = 1;
1063 
1064   if (!config->http2_proxy) {
1065     entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL;
1066     entry[nentry].value = 1;
1067     ++nentry;
1068   }
1069 
1070   if (http2conf.upstream.decoder_dynamic_table_size !=
1071       NGHTTP2_DEFAULT_HEADER_TABLE_SIZE) {
1072     entry[nentry].settings_id = NGHTTP2_SETTINGS_HEADER_TABLE_SIZE;
1073     entry[nentry].value = http2conf.upstream.decoder_dynamic_table_size;
1074     ++nentry;
1075   }
1076 
1077   rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
1078                                nentry);
1079   if (rv != 0) {
1080     ULOG(ERROR, this) << "nghttp2_submit_settings() returned error: "
1081                       << nghttp2_strerror(rv);
1082   }
1083 
1084   auto window_size = faddr->alt_mode != UpstreamAltMode::NONE
1085                          ? std::numeric_limits<int32_t>::max()
1086                      : http2conf.upstream.optimize_window_size
1087                          ? std::min(http2conf.upstream.connection_window_size,
1088                                     NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE)
1089                          : http2conf.upstream.connection_window_size;
1090 
1091   rv = nghttp2_session_set_local_window_size(session_, NGHTTP2_FLAG_NONE, 0,
1092                                              window_size);
1093 
1094   if (rv != 0) {
1095     ULOG(ERROR, this)
1096         << "nghttp2_session_set_local_window_size() returned error: "
1097         << nghttp2_strerror(rv);
1098   }
1099 
1100   // We wait for SETTINGS ACK at least 10 seconds.
1101   ev_timer_init(&settings_timer_, settings_timeout_cb,
1102                 http2conf.upstream.timeout.settings, 0.);
1103 
1104   settings_timer_.data = this;
1105 
1106   // timer for 2nd GOAWAY.  HTTP/2 spec recommend 1 RTT.  We wait for
1107   // 2 seconds.
1108   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 2., 0);
1109   shutdown_timer_.data = this;
1110 
1111   ev_prepare_init(&prep_, prepare_cb);
1112   prep_.data = this;
1113   ev_prepare_start(handler_->get_loop(), &prep_);
1114 
1115 #if defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1116   if (http2conf.upstream.optimize_write_buffer_size) {
1117     auto conn = handler_->get_connection();
1118     conn->tls_dyn_rec_warmup_threshold = 0;
1119 
1120     uint32_t pollout_thres = 1;
1121     rv = setsockopt(conn->fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &pollout_thres,
1122                     static_cast<socklen_t>(sizeof(pollout_thres)));
1123 
1124     if (rv != 0) {
1125       if (LOG_ENABLED(INFO)) {
1126         auto error = errno;
1127         LOG(INFO) << "setsockopt(TCP_NOTSENT_LOWAT, " << pollout_thres
1128                   << ") failed: errno=" << error;
1129       }
1130     }
1131   }
1132 #endif // defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1133 
1134   handler_->reset_upstream_read_timeout(
1135       config->conn.upstream.timeout.http2_read);
1136 
1137   handler_->signal_write();
1138 }
1139 
~Http2Upstream()1140 Http2Upstream::~Http2Upstream() {
1141   nghttp2_session_del(session_);
1142   ev_prepare_stop(handler_->get_loop(), &prep_);
1143   ev_timer_stop(handler_->get_loop(), &shutdown_timer_);
1144   ev_timer_stop(handler_->get_loop(), &settings_timer_);
1145 }
1146 
on_read()1147 int Http2Upstream::on_read() {
1148   ssize_t rv = 0;
1149   auto rb = handler_->get_rb();
1150   auto rlimit = handler_->get_rlimit();
1151 
1152   if (rb->rleft()) {
1153     rv = nghttp2_session_mem_recv(session_, rb->pos(), rb->rleft());
1154     if (rv < 0) {
1155       if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
1156         ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: "
1157                           << nghttp2_strerror(rv);
1158       }
1159       return -1;
1160     }
1161 
1162     // nghttp2_session_mem_recv should consume all input bytes on
1163     // success.
1164     assert(static_cast<size_t>(rv) == rb->rleft());
1165     rb->reset();
1166     rlimit->startw();
1167   }
1168 
1169   if (nghttp2_session_want_read(session_) == 0 &&
1170       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1171     if (LOG_ENABLED(INFO)) {
1172       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1173     }
1174     return -1;
1175   }
1176 
1177   handler_->signal_write();
1178   return 0;
1179 }
1180 
1181 // After this function call, downstream may be deleted.
on_write()1182 int Http2Upstream::on_write() {
1183   int rv;
1184   auto config = get_config();
1185   auto &http2conf = config->http2;
1186 
1187   if ((http2conf.upstream.optimize_write_buffer_size ||
1188        http2conf.upstream.optimize_window_size) &&
1189       handler_->get_ssl()) {
1190     auto conn = handler_->get_connection();
1191     TCPHint hint;
1192     rv = conn->get_tcp_hint(&hint);
1193     if (rv == 0) {
1194       if (http2conf.upstream.optimize_write_buffer_size) {
1195         max_buffer_size_ = std::min(MAX_BUFFER_SIZE, hint.write_buffer_size);
1196       }
1197 
1198       if (http2conf.upstream.optimize_window_size) {
1199         auto faddr = handler_->get_upstream_addr();
1200         if (faddr->alt_mode == UpstreamAltMode::NONE) {
1201           auto window_size = std::min(http2conf.upstream.connection_window_size,
1202                                       static_cast<int32_t>(hint.rwin * 2));
1203 
1204           rv = nghttp2_session_set_local_window_size(
1205               session_, NGHTTP2_FLAG_NONE, 0, window_size);
1206           if (rv != 0) {
1207             if (LOG_ENABLED(INFO)) {
1208               ULOG(INFO, this)
1209                   << "nghttp2_session_set_local_window_size() with window_size="
1210                   << window_size << " failed: " << nghttp2_strerror(rv);
1211             }
1212           }
1213         }
1214       }
1215     }
1216   }
1217 
1218   for (;;) {
1219     if (wb_.rleft() >= max_buffer_size_) {
1220       return 0;
1221     }
1222 
1223     const uint8_t *data;
1224     auto datalen = nghttp2_session_mem_send(session_, &data);
1225 
1226     if (datalen < 0) {
1227       ULOG(ERROR, this) << "nghttp2_session_mem_send() returned error: "
1228                         << nghttp2_strerror(datalen);
1229       return -1;
1230     }
1231     if (datalen == 0) {
1232       break;
1233     }
1234     wb_.append(data, datalen);
1235   }
1236 
1237   if (nghttp2_session_want_read(session_) == 0 &&
1238       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1239     if (LOG_ENABLED(INFO)) {
1240       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1241     }
1242     return -1;
1243   }
1244 
1245   return 0;
1246 }
1247 
get_client_handler() const1248 ClientHandler *Http2Upstream::get_client_handler() const { return handler_; }
1249 
downstream_read(DownstreamConnection * dconn)1250 int Http2Upstream::downstream_read(DownstreamConnection *dconn) {
1251   auto downstream = dconn->get_downstream();
1252 
1253   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1254     // The downstream stream was reset (canceled). In this case,
1255     // RST_STREAM to the upstream and delete downstream connection
1256     // here. Deleting downstream will be taken place at
1257     // on_stream_close_callback.
1258     rst_stream(downstream,
1259                infer_upstream_rst_stream_error_code(
1260                    downstream->get_response_rst_stream_error_code()));
1261     downstream->pop_downstream_connection();
1262     // dconn was deleted
1263     dconn = nullptr;
1264   } else if (downstream->get_response_state() ==
1265              DownstreamState::MSG_BAD_HEADER) {
1266     if (error_reply(downstream, 502) != 0) {
1267       return -1;
1268     }
1269     downstream->pop_downstream_connection();
1270     // dconn was deleted
1271     dconn = nullptr;
1272   } else {
1273     auto rv = downstream->on_read();
1274     if (rv == SHRPX_ERR_EOF) {
1275       if (downstream->get_request_header_sent()) {
1276         return downstream_eof(dconn);
1277       }
1278       return SHRPX_ERR_RETRY;
1279     }
1280     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1281       downstream->pop_downstream_connection();
1282       handler_->signal_write();
1283       return 0;
1284     }
1285     if (rv != 0) {
1286       if (rv != SHRPX_ERR_NETWORK) {
1287         if (LOG_ENABLED(INFO)) {
1288           DCLOG(INFO, dconn) << "HTTP parser failure";
1289         }
1290       }
1291       return downstream_error(dconn, Downstream::EVENT_ERROR);
1292     }
1293 
1294     if (downstream->can_detach_downstream_connection()) {
1295       // Keep-alive
1296       downstream->detach_downstream_connection();
1297     }
1298   }
1299 
1300   handler_->signal_write();
1301 
1302   // At this point, downstream may be deleted.
1303 
1304   return 0;
1305 }
1306 
downstream_write(DownstreamConnection * dconn)1307 int Http2Upstream::downstream_write(DownstreamConnection *dconn) {
1308   int rv;
1309   rv = dconn->on_write();
1310   if (rv == SHRPX_ERR_NETWORK) {
1311     return downstream_error(dconn, Downstream::EVENT_ERROR);
1312   }
1313   if (rv != 0) {
1314     return rv;
1315   }
1316   return 0;
1317 }
1318 
downstream_eof(DownstreamConnection * dconn)1319 int Http2Upstream::downstream_eof(DownstreamConnection *dconn) {
1320   auto downstream = dconn->get_downstream();
1321 
1322   if (LOG_ENABLED(INFO)) {
1323     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1324   }
1325 
1326   // Delete downstream connection. If we don't delete it here, it will
1327   // be pooled in on_stream_close_callback.
1328   downstream->pop_downstream_connection();
1329   // dconn was deleted
1330   dconn = nullptr;
1331   // downstream will be deleted in on_stream_close_callback.
1332   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1333     // Server may indicate the end of the request by EOF
1334     if (LOG_ENABLED(INFO)) {
1335       ULOG(INFO, this) << "Downstream body was ended by EOF";
1336     }
1337     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1338 
1339     // For tunneled connection, MSG_COMPLETE signals
1340     // downstream_data_read_callback to send RST_STREAM after pending
1341     // response body is sent. This is needed to ensure that RST_STREAM
1342     // is sent after all pending data are sent.
1343     on_downstream_body_complete(downstream);
1344   } else if (downstream->get_response_state() !=
1345              DownstreamState::MSG_COMPLETE) {
1346     // If stream was not closed, then we set MSG_COMPLETE and let
1347     // on_stream_close_callback delete downstream.
1348     if (error_reply(downstream, 502) != 0) {
1349       return -1;
1350     }
1351   }
1352   handler_->signal_write();
1353   // At this point, downstream may be deleted.
1354   return 0;
1355 }
1356 
downstream_error(DownstreamConnection * dconn,int events)1357 int Http2Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1358   auto downstream = dconn->get_downstream();
1359 
1360   if (LOG_ENABLED(INFO)) {
1361     if (events & Downstream::EVENT_ERROR) {
1362       DCLOG(INFO, dconn) << "Downstream network/general error";
1363     } else {
1364       DCLOG(INFO, dconn) << "Timeout";
1365     }
1366     if (downstream->get_upgraded()) {
1367       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1368     }
1369   }
1370 
1371   // Delete downstream connection. If we don't delete it here, it will
1372   // be pooled in on_stream_close_callback.
1373   downstream->pop_downstream_connection();
1374   // dconn was deleted
1375   dconn = nullptr;
1376 
1377   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1378     // For SSL tunneling, we issue RST_STREAM. For other types of
1379     // stream, we don't have to do anything since response was
1380     // complete.
1381     if (downstream->get_upgraded()) {
1382       rst_stream(downstream, NGHTTP2_NO_ERROR);
1383     }
1384   } else {
1385     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1386       if (downstream->get_upgraded()) {
1387         on_downstream_body_complete(downstream);
1388       } else {
1389         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
1390       }
1391     } else {
1392       unsigned int status;
1393       if (events & Downstream::EVENT_TIMEOUT) {
1394         if (downstream->get_request_header_sent()) {
1395           status = 504;
1396         } else {
1397           status = 408;
1398         }
1399       } else {
1400         status = 502;
1401       }
1402       if (error_reply(downstream, status) != 0) {
1403         return -1;
1404       }
1405     }
1406     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1407   }
1408   handler_->signal_write();
1409   // At this point, downstream may be deleted.
1410   return 0;
1411 }
1412 
rst_stream(Downstream * downstream,uint32_t error_code)1413 int Http2Upstream::rst_stream(Downstream *downstream, uint32_t error_code) {
1414   if (LOG_ENABLED(INFO)) {
1415     ULOG(INFO, this) << "RST_STREAM stream_id=" << downstream->get_stream_id()
1416                      << " with error_code=" << error_code;
1417   }
1418   int rv;
1419   rv = nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE,
1420                                  downstream->get_stream_id(), error_code);
1421   if (rv < NGHTTP2_ERR_FATAL) {
1422     ULOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
1423                       << nghttp2_strerror(rv);
1424     return -1;
1425   }
1426   return 0;
1427 }
1428 
terminate_session(uint32_t error_code)1429 int Http2Upstream::terminate_session(uint32_t error_code) {
1430   int rv;
1431   rv = nghttp2_session_terminate_session(session_, error_code);
1432   if (rv != 0) {
1433     return -1;
1434   }
1435   return 0;
1436 }
1437 
1438 namespace {
downstream_data_read_callback(nghttp2_session * session,int32_t stream_id,uint8_t * buf,size_t length,uint32_t * data_flags,nghttp2_data_source * source,void * user_data)1439 ssize_t downstream_data_read_callback(nghttp2_session *session,
1440                                       int32_t stream_id, uint8_t *buf,
1441                                       size_t length, uint32_t *data_flags,
1442                                       nghttp2_data_source *source,
1443                                       void *user_data) {
1444   int rv;
1445   auto downstream = static_cast<Downstream *>(source->ptr);
1446   auto body = downstream->get_response_buf();
1447   assert(body);
1448   auto upstream = static_cast<Http2Upstream *>(user_data);
1449 
1450   const auto &resp = downstream->response();
1451 
1452   auto nread = std::min(body->rleft(), length);
1453 
1454   auto max_buffer_size = upstream->get_max_buffer_size();
1455 
1456   auto buffer = upstream->get_response_buf();
1457 
1458   if (max_buffer_size <
1459       std::min(nread, static_cast<size_t>(256)) + 9 + buffer->rleft()) {
1460     if (LOG_ENABLED(INFO)) {
1461       ULOG(INFO, upstream) << "Buffer is almost full.  Skip write DATA";
1462     }
1463     return NGHTTP2_ERR_PAUSE;
1464   }
1465 
1466   nread = std::min(nread, max_buffer_size - 9 - buffer->rleft());
1467 
1468   auto body_empty = body->rleft() == nread;
1469 
1470   *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
1471 
1472   if (body_empty &&
1473       downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1474 
1475     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
1476 
1477     if (!downstream->get_upgraded()) {
1478       const auto &trailers = resp.fs.trailers();
1479       if (!trailers.empty()) {
1480         std::vector<nghttp2_nv> nva;
1481         nva.reserve(trailers.size());
1482         http2::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1483         if (!nva.empty()) {
1484           rv = nghttp2_submit_trailer(session, stream_id, nva.data(),
1485                                       nva.size());
1486           if (rv != 0) {
1487             if (nghttp2_is_fatal(rv)) {
1488               return NGHTTP2_ERR_CALLBACK_FAILURE;
1489             }
1490           } else {
1491             *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
1492           }
1493         }
1494       }
1495     }
1496   }
1497 
1498   if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
1499     downstream->disable_upstream_wtimer();
1500     return NGHTTP2_ERR_DEFERRED;
1501   }
1502 
1503   return nread;
1504 }
1505 } // namespace
1506 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1507 int Http2Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1508                               size_t bodylen) {
1509   int rv;
1510 
1511   nghttp2_data_provider data_prd, *data_prd_ptr = nullptr;
1512 
1513   if (bodylen) {
1514     data_prd.source.ptr = downstream;
1515     data_prd.read_callback = downstream_data_read_callback;
1516     data_prd_ptr = &data_prd;
1517   }
1518 
1519   const auto &resp = downstream->response();
1520   auto config = get_config();
1521   auto &httpconf = config->http;
1522 
1523   auto &balloc = downstream->get_block_allocator();
1524 
1525   const auto &headers = resp.fs.headers();
1526   auto nva = std::vector<nghttp2_nv>();
1527   // 2 for :status and server
1528   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1529 
1530   auto response_status = http2::stringify_status(balloc, resp.http_status);
1531 
1532   nva.push_back(http2::make_nv_ls_nocopy(":status", response_status));
1533 
1534   for (auto &kv : headers) {
1535     if (kv.name.empty() || kv.name[0] == ':') {
1536       continue;
1537     }
1538     switch (kv.token) {
1539     case http2::HD_CONNECTION:
1540     case http2::HD_KEEP_ALIVE:
1541     case http2::HD_PROXY_CONNECTION:
1542     case http2::HD_TE:
1543     case http2::HD_TRANSFER_ENCODING:
1544     case http2::HD_UPGRADE:
1545       continue;
1546     }
1547     nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index));
1548   }
1549 
1550   if (!resp.fs.header(http2::HD_SERVER)) {
1551     nva.push_back(http2::make_nv_ls_nocopy("server", config->http.server_name));
1552   }
1553 
1554   for (auto &p : httpconf.add_response_headers) {
1555     nva.push_back(http2::make_nv_nocopy(p.name, p.value));
1556   }
1557 
1558   rv = nghttp2_submit_response(session_, downstream->get_stream_id(),
1559                                nva.data(), nva.size(), data_prd_ptr);
1560   if (nghttp2_is_fatal(rv)) {
1561     ULOG(FATAL, this) << "nghttp2_submit_response() failed: "
1562                       << nghttp2_strerror(rv);
1563     return -1;
1564   }
1565 
1566   auto buf = downstream->get_response_buf();
1567 
1568   buf->append(body, bodylen);
1569 
1570   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1571 
1572   if (data_prd_ptr) {
1573     downstream->reset_upstream_wtimer();
1574   }
1575 
1576   return 0;
1577 }
1578 
error_reply(Downstream * downstream,unsigned int status_code)1579 int Http2Upstream::error_reply(Downstream *downstream,
1580                                unsigned int status_code) {
1581   int rv;
1582   auto &resp = downstream->response();
1583 
1584   auto &balloc = downstream->get_block_allocator();
1585 
1586   auto html = http::create_error_html(balloc, status_code);
1587   resp.http_status = status_code;
1588   auto body = downstream->get_response_buf();
1589   body->append(html);
1590   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1591 
1592   nghttp2_data_provider data_prd;
1593   data_prd.source.ptr = downstream;
1594   data_prd.read_callback = downstream_data_read_callback;
1595 
1596   auto lgconf = log_config();
1597   lgconf->update_tstamp(std::chrono::system_clock::now());
1598 
1599   auto response_status = http2::stringify_status(balloc, status_code);
1600   auto content_length = util::make_string_ref_uint(balloc, html.size());
1601   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
1602 
1603   auto nva = std::array<nghttp2_nv, 5>{
1604       {http2::make_nv_ls_nocopy(":status", response_status),
1605        http2::make_nv_ll("content-type", "text/html; charset=UTF-8"),
1606        http2::make_nv_ls_nocopy("server", get_config()->http.server_name),
1607        http2::make_nv_ls_nocopy("content-length", content_length),
1608        http2::make_nv_ls_nocopy("date", date)}};
1609 
1610   rv = nghttp2_submit_response(session_, downstream->get_stream_id(),
1611                                nva.data(), nva.size(), &data_prd);
1612   if (rv < NGHTTP2_ERR_FATAL) {
1613     ULOG(FATAL, this) << "nghttp2_submit_response() failed: "
1614                       << nghttp2_strerror(rv);
1615     return -1;
1616   }
1617 
1618   downstream->reset_upstream_wtimer();
1619 
1620   return 0;
1621 }
1622 
add_pending_downstream(std::unique_ptr<Downstream> downstream)1623 void Http2Upstream::add_pending_downstream(
1624     std::unique_ptr<Downstream> downstream) {
1625   downstream_queue_.add_pending(std::move(downstream));
1626 }
1627 
remove_downstream(Downstream * downstream)1628 void Http2Upstream::remove_downstream(Downstream *downstream) {
1629   if (downstream->accesslog_ready()) {
1630     handler_->write_accesslog(downstream);
1631   }
1632 
1633   nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
1634                                        nullptr);
1635 
1636   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
1637 
1638   if (next_downstream) {
1639     initiate_downstream(next_downstream);
1640   }
1641 
1642   if (downstream_queue_.get_downstreams() == nullptr) {
1643     // There is no downstream at the moment.  Start idle timer now.
1644     handler_->repeat_read_timer();
1645   }
1646 }
1647 
1648 // WARNING: Never call directly or indirectly nghttp2_session_send or
1649 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_header_complete(Downstream * downstream)1650 int Http2Upstream::on_downstream_header_complete(Downstream *downstream) {
1651   int rv;
1652 
1653   const auto &req = downstream->request();
1654   auto &resp = downstream->response();
1655 
1656   auto &balloc = downstream->get_block_allocator();
1657 
1658   if (LOG_ENABLED(INFO)) {
1659     if (downstream->get_non_final_response()) {
1660       DLOG(INFO, downstream) << "HTTP non-final response header";
1661     } else {
1662       DLOG(INFO, downstream) << "HTTP response header completed";
1663     }
1664   }
1665 
1666   auto config = get_config();
1667   auto &httpconf = config->http;
1668 
1669   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1670     downstream->rewrite_location_response_header(req.scheme);
1671   }
1672 
1673 #ifdef HAVE_MRUBY
1674   if (!downstream->get_non_final_response()) {
1675     auto dconn = downstream->get_downstream_connection();
1676     const auto &group = dconn->get_downstream_addr_group();
1677     if (group) {
1678       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1679 
1680       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1681         if (error_reply(downstream, 500) != 0) {
1682           return -1;
1683         }
1684         // Returning -1 will signal deletion of dconn.
1685         return -1;
1686       }
1687 
1688       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1689         return -1;
1690       }
1691     }
1692 
1693     auto worker = handler_->get_worker();
1694     auto mruby_ctx = worker->get_mruby_context();
1695 
1696     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1697       if (error_reply(downstream, 500) != 0) {
1698         return -1;
1699       }
1700       // Returning -1 will signal deletion of dconn.
1701       return -1;
1702     }
1703 
1704     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1705       return -1;
1706     }
1707   }
1708 #endif // HAVE_MRUBY
1709 
1710   auto &http2conf = config->http2;
1711 
1712   // We need some conditions that must be fulfilled to initiate server
1713   // push.
1714   //
1715   // * Server push is disabled for http2 proxy or client proxy, since
1716   //   incoming headers are mixed origins.  We don't know how to
1717   //   reliably determine the authority yet.
1718   //
1719   // * We need non-final response or 200 response code for associated
1720   //   resource.  This is too restrictive, we will review this later.
1721   //
1722   // * We requires GET or POST for associated resource.  Probably we
1723   //   don't want to push for HEAD request.  Not sure other methods
1724   //   are also eligible for push.
1725   if (!http2conf.no_server_push &&
1726       nghttp2_session_get_remote_settings(session_,
1727                                           NGHTTP2_SETTINGS_ENABLE_PUSH) == 1 &&
1728       !config->http2_proxy && (downstream->get_stream_id() % 2) &&
1729       resp.fs.header(http2::HD_LINK) &&
1730       (downstream->get_non_final_response() || resp.http_status == 200) &&
1731       (req.method == HTTP_GET || req.method == HTTP_POST)) {
1732 
1733     if (prepare_push_promise(downstream) != 0) {
1734       // Continue to send response even if push was failed.
1735     }
1736   }
1737 
1738   auto nva = std::vector<nghttp2_nv>();
1739   // 6 means :status and possible server, via, x-http2-push, alt-svc,
1740   // and set-cookie (for affinity cookie) header field.
1741   nva.reserve(resp.fs.headers().size() + 6 +
1742               httpconf.add_response_headers.size());
1743 
1744   if (downstream->get_non_final_response()) {
1745     auto response_status = http2::stringify_status(balloc, resp.http_status);
1746 
1747     nva.push_back(http2::make_nv_ls_nocopy(":status", response_status));
1748 
1749     http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1750                                       http2::HDOP_STRIP_ALL);
1751 
1752     if (LOG_ENABLED(INFO)) {
1753       log_response_headers(downstream, nva);
1754     }
1755 
1756     rv = nghttp2_submit_headers(session_, NGHTTP2_FLAG_NONE,
1757                                 downstream->get_stream_id(), nullptr,
1758                                 nva.data(), nva.size(), nullptr);
1759 
1760     resp.fs.clear_headers();
1761 
1762     if (rv != 0) {
1763       ULOG(FATAL, this) << "nghttp2_submit_headers() failed";
1764       return -1;
1765     }
1766 
1767     return 0;
1768   }
1769 
1770   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1771   StringRef response_status;
1772 
1773   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1774     response_status = http2::stringify_status(balloc, 200);
1775     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1776   } else {
1777     response_status = http2::stringify_status(balloc, resp.http_status);
1778   }
1779 
1780   nva.push_back(http2::make_nv_ls_nocopy(":status", response_status));
1781 
1782   http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1783 
1784   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1785     nva.push_back(http2::make_nv_ls_nocopy("server", httpconf.server_name));
1786   } else {
1787     auto server = resp.fs.header(http2::HD_SERVER);
1788     if (server) {
1789       nva.push_back(http2::make_nv_ls_nocopy("server", (*server).value));
1790     }
1791   }
1792 
1793   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1794     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1795     if (affinity_cookie) {
1796       auto dconn = downstream->get_downstream_connection();
1797       assert(dconn);
1798       auto &group = dconn->get_downstream_addr_group();
1799       auto &shared_addr = group->shared_addr;
1800       auto &cookieconf = shared_addr->affinity.cookie;
1801       auto secure =
1802           http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1803       auto cookie_str = http::create_affinity_cookie(
1804           balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1805       nva.push_back(http2::make_nv_ls_nocopy("set-cookie", cookie_str));
1806     }
1807   }
1808 
1809   if (!resp.fs.header(http2::HD_ALT_SVC)) {
1810     // We won't change or alter alt-svc from backend for now
1811     if (!httpconf.http2_altsvc_header_value.empty()) {
1812       nva.push_back(http2::make_nv_ls_nocopy(
1813           "alt-svc", httpconf.http2_altsvc_header_value));
1814     }
1815   }
1816 
1817   auto via = resp.fs.header(http2::HD_VIA);
1818   if (httpconf.no_via) {
1819     if (via) {
1820       nva.push_back(http2::make_nv_ls_nocopy("via", (*via).value));
1821     }
1822   } else {
1823     // we don't create more than 16 bytes in
1824     // http::create_via_header_value.
1825     size_t len = 16;
1826     if (via) {
1827       len += via->value.size() + 2;
1828     }
1829 
1830     auto iov = make_byte_ref(balloc, len + 1);
1831     auto p = iov.base;
1832     if (via) {
1833       p = std::copy(std::begin(via->value), std::end(via->value), p);
1834       p = util::copy_lit(p, ", ");
1835     }
1836     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1837     *p = '\0';
1838 
1839     nva.push_back(http2::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
1840   }
1841 
1842   for (auto &p : httpconf.add_response_headers) {
1843     nva.push_back(http2::make_nv_nocopy(p.name, p.value));
1844   }
1845 
1846   if (downstream->get_stream_id() % 2 == 0) {
1847     // This header field is basically for human on client side to
1848     // figure out that the resource is pushed.
1849     nva.push_back(http2::make_nv_ll("x-http2-push", "1"));
1850   }
1851 
1852   if (LOG_ENABLED(INFO)) {
1853     log_response_headers(downstream, nva);
1854   }
1855 
1856   if (http2conf.upstream.debug.dump.response_header) {
1857     http2::dump_nv(http2conf.upstream.debug.dump.response_header, nva.data(),
1858                    nva.size());
1859   }
1860 
1861   nghttp2_data_provider data_prd;
1862   data_prd.source.ptr = downstream;
1863   data_prd.read_callback = downstream_data_read_callback;
1864 
1865   nghttp2_data_provider *data_prdptr;
1866 
1867   if (downstream->expect_response_body() ||
1868       downstream->expect_response_trailer()) {
1869     data_prdptr = &data_prd;
1870   } else {
1871     data_prdptr = nullptr;
1872   }
1873 
1874   rv = nghttp2_submit_response(session_, downstream->get_stream_id(),
1875                                nva.data(), nva.size(), data_prdptr);
1876   if (rv != 0) {
1877     ULOG(FATAL, this) << "nghttp2_submit_response() failed";
1878     return -1;
1879   }
1880 
1881   if (data_prdptr) {
1882     downstream->reset_upstream_wtimer();
1883   }
1884 
1885   return 0;
1886 }
1887 
1888 // WARNING: Never call directly or indirectly nghttp2_session_send or
1889 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1890 int Http2Upstream::on_downstream_body(Downstream *downstream,
1891                                       const uint8_t *data, size_t len,
1892                                       bool flush) {
1893   auto body = downstream->get_response_buf();
1894   body->append(data, len);
1895 
1896   if (flush) {
1897     nghttp2_session_resume_data(session_, downstream->get_stream_id());
1898 
1899     downstream->ensure_upstream_wtimer();
1900   }
1901 
1902   return 0;
1903 }
1904 
1905 // WARNING: Never call directly or indirectly nghttp2_session_send or
1906 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body_complete(Downstream * downstream)1907 int Http2Upstream::on_downstream_body_complete(Downstream *downstream) {
1908   if (LOG_ENABLED(INFO)) {
1909     DLOG(INFO, downstream) << "HTTP response completed";
1910   }
1911 
1912   auto &resp = downstream->response();
1913 
1914   if (!downstream->validate_response_recv_body_length()) {
1915     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
1916     resp.connection_close = true;
1917     return 0;
1918   }
1919 
1920   nghttp2_session_resume_data(session_, downstream->get_stream_id());
1921   downstream->ensure_upstream_wtimer();
1922 
1923   return 0;
1924 }
1925 
get_flow_control() const1926 bool Http2Upstream::get_flow_control() const { return flow_control_; }
1927 
pause_read(IOCtrlReason reason)1928 void Http2Upstream::pause_read(IOCtrlReason reason) {}
1929 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1930 int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1931                                size_t consumed) {
1932   if (get_flow_control()) {
1933     if (consume(downstream->get_stream_id(), consumed) != 0) {
1934       return -1;
1935     }
1936 
1937     auto &req = downstream->request();
1938 
1939     req.consume(consumed);
1940   }
1941 
1942   handler_->signal_write();
1943   return 0;
1944 }
1945 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)1946 int Http2Upstream::on_downstream_abort_request(Downstream *downstream,
1947                                                unsigned int status_code) {
1948   int rv;
1949 
1950   rv = error_reply(downstream, status_code);
1951 
1952   if (rv != 0) {
1953     return -1;
1954   }
1955 
1956   handler_->signal_write();
1957   return 0;
1958 }
1959 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1960 int Http2Upstream::on_downstream_abort_request_with_https_redirect(
1961     Downstream *downstream) {
1962   int rv;
1963 
1964   rv = redirect_to_https(downstream);
1965   if (rv != 0) {
1966     return -1;
1967   }
1968 
1969   handler_->signal_write();
1970   return 0;
1971 }
1972 
redirect_to_https(Downstream * downstream)1973 int Http2Upstream::redirect_to_https(Downstream *downstream) {
1974   auto &req = downstream->request();
1975   if (req.regular_connect_method() || req.scheme != "http") {
1976     return error_reply(downstream, 400);
1977   }
1978 
1979   auto authority = util::extract_host(req.authority);
1980   if (authority.empty()) {
1981     return error_reply(downstream, 400);
1982   }
1983 
1984   auto &balloc = downstream->get_block_allocator();
1985   auto config = get_config();
1986   auto &httpconf = config->http;
1987 
1988   StringRef loc;
1989   if (httpconf.redirect_https_port == StringRef::from_lit("443")) {
1990     loc = concat_string_ref(balloc, StringRef::from_lit("https://"), authority,
1991                             req.path);
1992   } else {
1993     loc = concat_string_ref(balloc, StringRef::from_lit("https://"), authority,
1994                             StringRef::from_lit(":"),
1995                             httpconf.redirect_https_port, req.path);
1996   }
1997 
1998   auto &resp = downstream->response();
1999   resp.http_status = 308;
2000   resp.fs.add_header_token(StringRef::from_lit("location"), loc, false,
2001                            http2::HD_LOCATION);
2002 
2003   return send_reply(downstream, nullptr, 0);
2004 }
2005 
consume(int32_t stream_id,size_t len)2006 int Http2Upstream::consume(int32_t stream_id, size_t len) {
2007   int rv;
2008 
2009   auto faddr = handler_->get_upstream_addr();
2010 
2011   if (faddr->alt_mode != UpstreamAltMode::NONE) {
2012     return 0;
2013   }
2014 
2015   rv = nghttp2_session_consume(session_, stream_id, len);
2016 
2017   if (rv != 0) {
2018     ULOG(WARN, this) << "nghttp2_session_consume() returned error: "
2019                      << nghttp2_strerror(rv);
2020     return -1;
2021   }
2022 
2023   return 0;
2024 }
2025 
log_response_headers(Downstream * downstream,const std::vector<nghttp2_nv> & nva) const2026 void Http2Upstream::log_response_headers(
2027     Downstream *downstream, const std::vector<nghttp2_nv> &nva) const {
2028   std::stringstream ss;
2029   for (auto &nv : nva) {
2030     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2031        << StringRef{nv.value, nv.valuelen} << "\n";
2032   }
2033   ULOG(INFO, this) << "HTTP response headers. stream_id="
2034                    << downstream->get_stream_id() << "\n"
2035                    << ss.str();
2036 }
2037 
on_timeout(Downstream * downstream)2038 int Http2Upstream::on_timeout(Downstream *downstream) {
2039   if (LOG_ENABLED(INFO)) {
2040     ULOG(INFO, this) << "Stream timeout stream_id="
2041                      << downstream->get_stream_id();
2042   }
2043 
2044   rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2045   handler_->signal_write();
2046 
2047   return 0;
2048 }
2049 
on_handler_delete()2050 void Http2Upstream::on_handler_delete() {
2051   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
2052     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
2053         d->accesslog_ready()) {
2054       handler_->write_accesslog(d);
2055     }
2056   }
2057 }
2058 
on_downstream_reset(Downstream * downstream,bool no_retry)2059 int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
2060   int rv;
2061 
2062   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
2063     // This is error condition when we failed push_request_headers()
2064     // in initiate_downstream().  Otherwise, we have
2065     // DispatchState::ACTIVE state, or we did not set
2066     // DownstreamConnection.
2067     downstream->pop_downstream_connection();
2068     handler_->signal_write();
2069 
2070     return 0;
2071   }
2072 
2073   if (!downstream->request_submission_ready()) {
2074     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2075       // We have got all response body already.  Send it off.
2076       downstream->pop_downstream_connection();
2077       return 0;
2078     }
2079     // pushed stream is handled here
2080     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2081     downstream->pop_downstream_connection();
2082 
2083     handler_->signal_write();
2084 
2085     return 0;
2086   }
2087 
2088   downstream->pop_downstream_connection();
2089 
2090   downstream->add_retry();
2091 
2092   std::unique_ptr<DownstreamConnection> dconn;
2093 
2094   rv = 0;
2095 
2096   if (no_retry || downstream->no_more_retry()) {
2097     goto fail;
2098   }
2099 
2100   // downstream connection is clean; we can retry with new
2101   // downstream connection.
2102 
2103   for (;;) {
2104     auto dconn = handler_->get_downstream_connection(rv, downstream);
2105     if (!dconn) {
2106       goto fail;
2107     }
2108 
2109     rv = downstream->attach_downstream_connection(std::move(dconn));
2110     if (rv == 0) {
2111       break;
2112     }
2113   }
2114 
2115   rv = downstream->push_request_headers();
2116   if (rv != 0) {
2117     goto fail;
2118   }
2119 
2120   return 0;
2121 
2122 fail:
2123   if (rv == SHRPX_ERR_TLS_REQUIRED) {
2124     rv = on_downstream_abort_request_with_https_redirect(downstream);
2125   } else {
2126     rv = on_downstream_abort_request(downstream, 502);
2127   }
2128   if (rv != 0) {
2129     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2130   }
2131   downstream->pop_downstream_connection();
2132 
2133   handler_->signal_write();
2134 
2135   return 0;
2136 }
2137 
prepare_push_promise(Downstream * downstream)2138 int Http2Upstream::prepare_push_promise(Downstream *downstream) {
2139   int rv;
2140 
2141   const auto &req = downstream->request();
2142   auto &resp = downstream->response();
2143 
2144   auto base = http2::get_pure_path_component(req.path);
2145   if (base.empty()) {
2146     return 0;
2147   }
2148 
2149   auto &balloc = downstream->get_block_allocator();
2150 
2151   for (auto &kv : resp.fs.headers()) {
2152     if (kv.token != http2::HD_LINK) {
2153       continue;
2154     }
2155     for (auto &link : http2::parse_link_header(kv.value)) {
2156       StringRef scheme, authority, path;
2157 
2158       rv = http2::construct_push_component(balloc, scheme, authority, path,
2159                                            base, link.uri);
2160       if (rv != 0) {
2161         continue;
2162       }
2163 
2164       if (scheme.empty()) {
2165         scheme = req.scheme;
2166       }
2167 
2168       if (authority.empty()) {
2169         authority = req.authority;
2170       }
2171 
2172       if (resp.is_resource_pushed(scheme, authority, path)) {
2173         continue;
2174       }
2175 
2176       rv = submit_push_promise(scheme, authority, path, downstream);
2177       if (rv != 0) {
2178         return -1;
2179       }
2180 
2181       resp.resource_pushed(scheme, authority, path);
2182     }
2183   }
2184   return 0;
2185 }
2186 
submit_push_promise(const StringRef & scheme,const StringRef & authority,const StringRef & path,Downstream * downstream)2187 int Http2Upstream::submit_push_promise(const StringRef &scheme,
2188                                        const StringRef &authority,
2189                                        const StringRef &path,
2190                                        Downstream *downstream) {
2191   const auto &req = downstream->request();
2192 
2193   std::vector<nghttp2_nv> nva;
2194   // 4 for :method, :scheme, :path and :authority
2195   nva.reserve(4 + req.fs.headers().size());
2196 
2197   // just use "GET" for now
2198   nva.push_back(http2::make_nv_ll(":method", "GET"));
2199   nva.push_back(http2::make_nv_ls_nocopy(":scheme", scheme));
2200   nva.push_back(http2::make_nv_ls_nocopy(":path", path));
2201   nva.push_back(http2::make_nv_ls_nocopy(":authority", authority));
2202 
2203   for (auto &kv : req.fs.headers()) {
2204     switch (kv.token) {
2205     // TODO generate referer
2206     case http2::HD__AUTHORITY:
2207     case http2::HD__SCHEME:
2208     case http2::HD__METHOD:
2209     case http2::HD__PATH:
2210       continue;
2211     case http2::HD_ACCEPT_ENCODING:
2212     case http2::HD_ACCEPT_LANGUAGE:
2213     case http2::HD_CACHE_CONTROL:
2214     case http2::HD_HOST:
2215     case http2::HD_USER_AGENT:
2216       nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index));
2217       break;
2218     }
2219   }
2220 
2221   auto promised_stream_id = nghttp2_submit_push_promise(
2222       session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2223       nva.size(), nullptr);
2224 
2225   if (promised_stream_id < 0) {
2226     if (LOG_ENABLED(INFO)) {
2227       ULOG(INFO, this) << "nghttp2_submit_push_promise() failed: "
2228                        << nghttp2_strerror(promised_stream_id);
2229     }
2230     if (nghttp2_is_fatal(promised_stream_id)) {
2231       return -1;
2232     }
2233     return 0;
2234   }
2235 
2236   if (LOG_ENABLED(INFO)) {
2237     std::stringstream ss;
2238     for (auto &nv : nva) {
2239       ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2240          << StringRef{nv.value, nv.valuelen} << "\n";
2241     }
2242     ULOG(INFO, this) << "HTTP push request headers. promised_stream_id="
2243                      << promised_stream_id << "\n"
2244                      << ss.str();
2245   }
2246 
2247   return 0;
2248 }
2249 
push_enabled() const2250 bool Http2Upstream::push_enabled() const {
2251   auto config = get_config();
2252   return !(config->http2.no_server_push ||
2253            nghttp2_session_get_remote_settings(
2254                session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
2255            config->http2_proxy);
2256 }
2257 
initiate_push(Downstream * downstream,const StringRef & uri)2258 int Http2Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
2259   int rv;
2260 
2261   if (uri.empty() || !push_enabled() ||
2262       (downstream->get_stream_id() % 2) == 0) {
2263     return 0;
2264   }
2265 
2266   const auto &req = downstream->request();
2267 
2268   auto base = http2::get_pure_path_component(req.path);
2269   if (base.empty()) {
2270     return -1;
2271   }
2272 
2273   auto &balloc = downstream->get_block_allocator();
2274 
2275   StringRef scheme, authority, path;
2276 
2277   rv = http2::construct_push_component(balloc, scheme, authority, path, base,
2278                                        uri);
2279   if (rv != 0) {
2280     return -1;
2281   }
2282 
2283   if (scheme.empty()) {
2284     scheme = req.scheme;
2285   }
2286 
2287   if (authority.empty()) {
2288     authority = req.authority;
2289   }
2290 
2291   auto &resp = downstream->response();
2292 
2293   if (resp.is_resource_pushed(scheme, authority, path)) {
2294     return 0;
2295   }
2296 
2297   rv = submit_push_promise(scheme, authority, path, downstream);
2298 
2299   if (rv != 0) {
2300     return -1;
2301   }
2302 
2303   resp.resource_pushed(scheme, authority, path);
2304 
2305   return 0;
2306 }
2307 
response_riovec(struct iovec * iov,int iovcnt) const2308 int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
2309   if (iovcnt == 0 || wb_.rleft() == 0) {
2310     return 0;
2311   }
2312 
2313   return wb_.riovec(iov, iovcnt);
2314 }
2315 
response_drain(size_t n)2316 void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
2317 
response_empty() const2318 bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
2319 
get_response_buf()2320 DefaultMemchunks *Http2Upstream::get_response_buf() { return &wb_; }
2321 
2322 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)2323 Http2Upstream::on_downstream_push_promise(Downstream *downstream,
2324                                           int32_t promised_stream_id) {
2325   // promised_stream_id is for backend HTTP/2 session, not for
2326   // frontend.
2327   auto promised_downstream =
2328       std::make_unique<Downstream>(this, handler_->get_mcpool(), 0);
2329   auto &promised_req = promised_downstream->request();
2330 
2331   promised_downstream->set_downstream_stream_id(promised_stream_id);
2332   // Set associated stream in frontend
2333   promised_downstream->set_assoc_stream_id(downstream->get_stream_id());
2334 
2335   promised_downstream->disable_upstream_rtimer();
2336 
2337   promised_req.http_major = 2;
2338   promised_req.http_minor = 0;
2339 
2340   promised_req.fs.content_length = 0;
2341   promised_req.http2_expect_body = false;
2342 
2343   auto ptr = promised_downstream.get();
2344   add_pending_downstream(std::move(promised_downstream));
2345   downstream_queue_.mark_active(ptr);
2346 
2347   return ptr;
2348 }
2349 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)2350 int Http2Upstream::on_downstream_push_promise_complete(
2351     Downstream *downstream, Downstream *promised_downstream) {
2352   std::vector<nghttp2_nv> nva;
2353 
2354   const auto &promised_req = promised_downstream->request();
2355   const auto &headers = promised_req.fs.headers();
2356 
2357   nva.reserve(headers.size());
2358 
2359   for (auto &kv : headers) {
2360     nva.push_back(http2::make_nv(kv.name, kv.value, kv.no_index));
2361   }
2362 
2363   auto promised_stream_id = nghttp2_submit_push_promise(
2364       session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2365       nva.size(), promised_downstream);
2366   if (promised_stream_id < 0) {
2367     return -1;
2368   }
2369 
2370   promised_downstream->set_stream_id(promised_stream_id);
2371 
2372   return 0;
2373 }
2374 
cancel_premature_downstream(Downstream * promised_downstream)2375 void Http2Upstream::cancel_premature_downstream(
2376     Downstream *promised_downstream) {
2377   if (LOG_ENABLED(INFO)) {
2378     ULOG(INFO, this) << "Remove premature promised stream "
2379                      << promised_downstream;
2380   }
2381   downstream_queue_.remove_and_get_blocked(promised_downstream, false);
2382 }
2383 
get_max_buffer_size() const2384 size_t Http2Upstream::get_max_buffer_size() const { return max_buffer_size_; }
2385 
2386 } // namespace shrpx
2387