• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http2_upstream.h"
26 
27 #include <netinet/tcp.h>
28 #include <assert.h>
29 #include <cerrno>
30 #include <sstream>
31 
32 #include "shrpx_client_handler.h"
33 #include "shrpx_https_upstream.h"
34 #include "shrpx_downstream.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_config.h"
37 #include "shrpx_http.h"
38 #include "shrpx_worker.h"
39 #include "shrpx_http2_session.h"
40 #include "shrpx_log.h"
41 #ifdef HAVE_MRUBY
42 #  include "shrpx_mruby.h"
43 #endif // HAVE_MRUBY
44 #include "http2.h"
45 #include "util.h"
46 #include "base64.h"
47 #include "app_helper.h"
48 #include "template.h"
49 
50 using namespace nghttp2;
51 
52 namespace shrpx {
53 
54 namespace {
55 constexpr size_t MAX_BUFFER_SIZE = 32_k;
56 } // namespace
57 
58 namespace {
on_stream_close_callback(nghttp2_session * session,int32_t stream_id,uint32_t error_code,void * user_data)59 int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
60                              uint32_t error_code, void *user_data) {
61   auto upstream = static_cast<Http2Upstream *>(user_data);
62   if (LOG_ENABLED(INFO)) {
63     ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
64                          << " is being closed";
65   }
66 
67   auto downstream = static_cast<Downstream *>(
68       nghttp2_session_get_stream_user_data(session, stream_id));
69 
70   if (!downstream) {
71     return 0;
72   }
73 
74   auto &req = downstream->request();
75 
76   upstream->consume(stream_id, req.unconsumed_body_length);
77 
78   req.unconsumed_body_length = 0;
79 
80   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
81     upstream->remove_downstream(downstream);
82     // downstream was deleted
83 
84     return 0;
85   }
86 
87   if (downstream->can_detach_downstream_connection()) {
88     // Keep-alive
89     downstream->detach_downstream_connection();
90   }
91 
92   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
93 
94   // At this point, downstream read may be paused.
95 
96   // If shrpx_downstream::push_request_headers() failed, the
97   // error is handled here.
98   upstream->remove_downstream(downstream);
99   // downstream was deleted
100 
101   // How to test this case? Request sufficient large download
102   // and make client send RST_STREAM after it gets first DATA
103   // frame chunk.
104 
105   return 0;
106 }
107 } // namespace
108 
upgrade_upstream(HttpsUpstream * http)109 int Http2Upstream::upgrade_upstream(HttpsUpstream *http) {
110   int rv;
111 
112   auto &balloc = http->get_downstream()->get_block_allocator();
113 
114   auto http2_settings = http->get_downstream()->get_http2_settings();
115   http2_settings = util::to_base64(balloc, http2_settings);
116 
117   auto settings_payload = base64::decode(balloc, std::begin(http2_settings),
118                                          std::end(http2_settings));
119 
120   rv = nghttp2_session_upgrade2(
121       session_, settings_payload.byte(), settings_payload.size(),
122       http->get_downstream()->request().method == HTTP_HEAD, nullptr);
123   if (rv != 0) {
124     if (LOG_ENABLED(INFO)) {
125       ULOG(INFO, this) << "nghttp2_session_upgrade() returned error: "
126                        << nghttp2_strerror(rv);
127     }
128     return -1;
129   }
130   pre_upstream_.reset(http);
131   auto downstream = http->pop_downstream();
132   downstream->reset_upstream(this);
133   downstream->set_stream_id(1);
134   downstream->reset_upstream_rtimer();
135   downstream->set_stream_id(1);
136 
137   auto ptr = downstream.get();
138 
139   nghttp2_session_set_stream_user_data(session_, 1, ptr);
140   downstream_queue_.add_pending(std::move(downstream));
141   downstream_queue_.mark_active(ptr);
142 
143   // TODO This might not be necessary
144   handler_->stop_read_timer();
145 
146   if (LOG_ENABLED(INFO)) {
147     ULOG(INFO, this) << "Connection upgraded to HTTP/2";
148   }
149 
150   return 0;
151 }
152 
start_settings_timer()153 void Http2Upstream::start_settings_timer() {
154   ev_timer_start(handler_->get_loop(), &settings_timer_);
155 }
156 
stop_settings_timer()157 void Http2Upstream::stop_settings_timer() {
158   ev_timer_stop(handler_->get_loop(), &settings_timer_);
159 }
160 
161 namespace {
on_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)162 int on_header_callback2(nghttp2_session *session, const nghttp2_frame *frame,
163                         nghttp2_rcbuf *name, nghttp2_rcbuf *value,
164                         uint8_t flags, void *user_data) {
165   auto namebuf = nghttp2_rcbuf_get_buf(name);
166   auto valuebuf = nghttp2_rcbuf_get_buf(value);
167   auto config = get_config();
168 
169   if (config->http2.upstream.debug.frame_debug) {
170     verbose_on_header_callback(session, frame, namebuf.base, namebuf.len,
171                                valuebuf.base, valuebuf.len, flags, user_data);
172   }
173   if (frame->hd.type != NGHTTP2_HEADERS) {
174     return 0;
175   }
176   auto upstream = static_cast<Http2Upstream *>(user_data);
177   auto downstream = static_cast<Downstream *>(
178       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
179   if (!downstream) {
180     return 0;
181   }
182 
183   auto &req = downstream->request();
184 
185   auto &httpconf = config->http;
186 
187   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
188           httpconf.request_header_field_buffer ||
189       req.fs.num_fields() >= httpconf.max_request_header_fields) {
190     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
191       return 0;
192     }
193 
194     if (LOG_ENABLED(INFO)) {
195       ULOG(INFO, upstream) << "Too large or many header field size="
196                            << req.fs.buffer_size() + namebuf.len + valuebuf.len
197                            << ", num=" << req.fs.num_fields() + 1;
198     }
199 
200     // just ignore header fields if this is trailer part.
201     if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
202       return 0;
203     }
204 
205     if (upstream->error_reply(downstream, 431) != 0) {
206       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
207     }
208 
209     return 0;
210   }
211 
212   auto token = http2::lookup_token(namebuf.base, namebuf.len);
213   auto no_index = flags & NGHTTP2_NV_FLAG_NO_INDEX;
214 
215   downstream->add_rcbuf(name);
216   downstream->add_rcbuf(value);
217 
218   if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
219     // just store header fields for trailer part
220     req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
221                              StringRef{valuebuf.base, valuebuf.len}, no_index,
222                              token);
223     return 0;
224   }
225 
226   req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
227                           StringRef{valuebuf.base, valuebuf.len}, no_index,
228                           token);
229   return 0;
230 }
231 } // namespace
232 
233 namespace {
on_invalid_header_callback2(nghttp2_session * session,const nghttp2_frame * frame,nghttp2_rcbuf * name,nghttp2_rcbuf * value,uint8_t flags,void * user_data)234 int on_invalid_header_callback2(nghttp2_session *session,
235                                 const nghttp2_frame *frame, nghttp2_rcbuf *name,
236                                 nghttp2_rcbuf *value, uint8_t flags,
237                                 void *user_data) {
238   auto upstream = static_cast<Http2Upstream *>(user_data);
239   auto downstream = static_cast<Downstream *>(
240       nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
241   if (!downstream) {
242     return 0;
243   }
244 
245   if (LOG_ENABLED(INFO)) {
246     auto namebuf = nghttp2_rcbuf_get_buf(name);
247     auto valuebuf = nghttp2_rcbuf_get_buf(value);
248 
249     ULOG(INFO, upstream) << "Invalid header field for stream_id="
250                          << frame->hd.stream_id << ": name=["
251                          << StringRef{namebuf.base, namebuf.len} << "], value=["
252                          << StringRef{valuebuf.base, valuebuf.len} << "]";
253   }
254 
255   upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
256 
257   return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
258 }
259 } // namespace
260 
261 namespace {
on_begin_headers_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)262 int on_begin_headers_callback(nghttp2_session *session,
263                               const nghttp2_frame *frame, void *user_data) {
264   auto upstream = static_cast<Http2Upstream *>(user_data);
265 
266   if (frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
267     return 0;
268   }
269   if (LOG_ENABLED(INFO)) {
270     ULOG(INFO, upstream) << "Received upstream request HEADERS stream_id="
271                          << frame->hd.stream_id;
272   }
273 
274   upstream->on_start_request(frame);
275 
276   return 0;
277 }
278 } // namespace
279 
on_start_request(const nghttp2_frame * frame)280 void Http2Upstream::on_start_request(const nghttp2_frame *frame) {
281   auto downstream = std::make_unique<Downstream>(this, handler_->get_mcpool(),
282                                                  frame->hd.stream_id);
283   nghttp2_session_set_stream_user_data(session_, frame->hd.stream_id,
284                                        downstream.get());
285 
286   downstream->reset_upstream_rtimer();
287 
288   handler_->repeat_read_timer();
289 
290   auto &req = downstream->request();
291 
292   // Although, we deprecated minor version from HTTP/2, we supply
293   // minor version 0 to use via header field in a conventional way.
294   req.http_major = 2;
295   req.http_minor = 0;
296 
297   add_pending_downstream(std::move(downstream));
298 
299   ++num_requests_;
300 
301   auto config = get_config();
302   auto &httpconf = config->http;
303   if (httpconf.max_requests <= num_requests_) {
304     start_graceful_shutdown();
305   }
306 }
307 
on_request_headers(Downstream * downstream,const nghttp2_frame * frame)308 int Http2Upstream::on_request_headers(Downstream *downstream,
309                                       const nghttp2_frame *frame) {
310   auto lgconf = log_config();
311   lgconf->update_tstamp(std::chrono::system_clock::now());
312   auto &req = downstream->request();
313   req.tstamp = lgconf->tstamp;
314 
315   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
316     return 0;
317   }
318 
319   auto &nva = req.fs.headers();
320 
321   if (LOG_ENABLED(INFO)) {
322     std::stringstream ss;
323     for (auto &nv : nva) {
324       if (nv.name == "authorization") {
325         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
326         continue;
327       }
328       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
329     }
330     ULOG(INFO, this) << "HTTP request headers. stream_id="
331                      << downstream->get_stream_id() << "\n"
332                      << ss.str();
333   }
334 
335   auto config = get_config();
336   auto &dump = config->http2.upstream.debug.dump;
337 
338   if (dump.request_header) {
339     http2::dump_nv(dump.request_header, nva);
340   }
341 
342   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
343   if (content_length) {
344     // libnghttp2 guarantees this can be parsed
345     req.fs.content_length = util::parse_uint(content_length->value);
346   }
347 
348   // presence of mandatory header fields are guaranteed by libnghttp2.
349   auto authority = req.fs.header(http2::HD__AUTHORITY);
350   auto path = req.fs.header(http2::HD__PATH);
351   auto method = req.fs.header(http2::HD__METHOD);
352   auto scheme = req.fs.header(http2::HD__SCHEME);
353 
354   auto method_token = http2::lookup_method_token(method->value);
355   if (method_token == -1) {
356     if (error_reply(downstream, 501) != 0) {
357       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
358     }
359     return 0;
360   }
361 
362   auto faddr = handler_->get_upstream_addr();
363 
364   // For HTTP/2 proxy, we require :authority.
365   if (method_token != HTTP_CONNECT && config->http2_proxy &&
366       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
367     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
368     return 0;
369   }
370 
371   req.method = method_token;
372   if (scheme) {
373     req.scheme = scheme->value;
374   }
375 
376   // nghttp2 library guarantees either :authority or host exist
377   if (!authority) {
378     req.no_authority = true;
379     authority = req.fs.header(http2::HD_HOST);
380   }
381 
382   if (authority) {
383     req.authority = authority->value;
384   }
385 
386   if (path) {
387     if (method_token == HTTP_OPTIONS &&
388         path->value == StringRef::from_lit("*")) {
389       // Server-wide OPTIONS request.  Path is empty.
390     } else if (config->http2_proxy &&
391                faddr->alt_mode == UpstreamAltMode::NONE) {
392       req.path = path->value;
393     } else {
394       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
395                                            path->value);
396     }
397   }
398 
399   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
400   if (connect_proto) {
401     if (connect_proto->value != "websocket") {
402       if (error_reply(downstream, 400) != 0) {
403         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
404       }
405       return 0;
406     }
407     req.connect_proto = ConnectProto::WEBSOCKET;
408   }
409 
410   if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
411     req.http2_expect_body = true;
412   } else if (req.fs.content_length == -1) {
413     // If END_STREAM flag is set to HEADERS frame, we are sure that
414     // content-length is 0.
415     req.fs.content_length = 0;
416   }
417 
418   downstream->inspect_http2_request();
419 
420   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
421 
422 #ifdef HAVE_MRUBY
423   auto upstream = downstream->get_upstream();
424   auto handler = upstream->get_client_handler();
425   auto worker = handler->get_worker();
426   auto mruby_ctx = worker->get_mruby_context();
427 
428   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
429     if (error_reply(downstream, 500) != 0) {
430       return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
431     }
432     return 0;
433   }
434 #endif // HAVE_MRUBY
435 
436   if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
437     downstream->disable_upstream_rtimer();
438 
439     downstream->set_request_state(DownstreamState::MSG_COMPLETE);
440   }
441 
442   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
443     return 0;
444   }
445 
446   start_downstream(downstream);
447 
448   return 0;
449 }
450 
start_downstream(Downstream * downstream)451 void Http2Upstream::start_downstream(Downstream *downstream) {
452   if (downstream_queue_.can_activate(downstream->request().authority)) {
453     initiate_downstream(downstream);
454     return;
455   }
456 
457   downstream_queue_.mark_blocked(downstream);
458 }
459 
initiate_downstream(Downstream * downstream)460 void Http2Upstream::initiate_downstream(Downstream *downstream) {
461   int rv;
462 
463   DownstreamConnection *dconn_ptr;
464 
465   for (;;) {
466     auto dconn = handler_->get_downstream_connection(rv, downstream);
467     if (!dconn) {
468       if (rv == SHRPX_ERR_TLS_REQUIRED) {
469         rv = redirect_to_https(downstream);
470       } else {
471         rv = error_reply(downstream, 502);
472       }
473       if (rv != 0) {
474         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
475       }
476 
477       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
478       downstream_queue_.mark_failure(downstream);
479 
480       return;
481     }
482 
483 #ifdef HAVE_MRUBY
484     dconn_ptr = dconn.get();
485 #endif // HAVE_MRUBY
486     rv = downstream->attach_downstream_connection(std::move(dconn));
487     if (rv == 0) {
488       break;
489     }
490   }
491 
492 #ifdef HAVE_MRUBY
493   const auto &group = dconn_ptr->get_downstream_addr_group();
494   if (group) {
495     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
496     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
497       if (error_reply(downstream, 500) != 0) {
498         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
499       }
500 
501       downstream_queue_.mark_failure(downstream);
502 
503       return;
504     }
505 
506     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
507       return;
508     }
509   }
510 #endif // HAVE_MRUBY
511 
512   rv = downstream->push_request_headers();
513   if (rv != 0) {
514 
515     if (error_reply(downstream, 502) != 0) {
516       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
517     }
518 
519     downstream_queue_.mark_failure(downstream);
520 
521     return;
522   }
523 
524   downstream_queue_.mark_active(downstream);
525 
526   auto &req = downstream->request();
527   if (!req.http2_expect_body) {
528     rv = downstream->end_upload_data();
529     if (rv != 0) {
530       rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
531     }
532   }
533 
534   return;
535 }
536 
537 namespace {
on_frame_recv_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)538 int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
539                            void *user_data) {
540   if (get_config()->http2.upstream.debug.frame_debug) {
541     verbose_on_frame_recv_callback(session, frame, user_data);
542   }
543   auto upstream = static_cast<Http2Upstream *>(user_data);
544   auto handler = upstream->get_client_handler();
545 
546   switch (frame->hd.type) {
547   case NGHTTP2_DATA: {
548     auto downstream = static_cast<Downstream *>(
549         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
550     if (!downstream) {
551       return 0;
552     }
553 
554     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
555       downstream->disable_upstream_rtimer();
556 
557       if (downstream->end_upload_data() != 0) {
558         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
559           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
560         }
561       }
562 
563       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
564     }
565 
566     return 0;
567   }
568   case NGHTTP2_HEADERS: {
569     auto downstream = static_cast<Downstream *>(
570         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
571     if (!downstream) {
572       return 0;
573     }
574 
575     if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
576       downstream->reset_upstream_rtimer();
577 
578       handler->stop_read_timer();
579 
580       return upstream->on_request_headers(downstream, frame);
581     }
582 
583     if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
584       downstream->disable_upstream_rtimer();
585 
586       if (downstream->end_upload_data() != 0) {
587         if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
588           upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
589         }
590       }
591 
592       downstream->set_request_state(DownstreamState::MSG_COMPLETE);
593     }
594 
595     return 0;
596   }
597   case NGHTTP2_SETTINGS:
598     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
599       return 0;
600     }
601     upstream->stop_settings_timer();
602     return 0;
603   case NGHTTP2_GOAWAY:
604     if (LOG_ENABLED(INFO)) {
605       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
606                                          frame->goaway.opaque_data_len);
607 
608       ULOG(INFO, upstream) << "GOAWAY received: last-stream-id="
609                            << frame->goaway.last_stream_id
610                            << ", error_code=" << frame->goaway.error_code
611                            << ", debug_data=" << debug_data;
612     }
613     return 0;
614   default:
615     return 0;
616   }
617 }
618 } // namespace
619 
620 namespace {
on_data_chunk_recv_callback(nghttp2_session * session,uint8_t flags,int32_t stream_id,const uint8_t * data,size_t len,void * user_data)621 int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
622                                 int32_t stream_id, const uint8_t *data,
623                                 size_t len, void *user_data) {
624   auto upstream = static_cast<Http2Upstream *>(user_data);
625   auto downstream = static_cast<Downstream *>(
626       nghttp2_session_get_stream_user_data(session, stream_id));
627 
628   if (!downstream) {
629     if (upstream->consume(stream_id, len) != 0) {
630       return NGHTTP2_ERR_CALLBACK_FAILURE;
631     }
632 
633     return 0;
634   }
635 
636   downstream->reset_upstream_rtimer();
637 
638   if (downstream->push_upload_data_chunk(data, len) != 0) {
639     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
640       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
641     }
642 
643     if (upstream->consume(stream_id, len) != 0) {
644       return NGHTTP2_ERR_CALLBACK_FAILURE;
645     }
646 
647     return 0;
648   }
649 
650   return 0;
651 }
652 } // namespace
653 
654 namespace {
on_frame_send_callback(nghttp2_session * session,const nghttp2_frame * frame,void * user_data)655 int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
656                            void *user_data) {
657   if (get_config()->http2.upstream.debug.frame_debug) {
658     verbose_on_frame_send_callback(session, frame, user_data);
659   }
660   auto upstream = static_cast<Http2Upstream *>(user_data);
661   auto handler = upstream->get_client_handler();
662 
663   switch (frame->hd.type) {
664   case NGHTTP2_DATA:
665   case NGHTTP2_HEADERS: {
666     if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
667       return 0;
668     }
669     // RST_STREAM if request is still incomplete.
670     auto stream_id = frame->hd.stream_id;
671     auto downstream = static_cast<Downstream *>(
672         nghttp2_session_get_stream_user_data(session, stream_id));
673 
674     if (!downstream) {
675       return 0;
676     }
677 
678     // For tunneling, issue RST_STREAM to finish the stream.
679     if (downstream->get_upgraded() ||
680         nghttp2_session_get_stream_remote_close(session, stream_id) == 0) {
681       if (LOG_ENABLED(INFO)) {
682         ULOG(INFO, upstream)
683             << "Send RST_STREAM to "
684             << (downstream->get_upgraded() ? "tunneled " : "")
685             << "stream stream_id=" << downstream->get_stream_id()
686             << " to finish off incomplete request";
687       }
688 
689       upstream->rst_stream(downstream, NGHTTP2_NO_ERROR);
690     }
691 
692     return 0;
693   }
694   case NGHTTP2_SETTINGS:
695     if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
696       upstream->start_settings_timer();
697     }
698     return 0;
699   case NGHTTP2_PUSH_PROMISE: {
700     auto promised_stream_id = frame->push_promise.promised_stream_id;
701 
702     if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) {
703       // In case of push from backend, downstream object was already
704       // created.
705       return 0;
706     }
707 
708     auto promised_downstream = std::make_unique<Downstream>(
709         upstream, handler->get_mcpool(), promised_stream_id);
710     auto &req = promised_downstream->request();
711 
712     // As long as we use nghttp2_session_mem_send(), setting stream
713     // user data here should not fail.  This is because this callback
714     // is called just after frame was serialized.  So no worries about
715     // hanging Downstream.
716     nghttp2_session_set_stream_user_data(session, promised_stream_id,
717                                          promised_downstream.get());
718 
719     promised_downstream->set_assoc_stream_id(frame->hd.stream_id);
720     promised_downstream->disable_upstream_rtimer();
721 
722     req.http_major = 2;
723     req.http_minor = 0;
724 
725     req.fs.content_length = 0;
726     req.http2_expect_body = false;
727 
728     auto &promised_balloc = promised_downstream->get_block_allocator();
729 
730     for (size_t i = 0; i < frame->push_promise.nvlen; ++i) {
731       auto &nv = frame->push_promise.nva[i];
732 
733       auto name =
734           make_string_ref(promised_balloc, StringRef{nv.name, nv.namelen});
735       auto value =
736           make_string_ref(promised_balloc, StringRef{nv.value, nv.valuelen});
737 
738       auto token = http2::lookup_token(nv.name, nv.namelen);
739       switch (token) {
740       case http2::HD__METHOD:
741         req.method = http2::lookup_method_token(value);
742         break;
743       case http2::HD__SCHEME:
744         req.scheme = value;
745         break;
746       case http2::HD__AUTHORITY:
747         req.authority = value;
748         break;
749       case http2::HD__PATH:
750         req.path = http2::rewrite_clean_path(promised_balloc, value);
751         break;
752       }
753       req.fs.add_header_token(name, value, nv.flags & NGHTTP2_NV_FLAG_NO_INDEX,
754                               token);
755     }
756 
757     promised_downstream->inspect_http2_request();
758 
759     promised_downstream->set_request_state(DownstreamState::MSG_COMPLETE);
760 
761     // a bit weird but start_downstream() expects that given
762     // downstream is in pending queue.
763     auto ptr = promised_downstream.get();
764     upstream->add_pending_downstream(std::move(promised_downstream));
765 
766 #ifdef HAVE_MRUBY
767     auto worker = handler->get_worker();
768     auto mruby_ctx = worker->get_mruby_context();
769 
770     if (mruby_ctx->run_on_request_proc(ptr) != 0) {
771       if (upstream->error_reply(ptr, 500) != 0) {
772         upstream->rst_stream(ptr, NGHTTP2_INTERNAL_ERROR);
773         return 0;
774       }
775       return 0;
776     }
777 #endif // HAVE_MRUBY
778 
779     upstream->start_downstream(ptr);
780 
781     return 0;
782   }
783   case NGHTTP2_GOAWAY:
784     if (LOG_ENABLED(INFO)) {
785       auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
786                                          frame->goaway.opaque_data_len);
787 
788       ULOG(INFO, upstream) << "Sending GOAWAY: last-stream-id="
789                            << frame->goaway.last_stream_id
790                            << ", error_code=" << frame->goaway.error_code
791                            << ", debug_data=" << debug_data;
792     }
793     return 0;
794   default:
795     return 0;
796   }
797 }
798 } // namespace
799 
800 namespace {
on_frame_not_send_callback(nghttp2_session * session,const nghttp2_frame * frame,int lib_error_code,void * user_data)801 int on_frame_not_send_callback(nghttp2_session *session,
802                                const nghttp2_frame *frame, int lib_error_code,
803                                void *user_data) {
804   auto upstream = static_cast<Http2Upstream *>(user_data);
805   if (LOG_ENABLED(INFO)) {
806     ULOG(INFO, upstream) << "Failed to send control frame type="
807                          << static_cast<uint32_t>(frame->hd.type)
808                          << ", lib_error_code=" << lib_error_code << ":"
809                          << nghttp2_strerror(lib_error_code);
810   }
811   if (frame->hd.type == NGHTTP2_HEADERS &&
812       lib_error_code != NGHTTP2_ERR_STREAM_CLOSED &&
813       lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) {
814     // To avoid stream hanging around, issue RST_STREAM.
815     auto downstream = static_cast<Downstream *>(
816         nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
817     if (downstream) {
818       upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
819     }
820   }
821   return 0;
822 }
823 } // namespace
824 
825 namespace {
826 constexpr auto PADDING = std::array<uint8_t, 256>{};
827 } // namespace
828 
829 namespace {
send_data_callback(nghttp2_session * session,nghttp2_frame * frame,const uint8_t * framehd,size_t length,nghttp2_data_source * source,void * user_data)830 int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
831                        const uint8_t *framehd, size_t length,
832                        nghttp2_data_source *source, void *user_data) {
833   auto downstream = static_cast<Downstream *>(source->ptr);
834   auto upstream = static_cast<Http2Upstream *>(downstream->get_upstream());
835   auto body = downstream->get_response_buf();
836 
837   auto wb = upstream->get_response_buf();
838 
839   size_t padlen = 0;
840 
841   wb->append(framehd, 9);
842   if (frame->data.padlen > 0) {
843     padlen = frame->data.padlen - 1;
844     wb->append(static_cast<uint8_t>(padlen));
845   }
846 
847   body->remove(*wb, length);
848 
849   wb->append(PADDING.data(), padlen);
850 
851   if (body->rleft() == 0) {
852     downstream->disable_upstream_wtimer();
853   } else {
854     downstream->reset_upstream_wtimer();
855   }
856 
857   if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
858     return NGHTTP2_ERR_CALLBACK_FAILURE;
859   }
860 
861   // We have to add length here, so that we can log this amount of
862   // data transferred.
863   downstream->response_sent_body_length += length;
864 
865   auto max_buffer_size = upstream->get_max_buffer_size();
866 
867   return wb->rleft() >= max_buffer_size ? NGHTTP2_ERR_PAUSE : 0;
868 }
869 } // namespace
870 
871 namespace {
infer_upstream_rst_stream_error_code(uint32_t downstream_error_code)872 uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
873   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
874   // client to retry.
875   switch (downstream_error_code) {
876   case NGHTTP2_NO_ERROR:
877   case NGHTTP2_REFUSED_STREAM:
878     return downstream_error_code;
879   default:
880     return NGHTTP2_INTERNAL_ERROR;
881   }
882 }
883 } // namespace
884 
885 namespace {
settings_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)886 void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
887   auto upstream = static_cast<Http2Upstream *>(w->data);
888   auto handler = upstream->get_client_handler();
889   ULOG(INFO, upstream) << "SETTINGS timeout";
890   if (upstream->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
891     delete handler;
892     return;
893   }
894   handler->signal_write();
895 }
896 } // namespace
897 
898 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)899 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
900   auto upstream = static_cast<Http2Upstream *>(w->data);
901   auto handler = upstream->get_client_handler();
902   upstream->submit_goaway();
903   handler->signal_write();
904 }
905 } // namespace
906 
907 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revents)908 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
909   auto upstream = static_cast<Http2Upstream *>(w->data);
910   upstream->check_shutdown();
911 }
912 } // namespace
913 
submit_goaway()914 void Http2Upstream::submit_goaway() {
915   auto last_stream_id = nghttp2_session_get_last_proc_stream_id(session_);
916   nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, last_stream_id,
917                         NGHTTP2_NO_ERROR, nullptr, 0);
918 }
919 
check_shutdown()920 void Http2Upstream::check_shutdown() {
921   auto worker = handler_->get_worker();
922 
923   if (!worker->get_graceful_shutdown()) {
924     return;
925   }
926 
927   ev_prepare_stop(handler_->get_loop(), &prep_);
928 
929   start_graceful_shutdown();
930 }
931 
start_graceful_shutdown()932 void Http2Upstream::start_graceful_shutdown() {
933   int rv;
934   if (ev_is_active(&shutdown_timer_)) {
935     return;
936   }
937 
938   rv = nghttp2_submit_shutdown_notice(session_);
939   if (rv != 0) {
940     ULOG(FATAL, this) << "nghttp2_submit_shutdown_notice() failed: "
941                       << nghttp2_strerror(rv);
942     return;
943   }
944 
945   handler_->signal_write();
946 
947   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
948 }
949 
create_http2_upstream_callbacks()950 nghttp2_session_callbacks *create_http2_upstream_callbacks() {
951   int rv;
952   nghttp2_session_callbacks *callbacks;
953 
954   rv = nghttp2_session_callbacks_new(&callbacks);
955 
956   if (rv != 0) {
957     return nullptr;
958   }
959 
960   nghttp2_session_callbacks_set_on_stream_close_callback(
961       callbacks, on_stream_close_callback);
962 
963   nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
964                                                        on_frame_recv_callback);
965 
966   nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
967       callbacks, on_data_chunk_recv_callback);
968 
969   nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
970                                                        on_frame_send_callback);
971 
972   nghttp2_session_callbacks_set_on_frame_not_send_callback(
973       callbacks, on_frame_not_send_callback);
974 
975   nghttp2_session_callbacks_set_on_header_callback2(callbacks,
976                                                     on_header_callback2);
977 
978   nghttp2_session_callbacks_set_on_invalid_header_callback2(
979       callbacks, on_invalid_header_callback2);
980 
981   nghttp2_session_callbacks_set_on_begin_headers_callback(
982       callbacks, on_begin_headers_callback);
983 
984   nghttp2_session_callbacks_set_send_data_callback(callbacks,
985                                                    send_data_callback);
986 
987   auto config = get_config();
988 
989   if (config->padding) {
990     nghttp2_session_callbacks_set_select_padding_callback(
991         callbacks, http::select_padding_callback);
992   }
993 
994   if (config->http2.upstream.debug.frame_debug) {
995     nghttp2_session_callbacks_set_error_callback2(callbacks,
996                                                   verbose_error_callback);
997   }
998 
999   return callbacks;
1000 }
1001 
1002 namespace {
downstream_queue_size(Worker * worker)1003 size_t downstream_queue_size(Worker *worker) {
1004   auto &downstreamconf = *worker->get_downstream_config();
1005 
1006   if (get_config()->http2_proxy) {
1007     return downstreamconf.connections_per_host;
1008   }
1009 
1010   return downstreamconf.connections_per_frontend;
1011 }
1012 } // namespace
1013 
Http2Upstream(ClientHandler * handler)1014 Http2Upstream::Http2Upstream(ClientHandler *handler)
1015     : wb_(handler->get_worker()->get_mcpool()),
1016       downstream_queue_(downstream_queue_size(handler->get_worker()),
1017                         !get_config()->http2_proxy),
1018       handler_(handler),
1019       session_(nullptr),
1020       max_buffer_size_(MAX_BUFFER_SIZE),
1021       num_requests_(0) {
1022   int rv;
1023 
1024   auto config = get_config();
1025   auto &http2conf = config->http2;
1026 
1027   auto faddr = handler_->get_upstream_addr();
1028 
1029   rv =
1030       nghttp2_session_server_new2(&session_, http2conf.upstream.callbacks, this,
1031                                   faddr->alt_mode != UpstreamAltMode::NONE
1032                                       ? http2conf.upstream.alt_mode_option
1033                                       : http2conf.upstream.option);
1034 
1035   assert(rv == 0);
1036 
1037   flow_control_ = true;
1038 
1039   // TODO Maybe call from outside?
1040   std::array<nghttp2_settings_entry, 4> entry;
1041   size_t nentry = 2;
1042 
1043   entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
1044   entry[0].value = http2conf.upstream.max_concurrent_streams;
1045 
1046   entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
1047   if (faddr->alt_mode != UpstreamAltMode::NONE) {
1048     entry[1].value = (1u << 31) - 1;
1049   } else {
1050     entry[1].value = http2conf.upstream.window_size;
1051   }
1052 
1053   if (!config->http2_proxy) {
1054     entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL;
1055     entry[nentry].value = 1;
1056     ++nentry;
1057   }
1058 
1059   if (http2conf.upstream.decoder_dynamic_table_size !=
1060       NGHTTP2_DEFAULT_HEADER_TABLE_SIZE) {
1061     entry[nentry].settings_id = NGHTTP2_SETTINGS_HEADER_TABLE_SIZE;
1062     entry[nentry].value = http2conf.upstream.decoder_dynamic_table_size;
1063     ++nentry;
1064   }
1065 
1066   rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
1067                                nentry);
1068   if (rv != 0) {
1069     ULOG(ERROR, this) << "nghttp2_submit_settings() returned error: "
1070                       << nghttp2_strerror(rv);
1071   }
1072 
1073   auto window_size =
1074       faddr->alt_mode != UpstreamAltMode::NONE
1075           ? std::numeric_limits<int32_t>::max()
1076           : http2conf.upstream.optimize_window_size
1077                 ? std::min(http2conf.upstream.connection_window_size,
1078                            NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE)
1079                 : http2conf.upstream.connection_window_size;
1080 
1081   rv = nghttp2_session_set_local_window_size(session_, NGHTTP2_FLAG_NONE, 0,
1082                                              window_size);
1083 
1084   if (rv != 0) {
1085     ULOG(ERROR, this)
1086         << "nghttp2_session_set_local_window_size() returned error: "
1087         << nghttp2_strerror(rv);
1088   }
1089 
1090   // We wait for SETTINGS ACK at least 10 seconds.
1091   ev_timer_init(&settings_timer_, settings_timeout_cb,
1092                 http2conf.upstream.timeout.settings, 0.);
1093 
1094   settings_timer_.data = this;
1095 
1096   // timer for 2nd GOAWAY.  HTTP/2 spec recommend 1 RTT.  We wait for
1097   // 2 seconds.
1098   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 2., 0);
1099   shutdown_timer_.data = this;
1100 
1101   ev_prepare_init(&prep_, prepare_cb);
1102   prep_.data = this;
1103   ev_prepare_start(handler_->get_loop(), &prep_);
1104 
1105 #if defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1106   if (http2conf.upstream.optimize_write_buffer_size) {
1107     auto conn = handler_->get_connection();
1108     conn->tls_dyn_rec_warmup_threshold = 0;
1109 
1110     uint32_t pollout_thres = 1;
1111     rv = setsockopt(conn->fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &pollout_thres,
1112                     static_cast<socklen_t>(sizeof(pollout_thres)));
1113 
1114     if (rv != 0) {
1115       if (LOG_ENABLED(INFO)) {
1116         auto error = errno;
1117         LOG(INFO) << "setsockopt(TCP_NOTSENT_LOWAT, " << pollout_thres
1118                   << ") failed: errno=" << error;
1119       }
1120     }
1121   }
1122 #endif // defined(TCP_INFO) && defined(TCP_NOTSENT_LOWAT)
1123 
1124   handler_->reset_upstream_read_timeout(
1125       config->conn.upstream.timeout.http2_read);
1126 
1127   handler_->signal_write();
1128 }
1129 
~Http2Upstream()1130 Http2Upstream::~Http2Upstream() {
1131   nghttp2_session_del(session_);
1132   ev_prepare_stop(handler_->get_loop(), &prep_);
1133   ev_timer_stop(handler_->get_loop(), &shutdown_timer_);
1134   ev_timer_stop(handler_->get_loop(), &settings_timer_);
1135 }
1136 
on_read()1137 int Http2Upstream::on_read() {
1138   ssize_t rv = 0;
1139   auto rb = handler_->get_rb();
1140   auto rlimit = handler_->get_rlimit();
1141 
1142   if (rb->rleft()) {
1143     rv = nghttp2_session_mem_recv(session_, rb->pos(), rb->rleft());
1144     if (rv < 0) {
1145       if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
1146         ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: "
1147                           << nghttp2_strerror(rv);
1148       }
1149       return -1;
1150     }
1151 
1152     // nghttp2_session_mem_recv should consume all input bytes on
1153     // success.
1154     assert(static_cast<size_t>(rv) == rb->rleft());
1155     rb->reset();
1156     rlimit->startw();
1157   }
1158 
1159   if (nghttp2_session_want_read(session_) == 0 &&
1160       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1161     if (LOG_ENABLED(INFO)) {
1162       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1163     }
1164     return -1;
1165   }
1166 
1167   handler_->signal_write();
1168   return 0;
1169 }
1170 
1171 // After this function call, downstream may be deleted.
on_write()1172 int Http2Upstream::on_write() {
1173   int rv;
1174   auto config = get_config();
1175   auto &http2conf = config->http2;
1176 
1177   if ((http2conf.upstream.optimize_write_buffer_size ||
1178        http2conf.upstream.optimize_window_size) &&
1179       handler_->get_ssl()) {
1180     auto conn = handler_->get_connection();
1181     TCPHint hint;
1182     rv = conn->get_tcp_hint(&hint);
1183     if (rv == 0) {
1184       if (http2conf.upstream.optimize_write_buffer_size) {
1185         max_buffer_size_ = std::min(MAX_BUFFER_SIZE, hint.write_buffer_size);
1186       }
1187 
1188       if (http2conf.upstream.optimize_window_size) {
1189         auto faddr = handler_->get_upstream_addr();
1190         if (faddr->alt_mode == UpstreamAltMode::NONE) {
1191           auto window_size = std::min(http2conf.upstream.connection_window_size,
1192                                       static_cast<int32_t>(hint.rwin * 2));
1193 
1194           rv = nghttp2_session_set_local_window_size(
1195               session_, NGHTTP2_FLAG_NONE, 0, window_size);
1196           if (rv != 0) {
1197             if (LOG_ENABLED(INFO)) {
1198               ULOG(INFO, this)
1199                   << "nghttp2_session_set_local_window_size() with window_size="
1200                   << window_size << " failed: " << nghttp2_strerror(rv);
1201             }
1202           }
1203         }
1204       }
1205     }
1206   }
1207 
1208   for (;;) {
1209     if (wb_.rleft() >= max_buffer_size_) {
1210       return 0;
1211     }
1212 
1213     const uint8_t *data;
1214     auto datalen = nghttp2_session_mem_send(session_, &data);
1215 
1216     if (datalen < 0) {
1217       ULOG(ERROR, this) << "nghttp2_session_mem_send() returned error: "
1218                         << nghttp2_strerror(datalen);
1219       return -1;
1220     }
1221     if (datalen == 0) {
1222       break;
1223     }
1224     wb_.append(data, datalen);
1225   }
1226 
1227   if (nghttp2_session_want_read(session_) == 0 &&
1228       nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
1229     if (LOG_ENABLED(INFO)) {
1230       ULOG(INFO, this) << "No more read/write for this HTTP2 session";
1231     }
1232     return -1;
1233   }
1234 
1235   return 0;
1236 }
1237 
get_client_handler() const1238 ClientHandler *Http2Upstream::get_client_handler() const { return handler_; }
1239 
downstream_read(DownstreamConnection * dconn)1240 int Http2Upstream::downstream_read(DownstreamConnection *dconn) {
1241   auto downstream = dconn->get_downstream();
1242 
1243   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1244     // The downstream stream was reset (canceled). In this case,
1245     // RST_STREAM to the upstream and delete downstream connection
1246     // here. Deleting downstream will be taken place at
1247     // on_stream_close_callback.
1248     rst_stream(downstream,
1249                infer_upstream_rst_stream_error_code(
1250                    downstream->get_response_rst_stream_error_code()));
1251     downstream->pop_downstream_connection();
1252     // dconn was deleted
1253     dconn = nullptr;
1254   } else if (downstream->get_response_state() ==
1255              DownstreamState::MSG_BAD_HEADER) {
1256     if (error_reply(downstream, 502) != 0) {
1257       return -1;
1258     }
1259     downstream->pop_downstream_connection();
1260     // dconn was deleted
1261     dconn = nullptr;
1262   } else {
1263     auto rv = downstream->on_read();
1264     if (rv == SHRPX_ERR_EOF) {
1265       if (downstream->get_request_header_sent()) {
1266         return downstream_eof(dconn);
1267       }
1268       return SHRPX_ERR_RETRY;
1269     }
1270     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1271       downstream->pop_downstream_connection();
1272       handler_->signal_write();
1273       return 0;
1274     }
1275     if (rv != 0) {
1276       if (rv != SHRPX_ERR_NETWORK) {
1277         if (LOG_ENABLED(INFO)) {
1278           DCLOG(INFO, dconn) << "HTTP parser failure";
1279         }
1280       }
1281       return downstream_error(dconn, Downstream::EVENT_ERROR);
1282     }
1283 
1284     if (downstream->can_detach_downstream_connection()) {
1285       // Keep-alive
1286       downstream->detach_downstream_connection();
1287     }
1288   }
1289 
1290   handler_->signal_write();
1291 
1292   // At this point, downstream may be deleted.
1293 
1294   return 0;
1295 }
1296 
downstream_write(DownstreamConnection * dconn)1297 int Http2Upstream::downstream_write(DownstreamConnection *dconn) {
1298   int rv;
1299   rv = dconn->on_write();
1300   if (rv == SHRPX_ERR_NETWORK) {
1301     return downstream_error(dconn, Downstream::EVENT_ERROR);
1302   }
1303   if (rv != 0) {
1304     return rv;
1305   }
1306   return 0;
1307 }
1308 
downstream_eof(DownstreamConnection * dconn)1309 int Http2Upstream::downstream_eof(DownstreamConnection *dconn) {
1310   auto downstream = dconn->get_downstream();
1311 
1312   if (LOG_ENABLED(INFO)) {
1313     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1314   }
1315 
1316   // Delete downstream connection. If we don't delete it here, it will
1317   // be pooled in on_stream_close_callback.
1318   downstream->pop_downstream_connection();
1319   // dconn was deleted
1320   dconn = nullptr;
1321   // downstream wil be deleted in on_stream_close_callback.
1322   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1323     // Server may indicate the end of the request by EOF
1324     if (LOG_ENABLED(INFO)) {
1325       ULOG(INFO, this) << "Downstream body was ended by EOF";
1326     }
1327     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1328 
1329     // For tunneled connection, MSG_COMPLETE signals
1330     // downstream_data_read_callback to send RST_STREAM after pending
1331     // response body is sent. This is needed to ensure that RST_STREAM
1332     // is sent after all pending data are sent.
1333     on_downstream_body_complete(downstream);
1334   } else if (downstream->get_response_state() !=
1335              DownstreamState::MSG_COMPLETE) {
1336     // If stream was not closed, then we set MSG_COMPLETE and let
1337     // on_stream_close_callback delete downstream.
1338     if (error_reply(downstream, 502) != 0) {
1339       return -1;
1340     }
1341   }
1342   handler_->signal_write();
1343   // At this point, downstream may be deleted.
1344   return 0;
1345 }
1346 
downstream_error(DownstreamConnection * dconn,int events)1347 int Http2Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1348   auto downstream = dconn->get_downstream();
1349 
1350   if (LOG_ENABLED(INFO)) {
1351     if (events & Downstream::EVENT_ERROR) {
1352       DCLOG(INFO, dconn) << "Downstream network/general error";
1353     } else {
1354       DCLOG(INFO, dconn) << "Timeout";
1355     }
1356     if (downstream->get_upgraded()) {
1357       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1358     }
1359   }
1360 
1361   // Delete downstream connection. If we don't delete it here, it will
1362   // be pooled in on_stream_close_callback.
1363   downstream->pop_downstream_connection();
1364   // dconn was deleted
1365   dconn = nullptr;
1366 
1367   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1368     // For SSL tunneling, we issue RST_STREAM. For other types of
1369     // stream, we don't have to do anything since response was
1370     // complete.
1371     if (downstream->get_upgraded()) {
1372       rst_stream(downstream, NGHTTP2_NO_ERROR);
1373     }
1374   } else {
1375     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1376       if (downstream->get_upgraded()) {
1377         on_downstream_body_complete(downstream);
1378       } else {
1379         rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
1380       }
1381     } else {
1382       unsigned int status;
1383       if (events & Downstream::EVENT_TIMEOUT) {
1384         if (downstream->get_request_header_sent()) {
1385           status = 504;
1386         } else {
1387           status = 408;
1388         }
1389       } else {
1390         status = 502;
1391       }
1392       if (error_reply(downstream, status) != 0) {
1393         return -1;
1394       }
1395     }
1396     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1397   }
1398   handler_->signal_write();
1399   // At this point, downstream may be deleted.
1400   return 0;
1401 }
1402 
rst_stream(Downstream * downstream,uint32_t error_code)1403 int Http2Upstream::rst_stream(Downstream *downstream, uint32_t error_code) {
1404   if (LOG_ENABLED(INFO)) {
1405     ULOG(INFO, this) << "RST_STREAM stream_id=" << downstream->get_stream_id()
1406                      << " with error_code=" << error_code;
1407   }
1408   int rv;
1409   rv = nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE,
1410                                  downstream->get_stream_id(), error_code);
1411   if (rv < NGHTTP2_ERR_FATAL) {
1412     ULOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
1413                       << nghttp2_strerror(rv);
1414     return -1;
1415   }
1416   return 0;
1417 }
1418 
terminate_session(uint32_t error_code)1419 int Http2Upstream::terminate_session(uint32_t error_code) {
1420   int rv;
1421   rv = nghttp2_session_terminate_session(session_, error_code);
1422   if (rv != 0) {
1423     return -1;
1424   }
1425   return 0;
1426 }
1427 
1428 namespace {
downstream_data_read_callback(nghttp2_session * session,int32_t stream_id,uint8_t * buf,size_t length,uint32_t * data_flags,nghttp2_data_source * source,void * user_data)1429 ssize_t downstream_data_read_callback(nghttp2_session *session,
1430                                       int32_t stream_id, uint8_t *buf,
1431                                       size_t length, uint32_t *data_flags,
1432                                       nghttp2_data_source *source,
1433                                       void *user_data) {
1434   int rv;
1435   auto downstream = static_cast<Downstream *>(source->ptr);
1436   auto body = downstream->get_response_buf();
1437   assert(body);
1438   auto upstream = static_cast<Http2Upstream *>(user_data);
1439 
1440   const auto &resp = downstream->response();
1441 
1442   auto nread = std::min(body->rleft(), length);
1443 
1444   auto max_buffer_size = upstream->get_max_buffer_size();
1445 
1446   auto buffer = upstream->get_response_buf();
1447 
1448   if (max_buffer_size <
1449       std::min(nread, static_cast<size_t>(256)) + 9 + buffer->rleft()) {
1450     if (LOG_ENABLED(INFO)) {
1451       ULOG(INFO, upstream) << "Buffer is almost full.  Skip write DATA";
1452     }
1453     return NGHTTP2_ERR_PAUSE;
1454   }
1455 
1456   nread = std::min(nread, max_buffer_size - 9 - buffer->rleft());
1457 
1458   auto body_empty = body->rleft() == nread;
1459 
1460   *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
1461 
1462   if (body_empty &&
1463       downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1464 
1465     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
1466 
1467     if (!downstream->get_upgraded()) {
1468       const auto &trailers = resp.fs.trailers();
1469       if (!trailers.empty()) {
1470         std::vector<nghttp2_nv> nva;
1471         nva.reserve(trailers.size());
1472         http2::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1473         if (!nva.empty()) {
1474           rv = nghttp2_submit_trailer(session, stream_id, nva.data(),
1475                                       nva.size());
1476           if (rv != 0) {
1477             if (nghttp2_is_fatal(rv)) {
1478               return NGHTTP2_ERR_CALLBACK_FAILURE;
1479             }
1480           } else {
1481             *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
1482           }
1483         }
1484       }
1485     }
1486   }
1487 
1488   if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
1489     downstream->disable_upstream_wtimer();
1490     return NGHTTP2_ERR_DEFERRED;
1491   }
1492 
1493   return nread;
1494 }
1495 } // namespace
1496 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1497 int Http2Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1498                               size_t bodylen) {
1499   int rv;
1500 
1501   nghttp2_data_provider data_prd, *data_prd_ptr = nullptr;
1502 
1503   if (bodylen) {
1504     data_prd.source.ptr = downstream;
1505     data_prd.read_callback = downstream_data_read_callback;
1506     data_prd_ptr = &data_prd;
1507   }
1508 
1509   const auto &resp = downstream->response();
1510   auto config = get_config();
1511   auto &httpconf = config->http;
1512 
1513   auto &balloc = downstream->get_block_allocator();
1514 
1515   const auto &headers = resp.fs.headers();
1516   auto nva = std::vector<nghttp2_nv>();
1517   // 2 for :status and server
1518   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1519 
1520   auto response_status = http2::stringify_status(balloc, resp.http_status);
1521 
1522   nva.push_back(http2::make_nv_ls_nocopy(":status", response_status));
1523 
1524   for (auto &kv : headers) {
1525     if (kv.name.empty() || kv.name[0] == ':') {
1526       continue;
1527     }
1528     switch (kv.token) {
1529     case http2::HD_CONNECTION:
1530     case http2::HD_KEEP_ALIVE:
1531     case http2::HD_PROXY_CONNECTION:
1532     case http2::HD_TE:
1533     case http2::HD_TRANSFER_ENCODING:
1534     case http2::HD_UPGRADE:
1535       continue;
1536     }
1537     nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index));
1538   }
1539 
1540   if (!resp.fs.header(http2::HD_SERVER)) {
1541     nva.push_back(http2::make_nv_ls_nocopy("server", config->http.server_name));
1542   }
1543 
1544   for (auto &p : httpconf.add_response_headers) {
1545     nva.push_back(http2::make_nv_nocopy(p.name, p.value));
1546   }
1547 
1548   rv = nghttp2_submit_response(session_, downstream->get_stream_id(),
1549                                nva.data(), nva.size(), data_prd_ptr);
1550   if (nghttp2_is_fatal(rv)) {
1551     ULOG(FATAL, this) << "nghttp2_submit_response() failed: "
1552                       << nghttp2_strerror(rv);
1553     return -1;
1554   }
1555 
1556   auto buf = downstream->get_response_buf();
1557 
1558   buf->append(body, bodylen);
1559 
1560   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1561 
1562   if (data_prd_ptr) {
1563     downstream->reset_upstream_wtimer();
1564   }
1565 
1566   return 0;
1567 }
1568 
error_reply(Downstream * downstream,unsigned int status_code)1569 int Http2Upstream::error_reply(Downstream *downstream,
1570                                unsigned int status_code) {
1571   int rv;
1572   auto &resp = downstream->response();
1573 
1574   auto &balloc = downstream->get_block_allocator();
1575 
1576   auto html = http::create_error_html(balloc, status_code);
1577   resp.http_status = status_code;
1578   auto body = downstream->get_response_buf();
1579   body->append(html);
1580   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1581 
1582   nghttp2_data_provider data_prd;
1583   data_prd.source.ptr = downstream;
1584   data_prd.read_callback = downstream_data_read_callback;
1585 
1586   auto lgconf = log_config();
1587   lgconf->update_tstamp(std::chrono::system_clock::now());
1588 
1589   auto response_status = http2::stringify_status(balloc, status_code);
1590   auto content_length = util::make_string_ref_uint(balloc, html.size());
1591   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
1592 
1593   auto nva = std::array<nghttp2_nv, 5>{
1594       {http2::make_nv_ls_nocopy(":status", response_status),
1595        http2::make_nv_ll("content-type", "text/html; charset=UTF-8"),
1596        http2::make_nv_ls_nocopy("server", get_config()->http.server_name),
1597        http2::make_nv_ls_nocopy("content-length", content_length),
1598        http2::make_nv_ls_nocopy("date", date)}};
1599 
1600   rv = nghttp2_submit_response(session_, downstream->get_stream_id(),
1601                                nva.data(), nva.size(), &data_prd);
1602   if (rv < NGHTTP2_ERR_FATAL) {
1603     ULOG(FATAL, this) << "nghttp2_submit_response() failed: "
1604                       << nghttp2_strerror(rv);
1605     return -1;
1606   }
1607 
1608   downstream->reset_upstream_wtimer();
1609 
1610   return 0;
1611 }
1612 
add_pending_downstream(std::unique_ptr<Downstream> downstream)1613 void Http2Upstream::add_pending_downstream(
1614     std::unique_ptr<Downstream> downstream) {
1615   downstream_queue_.add_pending(std::move(downstream));
1616 }
1617 
remove_downstream(Downstream * downstream)1618 void Http2Upstream::remove_downstream(Downstream *downstream) {
1619   if (downstream->accesslog_ready()) {
1620     handler_->write_accesslog(downstream);
1621   }
1622 
1623   nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
1624                                        nullptr);
1625 
1626   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
1627 
1628   if (next_downstream) {
1629     initiate_downstream(next_downstream);
1630   }
1631 
1632   if (downstream_queue_.get_downstreams() == nullptr) {
1633     // There is no downstream at the moment.  Start idle timer now.
1634     handler_->repeat_read_timer();
1635   }
1636 }
1637 
1638 // WARNING: Never call directly or indirectly nghttp2_session_send or
1639 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_header_complete(Downstream * downstream)1640 int Http2Upstream::on_downstream_header_complete(Downstream *downstream) {
1641   int rv;
1642 
1643   const auto &req = downstream->request();
1644   auto &resp = downstream->response();
1645 
1646   auto &balloc = downstream->get_block_allocator();
1647 
1648   if (LOG_ENABLED(INFO)) {
1649     if (downstream->get_non_final_response()) {
1650       DLOG(INFO, downstream) << "HTTP non-final response header";
1651     } else {
1652       DLOG(INFO, downstream) << "HTTP response header completed";
1653     }
1654   }
1655 
1656   auto config = get_config();
1657   auto &httpconf = config->http;
1658 
1659   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1660     downstream->rewrite_location_response_header(req.scheme);
1661   }
1662 
1663 #ifdef HAVE_MRUBY
1664   if (!downstream->get_non_final_response()) {
1665     auto dconn = downstream->get_downstream_connection();
1666     const auto &group = dconn->get_downstream_addr_group();
1667     if (group) {
1668       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1669 
1670       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1671         if (error_reply(downstream, 500) != 0) {
1672           return -1;
1673         }
1674         // Returning -1 will signal deletion of dconn.
1675         return -1;
1676       }
1677 
1678       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1679         return -1;
1680       }
1681     }
1682 
1683     auto worker = handler_->get_worker();
1684     auto mruby_ctx = worker->get_mruby_context();
1685 
1686     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1687       if (error_reply(downstream, 500) != 0) {
1688         return -1;
1689       }
1690       // Returning -1 will signal deletion of dconn.
1691       return -1;
1692     }
1693 
1694     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1695       return -1;
1696     }
1697   }
1698 #endif // HAVE_MRUBY
1699 
1700   auto &http2conf = config->http2;
1701 
1702   // We need some conditions that must be fulfilled to initiate server
1703   // push.
1704   //
1705   // * Server push is disabled for http2 proxy or client proxy, since
1706   //   incoming headers are mixed origins.  We don't know how to
1707   //   reliably determine the authority yet.
1708   //
1709   // * We need non-final response or 200 response code for associated
1710   //   resource.  This is too restrictive, we will review this later.
1711   //
1712   // * We requires GET or POST for associated resource.  Probably we
1713   //   don't want to push for HEAD request.  Not sure other methods
1714   //   are also eligible for push.
1715   if (!http2conf.no_server_push &&
1716       nghttp2_session_get_remote_settings(session_,
1717                                           NGHTTP2_SETTINGS_ENABLE_PUSH) == 1 &&
1718       !config->http2_proxy && (downstream->get_stream_id() % 2) &&
1719       resp.fs.header(http2::HD_LINK) &&
1720       (downstream->get_non_final_response() || resp.http_status == 200) &&
1721       (req.method == HTTP_GET || req.method == HTTP_POST)) {
1722 
1723     if (prepare_push_promise(downstream) != 0) {
1724       // Continue to send response even if push was failed.
1725     }
1726   }
1727 
1728   auto nva = std::vector<nghttp2_nv>();
1729   // 5 means :status and possible server, via, x-http2-push, and
1730   // set-cookie (for affinity cookie) header field.
1731   nva.reserve(resp.fs.headers().size() + 5 +
1732               httpconf.add_response_headers.size());
1733 
1734   if (downstream->get_non_final_response()) {
1735     auto response_status = http2::stringify_status(balloc, resp.http_status);
1736 
1737     nva.push_back(http2::make_nv_ls_nocopy(":status", response_status));
1738 
1739     http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1740                                       http2::HDOP_STRIP_ALL);
1741 
1742     if (LOG_ENABLED(INFO)) {
1743       log_response_headers(downstream, nva);
1744     }
1745 
1746     rv = nghttp2_submit_headers(session_, NGHTTP2_FLAG_NONE,
1747                                 downstream->get_stream_id(), nullptr,
1748                                 nva.data(), nva.size(), nullptr);
1749 
1750     resp.fs.clear_headers();
1751 
1752     if (rv != 0) {
1753       ULOG(FATAL, this) << "nghttp2_submit_headers() failed";
1754       return -1;
1755     }
1756 
1757     return 0;
1758   }
1759 
1760   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1761   StringRef response_status;
1762 
1763   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1764     response_status = http2::stringify_status(balloc, 200);
1765     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1766   } else {
1767     response_status = http2::stringify_status(balloc, resp.http_status);
1768   }
1769 
1770   nva.push_back(http2::make_nv_ls_nocopy(":status", response_status));
1771 
1772   http2::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1773 
1774   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1775     nva.push_back(http2::make_nv_ls_nocopy("server", httpconf.server_name));
1776   } else {
1777     auto server = resp.fs.header(http2::HD_SERVER);
1778     if (server) {
1779       nva.push_back(http2::make_nv_ls_nocopy("server", (*server).value));
1780     }
1781   }
1782 
1783   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1784     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1785     if (affinity_cookie) {
1786       auto dconn = downstream->get_downstream_connection();
1787       assert(dconn);
1788       auto &group = dconn->get_downstream_addr_group();
1789       auto &shared_addr = group->shared_addr;
1790       auto &cookieconf = shared_addr->affinity.cookie;
1791       auto secure =
1792           http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1793       auto cookie_str = http::create_affinity_cookie(
1794           balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1795       nva.push_back(http2::make_nv_ls_nocopy("set-cookie", cookie_str));
1796     }
1797   }
1798 
1799   auto via = resp.fs.header(http2::HD_VIA);
1800   if (httpconf.no_via) {
1801     if (via) {
1802       nva.push_back(http2::make_nv_ls_nocopy("via", (*via).value));
1803     }
1804   } else {
1805     // we don't create more than 16 bytes in
1806     // http::create_via_header_value.
1807     size_t len = 16;
1808     if (via) {
1809       len += via->value.size() + 2;
1810     }
1811 
1812     auto iov = make_byte_ref(balloc, len + 1);
1813     auto p = iov.base;
1814     if (via) {
1815       p = std::copy(std::begin(via->value), std::end(via->value), p);
1816       p = util::copy_lit(p, ", ");
1817     }
1818     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1819     *p = '\0';
1820 
1821     nva.push_back(http2::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
1822   }
1823 
1824   for (auto &p : httpconf.add_response_headers) {
1825     nva.push_back(http2::make_nv_nocopy(p.name, p.value));
1826   }
1827 
1828   if (downstream->get_stream_id() % 2 == 0) {
1829     // This header field is basically for human on client side to
1830     // figure out that the resource is pushed.
1831     nva.push_back(http2::make_nv_ll("x-http2-push", "1"));
1832   }
1833 
1834   if (LOG_ENABLED(INFO)) {
1835     log_response_headers(downstream, nva);
1836   }
1837 
1838   if (http2conf.upstream.debug.dump.response_header) {
1839     http2::dump_nv(http2conf.upstream.debug.dump.response_header, nva.data(),
1840                    nva.size());
1841   }
1842 
1843   nghttp2_data_provider data_prd;
1844   data_prd.source.ptr = downstream;
1845   data_prd.read_callback = downstream_data_read_callback;
1846 
1847   nghttp2_data_provider *data_prdptr;
1848 
1849   if (downstream->expect_response_body() ||
1850       downstream->expect_response_trailer()) {
1851     data_prdptr = &data_prd;
1852   } else {
1853     data_prdptr = nullptr;
1854   }
1855 
1856   rv = nghttp2_submit_response(session_, downstream->get_stream_id(),
1857                                nva.data(), nva.size(), data_prdptr);
1858   if (rv != 0) {
1859     ULOG(FATAL, this) << "nghttp2_submit_response() failed";
1860     return -1;
1861   }
1862 
1863   if (data_prdptr) {
1864     downstream->reset_upstream_wtimer();
1865   }
1866 
1867   return 0;
1868 }
1869 
1870 // WARNING: Never call directly or indirectly nghttp2_session_send or
1871 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1872 int Http2Upstream::on_downstream_body(Downstream *downstream,
1873                                       const uint8_t *data, size_t len,
1874                                       bool flush) {
1875   auto body = downstream->get_response_buf();
1876   body->append(data, len);
1877 
1878   if (flush) {
1879     nghttp2_session_resume_data(session_, downstream->get_stream_id());
1880 
1881     downstream->ensure_upstream_wtimer();
1882   }
1883 
1884   return 0;
1885 }
1886 
1887 // WARNING: Never call directly or indirectly nghttp2_session_send or
1888 // nghttp2_session_recv. These calls may delete downstream.
on_downstream_body_complete(Downstream * downstream)1889 int Http2Upstream::on_downstream_body_complete(Downstream *downstream) {
1890   if (LOG_ENABLED(INFO)) {
1891     DLOG(INFO, downstream) << "HTTP response completed";
1892   }
1893 
1894   auto &resp = downstream->response();
1895 
1896   if (!downstream->validate_response_recv_body_length()) {
1897     rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
1898     resp.connection_close = true;
1899     return 0;
1900   }
1901 
1902   nghttp2_session_resume_data(session_, downstream->get_stream_id());
1903   downstream->ensure_upstream_wtimer();
1904 
1905   return 0;
1906 }
1907 
get_flow_control() const1908 bool Http2Upstream::get_flow_control() const { return flow_control_; }
1909 
pause_read(IOCtrlReason reason)1910 void Http2Upstream::pause_read(IOCtrlReason reason) {}
1911 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1912 int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1913                                size_t consumed) {
1914   if (get_flow_control()) {
1915     if (consume(downstream->get_stream_id(), consumed) != 0) {
1916       return -1;
1917     }
1918 
1919     auto &req = downstream->request();
1920 
1921     req.consume(consumed);
1922   }
1923 
1924   handler_->signal_write();
1925   return 0;
1926 }
1927 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)1928 int Http2Upstream::on_downstream_abort_request(Downstream *downstream,
1929                                                unsigned int status_code) {
1930   int rv;
1931 
1932   rv = error_reply(downstream, status_code);
1933 
1934   if (rv != 0) {
1935     return -1;
1936   }
1937 
1938   handler_->signal_write();
1939   return 0;
1940 }
1941 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1942 int Http2Upstream::on_downstream_abort_request_with_https_redirect(
1943     Downstream *downstream) {
1944   int rv;
1945 
1946   rv = redirect_to_https(downstream);
1947   if (rv != 0) {
1948     return -1;
1949   }
1950 
1951   handler_->signal_write();
1952   return 0;
1953 }
1954 
redirect_to_https(Downstream * downstream)1955 int Http2Upstream::redirect_to_https(Downstream *downstream) {
1956   auto &req = downstream->request();
1957   if (req.regular_connect_method() || req.scheme != "http") {
1958     return error_reply(downstream, 400);
1959   }
1960 
1961   auto authority = util::extract_host(req.authority);
1962   if (authority.empty()) {
1963     return error_reply(downstream, 400);
1964   }
1965 
1966   auto &balloc = downstream->get_block_allocator();
1967   auto config = get_config();
1968   auto &httpconf = config->http;
1969 
1970   StringRef loc;
1971   if (httpconf.redirect_https_port == StringRef::from_lit("443")) {
1972     loc = concat_string_ref(balloc, StringRef::from_lit("https://"), authority,
1973                             req.path);
1974   } else {
1975     loc = concat_string_ref(balloc, StringRef::from_lit("https://"), authority,
1976                             StringRef::from_lit(":"),
1977                             httpconf.redirect_https_port, req.path);
1978   }
1979 
1980   auto &resp = downstream->response();
1981   resp.http_status = 308;
1982   resp.fs.add_header_token(StringRef::from_lit("location"), loc, false,
1983                            http2::HD_LOCATION);
1984 
1985   return send_reply(downstream, nullptr, 0);
1986 }
1987 
consume(int32_t stream_id,size_t len)1988 int Http2Upstream::consume(int32_t stream_id, size_t len) {
1989   int rv;
1990 
1991   auto faddr = handler_->get_upstream_addr();
1992 
1993   if (faddr->alt_mode != UpstreamAltMode::NONE) {
1994     return 0;
1995   }
1996 
1997   rv = nghttp2_session_consume(session_, stream_id, len);
1998 
1999   if (rv != 0) {
2000     ULOG(WARN, this) << "nghttp2_session_consume() returned error: "
2001                      << nghttp2_strerror(rv);
2002     return -1;
2003   }
2004 
2005   return 0;
2006 }
2007 
log_response_headers(Downstream * downstream,const std::vector<nghttp2_nv> & nva) const2008 void Http2Upstream::log_response_headers(
2009     Downstream *downstream, const std::vector<nghttp2_nv> &nva) const {
2010   std::stringstream ss;
2011   for (auto &nv : nva) {
2012     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2013        << StringRef{nv.value, nv.valuelen} << "\n";
2014   }
2015   ULOG(INFO, this) << "HTTP response headers. stream_id="
2016                    << downstream->get_stream_id() << "\n"
2017                    << ss.str();
2018 }
2019 
on_timeout(Downstream * downstream)2020 int Http2Upstream::on_timeout(Downstream *downstream) {
2021   if (LOG_ENABLED(INFO)) {
2022     ULOG(INFO, this) << "Stream timeout stream_id="
2023                      << downstream->get_stream_id();
2024   }
2025 
2026   rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2027   handler_->signal_write();
2028 
2029   return 0;
2030 }
2031 
on_handler_delete()2032 void Http2Upstream::on_handler_delete() {
2033   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
2034     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
2035         d->accesslog_ready()) {
2036       handler_->write_accesslog(d);
2037     }
2038   }
2039 }
2040 
on_downstream_reset(Downstream * downstream,bool no_retry)2041 int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
2042   int rv;
2043 
2044   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
2045     // This is error condition when we failed push_request_headers()
2046     // in initiate_downstream().  Otherwise, we have
2047     // DispatchState::ACTIVE state, or we did not set
2048     // DownstreamConnection.
2049     downstream->pop_downstream_connection();
2050     handler_->signal_write();
2051 
2052     return 0;
2053   }
2054 
2055   if (!downstream->request_submission_ready()) {
2056     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2057       // We have got all response body already.  Send it off.
2058       downstream->pop_downstream_connection();
2059       return 0;
2060     }
2061     // pushed stream is handled here
2062     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2063     downstream->pop_downstream_connection();
2064 
2065     handler_->signal_write();
2066 
2067     return 0;
2068   }
2069 
2070   downstream->pop_downstream_connection();
2071 
2072   downstream->add_retry();
2073 
2074   std::unique_ptr<DownstreamConnection> dconn;
2075 
2076   rv = 0;
2077 
2078   if (no_retry || downstream->no_more_retry()) {
2079     goto fail;
2080   }
2081 
2082   // downstream connection is clean; we can retry with new
2083   // downstream connection.
2084 
2085   for (;;) {
2086     auto dconn = handler_->get_downstream_connection(rv, downstream);
2087     if (!dconn) {
2088       goto fail;
2089     }
2090 
2091     rv = downstream->attach_downstream_connection(std::move(dconn));
2092     if (rv == 0) {
2093       break;
2094     }
2095   }
2096 
2097   rv = downstream->push_request_headers();
2098   if (rv != 0) {
2099     goto fail;
2100   }
2101 
2102   return 0;
2103 
2104 fail:
2105   if (rv == SHRPX_ERR_TLS_REQUIRED) {
2106     rv = on_downstream_abort_request_with_https_redirect(downstream);
2107   } else {
2108     rv = on_downstream_abort_request(downstream, 502);
2109   }
2110   if (rv != 0) {
2111     rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
2112   }
2113   downstream->pop_downstream_connection();
2114 
2115   handler_->signal_write();
2116 
2117   return 0;
2118 }
2119 
prepare_push_promise(Downstream * downstream)2120 int Http2Upstream::prepare_push_promise(Downstream *downstream) {
2121   int rv;
2122 
2123   const auto &req = downstream->request();
2124   auto &resp = downstream->response();
2125 
2126   auto base = http2::get_pure_path_component(req.path);
2127   if (base.empty()) {
2128     return 0;
2129   }
2130 
2131   auto &balloc = downstream->get_block_allocator();
2132 
2133   for (auto &kv : resp.fs.headers()) {
2134     if (kv.token != http2::HD_LINK) {
2135       continue;
2136     }
2137     for (auto &link : http2::parse_link_header(kv.value)) {
2138       StringRef scheme, authority, path;
2139 
2140       rv = http2::construct_push_component(balloc, scheme, authority, path,
2141                                            base, link.uri);
2142       if (rv != 0) {
2143         continue;
2144       }
2145 
2146       if (scheme.empty()) {
2147         scheme = req.scheme;
2148       }
2149 
2150       if (authority.empty()) {
2151         authority = req.authority;
2152       }
2153 
2154       if (resp.is_resource_pushed(scheme, authority, path)) {
2155         continue;
2156       }
2157 
2158       rv = submit_push_promise(scheme, authority, path, downstream);
2159       if (rv != 0) {
2160         return -1;
2161       }
2162 
2163       resp.resource_pushed(scheme, authority, path);
2164     }
2165   }
2166   return 0;
2167 }
2168 
submit_push_promise(const StringRef & scheme,const StringRef & authority,const StringRef & path,Downstream * downstream)2169 int Http2Upstream::submit_push_promise(const StringRef &scheme,
2170                                        const StringRef &authority,
2171                                        const StringRef &path,
2172                                        Downstream *downstream) {
2173   const auto &req = downstream->request();
2174 
2175   std::vector<nghttp2_nv> nva;
2176   // 4 for :method, :scheme, :path and :authority
2177   nva.reserve(4 + req.fs.headers().size());
2178 
2179   // juse use "GET" for now
2180   nva.push_back(http2::make_nv_ll(":method", "GET"));
2181   nva.push_back(http2::make_nv_ls_nocopy(":scheme", scheme));
2182   nva.push_back(http2::make_nv_ls_nocopy(":path", path));
2183   nva.push_back(http2::make_nv_ls_nocopy(":authority", authority));
2184 
2185   for (auto &kv : req.fs.headers()) {
2186     switch (kv.token) {
2187     // TODO generate referer
2188     case http2::HD__AUTHORITY:
2189     case http2::HD__SCHEME:
2190     case http2::HD__METHOD:
2191     case http2::HD__PATH:
2192       continue;
2193     case http2::HD_ACCEPT_ENCODING:
2194     case http2::HD_ACCEPT_LANGUAGE:
2195     case http2::HD_CACHE_CONTROL:
2196     case http2::HD_HOST:
2197     case http2::HD_USER_AGENT:
2198       nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index));
2199       break;
2200     }
2201   }
2202 
2203   auto promised_stream_id = nghttp2_submit_push_promise(
2204       session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2205       nva.size(), nullptr);
2206 
2207   if (promised_stream_id < 0) {
2208     if (LOG_ENABLED(INFO)) {
2209       ULOG(INFO, this) << "nghttp2_submit_push_promise() failed: "
2210                        << nghttp2_strerror(promised_stream_id);
2211     }
2212     if (nghttp2_is_fatal(promised_stream_id)) {
2213       return -1;
2214     }
2215     return 0;
2216   }
2217 
2218   if (LOG_ENABLED(INFO)) {
2219     std::stringstream ss;
2220     for (auto &nv : nva) {
2221       ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2222          << StringRef{nv.value, nv.valuelen} << "\n";
2223     }
2224     ULOG(INFO, this) << "HTTP push request headers. promised_stream_id="
2225                      << promised_stream_id << "\n"
2226                      << ss.str();
2227   }
2228 
2229   return 0;
2230 }
2231 
push_enabled() const2232 bool Http2Upstream::push_enabled() const {
2233   auto config = get_config();
2234   return !(config->http2.no_server_push ||
2235            nghttp2_session_get_remote_settings(
2236                session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
2237            config->http2_proxy);
2238 }
2239 
initiate_push(Downstream * downstream,const StringRef & uri)2240 int Http2Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
2241   int rv;
2242 
2243   if (uri.empty() || !push_enabled() ||
2244       (downstream->get_stream_id() % 2) == 0) {
2245     return 0;
2246   }
2247 
2248   const auto &req = downstream->request();
2249 
2250   auto base = http2::get_pure_path_component(req.path);
2251   if (base.empty()) {
2252     return -1;
2253   }
2254 
2255   auto &balloc = downstream->get_block_allocator();
2256 
2257   StringRef scheme, authority, path;
2258 
2259   rv = http2::construct_push_component(balloc, scheme, authority, path, base,
2260                                        uri);
2261   if (rv != 0) {
2262     return -1;
2263   }
2264 
2265   if (scheme.empty()) {
2266     scheme = req.scheme;
2267   }
2268 
2269   if (authority.empty()) {
2270     authority = req.authority;
2271   }
2272 
2273   auto &resp = downstream->response();
2274 
2275   if (resp.is_resource_pushed(scheme, authority, path)) {
2276     return 0;
2277   }
2278 
2279   rv = submit_push_promise(scheme, authority, path, downstream);
2280 
2281   if (rv != 0) {
2282     return -1;
2283   }
2284 
2285   resp.resource_pushed(scheme, authority, path);
2286 
2287   return 0;
2288 }
2289 
response_riovec(struct iovec * iov,int iovcnt) const2290 int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
2291   if (iovcnt == 0 || wb_.rleft() == 0) {
2292     return 0;
2293   }
2294 
2295   return wb_.riovec(iov, iovcnt);
2296 }
2297 
response_drain(size_t n)2298 void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
2299 
response_empty() const2300 bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
2301 
get_response_buf()2302 DefaultMemchunks *Http2Upstream::get_response_buf() { return &wb_; }
2303 
2304 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)2305 Http2Upstream::on_downstream_push_promise(Downstream *downstream,
2306                                           int32_t promised_stream_id) {
2307   // promised_stream_id is for backend HTTP/2 session, not for
2308   // frontend.
2309   auto promised_downstream =
2310       std::make_unique<Downstream>(this, handler_->get_mcpool(), 0);
2311   auto &promised_req = promised_downstream->request();
2312 
2313   promised_downstream->set_downstream_stream_id(promised_stream_id);
2314   // Set associated stream in frontend
2315   promised_downstream->set_assoc_stream_id(downstream->get_stream_id());
2316 
2317   promised_downstream->disable_upstream_rtimer();
2318 
2319   promised_req.http_major = 2;
2320   promised_req.http_minor = 0;
2321 
2322   promised_req.fs.content_length = 0;
2323   promised_req.http2_expect_body = false;
2324 
2325   auto ptr = promised_downstream.get();
2326   add_pending_downstream(std::move(promised_downstream));
2327   downstream_queue_.mark_active(ptr);
2328 
2329   return ptr;
2330 }
2331 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)2332 int Http2Upstream::on_downstream_push_promise_complete(
2333     Downstream *downstream, Downstream *promised_downstream) {
2334   std::vector<nghttp2_nv> nva;
2335 
2336   const auto &promised_req = promised_downstream->request();
2337   const auto &headers = promised_req.fs.headers();
2338 
2339   nva.reserve(headers.size());
2340 
2341   for (auto &kv : headers) {
2342     nva.push_back(http2::make_nv(kv.name, kv.value, kv.no_index));
2343   }
2344 
2345   auto promised_stream_id = nghttp2_submit_push_promise(
2346       session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
2347       nva.size(), promised_downstream);
2348   if (promised_stream_id < 0) {
2349     return -1;
2350   }
2351 
2352   promised_downstream->set_stream_id(promised_stream_id);
2353 
2354   return 0;
2355 }
2356 
cancel_premature_downstream(Downstream * promised_downstream)2357 void Http2Upstream::cancel_premature_downstream(
2358     Downstream *promised_downstream) {
2359   if (LOG_ENABLED(INFO)) {
2360     ULOG(INFO, this) << "Remove premature promised stream "
2361                      << promised_downstream;
2362   }
2363   downstream_queue_.remove_and_get_blocked(promised_downstream, false);
2364 }
2365 
get_max_buffer_size() const2366 size_t Http2Upstream::get_max_buffer_size() const { return max_buffer_size_; }
2367 
2368 } // namespace shrpx
2369