• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2021 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http3_upstream.h"
26 
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31 
32 #include <cstdio>
33 
34 #include <ngtcp2/ngtcp2_crypto.h>
35 
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 #  include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49 
50 namespace shrpx {
51 
52 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)53 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
54   auto upstream = static_cast<Http3Upstream *>(w->data);
55 
56   if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
57     goto fail;
58   }
59 
60   return;
61 
62 fail:
63   auto handler = upstream->get_client_handler();
64 
65   delete handler;
66 }
67 } // namespace
68 
69 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)70 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
71   auto upstream = static_cast<Http3Upstream *>(w->data);
72   auto handler = upstream->get_client_handler();
73 
74   if (upstream->submit_goaway() != 0) {
75     delete handler;
76   }
77 }
78 } // namespace
79 
80 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)81 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
82   auto upstream = static_cast<Http3Upstream *>(w->data);
83   auto handler = upstream->get_client_handler();
84 
85   if (upstream->check_shutdown() != 0) {
86     delete handler;
87   }
88 }
89 } // namespace
90 
91 namespace {
downstream_queue_size(Worker * worker)92 size_t downstream_queue_size(Worker *worker) {
93   auto &downstreamconf = *worker->get_downstream_config();
94 
95   if (get_config()->http2_proxy) {
96     return downstreamconf.connections_per_host;
97   }
98 
99   return downstreamconf.connections_per_frontend;
100 }
101 } // namespace
102 
103 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)104 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
105   auto conn = static_cast<Connection *>(conn_ref->user_data);
106   auto handler = static_cast<ClientHandler *>(conn->data);
107   auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
108   return upstream->get_conn();
109 }
110 } // namespace
111 
Http3Upstream(ClientHandler * handler)112 Http3Upstream::Http3Upstream(ClientHandler *handler)
113     : handler_{handler},
114       qlog_fd_{-1},
115       hashed_scid_{},
116       conn_{nullptr},
117       httpconn_{nullptr},
118       downstream_queue_{downstream_queue_size(handler->get_worker()),
119                         !get_config()->http2_proxy},
120       retry_close_{false},
121       tx_{
122           .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123       } {
124   auto conn = handler_->get_connection();
125   conn->conn_ref.get_conn = shrpx::get_conn;
126 
127   ev_timer_init(&timer_, timeoutcb, 0., 0.);
128   timer_.data = this;
129 
130   ngtcp2_ccerr_default(&last_error_);
131 
132   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
133   shutdown_timer_.data = this;
134 
135   ev_prepare_init(&prep_, prepare_cb);
136   prep_.data = this;
137   ev_prepare_start(handler_->get_loop(), &prep_);
138 }
139 
~Http3Upstream()140 Http3Upstream::~Http3Upstream() {
141   auto loop = handler_->get_loop();
142 
143   ev_prepare_stop(loop, &prep_);
144   ev_timer_stop(loop, &shutdown_timer_);
145   ev_timer_stop(loop, &timer_);
146 
147   nghttp3_conn_del(httpconn_);
148 
149   ngtcp2_conn_del(conn_);
150 
151   if (qlog_fd_ != -1) {
152     close(qlog_fd_);
153   }
154 }
155 
156 namespace {
log_printf(void * user_data,const char * fmt,...)157 void log_printf(void *user_data, const char *fmt, ...) {
158   va_list ap;
159   std::array<char, 4096> buf;
160 
161   va_start(ap, fmt);
162   auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
163   va_end(ap);
164 
165   if (static_cast<size_t>(nwrite) >= buf.size()) {
166     nwrite = buf.size() - 1;
167   }
168 
169   buf[nwrite++] = '\n';
170 
171   while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
172     ;
173 }
174 } // namespace
175 
176 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)177 void qlog_write(void *user_data, uint32_t flags, const void *data,
178                 size_t datalen) {
179   auto upstream = static_cast<Http3Upstream *>(user_data);
180 
181   upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
182 }
183 } // namespace
184 
qlog_write(const void * data,size_t datalen,bool fin)185 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
186   assert(qlog_fd_ != -1);
187 
188   while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
189     ;
190 
191   if (fin) {
192     close(qlog_fd_);
193     qlog_fd_ = -1;
194   }
195 }
196 
197 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)198 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
199   util::random_bytes(dest, dest + destlen,
200                      *static_cast<std::mt19937 *>(rand_ctx->native_handle));
201 }
202 } // namespace
203 
204 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)205 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
206                           size_t cidlen, void *user_data) {
207   auto upstream = static_cast<Http3Upstream *>(user_data);
208   auto handler = upstream->get_client_handler();
209   auto worker = handler->get_worker();
210   auto conn_handler = worker->get_connection_handler();
211   auto &qkms = conn_handler->get_quic_keying_materials();
212   auto &qkm = qkms->keying_materials.front();
213 
214   if (generate_quic_connection_id(*cid, cidlen, worker->get_cid_prefix(),
215                                   qkm.id, qkm.cid_encryption_key.data()) != 0) {
216     return NGTCP2_ERR_CALLBACK_FAILURE;
217   }
218 
219   if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
220                                           qkm.secret.size()) != 0) {
221     return NGTCP2_ERR_CALLBACK_FAILURE;
222   }
223 
224   auto quic_connection_handler = worker->get_quic_connection_handler();
225 
226   quic_connection_handler->add_connection_id(*cid, handler);
227 
228   return 0;
229 }
230 } // namespace
231 
232 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)233 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
234                          void *user_data) {
235   auto upstream = static_cast<Http3Upstream *>(user_data);
236   auto handler = upstream->get_client_handler();
237   auto worker = handler->get_worker();
238   auto quic_conn_handler = worker->get_quic_connection_handler();
239 
240   quic_conn_handler->remove_connection_id(*cid);
241 
242   return 0;
243 }
244 } // namespace
245 
http_begin_request_headers(int64_t stream_id)246 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
247   auto downstream =
248       std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
249   nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
250 
251   downstream->reset_upstream_rtimer();
252 
253   handler_->repeat_read_timer();
254 
255   auto &req = downstream->request();
256   req.http_major = 3;
257   req.http_minor = 0;
258 
259   add_pending_downstream(std::move(downstream));
260 }
261 
add_pending_downstream(std::unique_ptr<Downstream> downstream)262 void Http3Upstream::add_pending_downstream(
263     std::unique_ptr<Downstream> downstream) {
264   downstream_queue_.add_pending(std::move(downstream));
265 }
266 
267 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)268 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
269                      uint64_t offset, const uint8_t *data, size_t datalen,
270                      void *user_data, void *stream_user_data) {
271   auto upstream = static_cast<Http3Upstream *>(user_data);
272 
273   if (upstream->recv_stream_data(flags, stream_id, data, datalen) != 0) {
274     return NGTCP2_ERR_CALLBACK_FAILURE;
275   }
276 
277   return 0;
278 }
279 } // namespace
280 
recv_stream_data(uint32_t flags,int64_t stream_id,const uint8_t * data,size_t datalen)281 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
282                                     const uint8_t *data, size_t datalen) {
283   assert(httpconn_);
284 
285   auto nconsumed = nghttp3_conn_read_stream(
286       httpconn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN);
287   if (nconsumed < 0) {
288     ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
289                       << nghttp3_strerror(nconsumed);
290     ngtcp2_ccerr_set_application_error(
291         &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
292         0);
293     return -1;
294   }
295 
296   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
297   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
298 
299   return 0;
300 }
301 
302 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)303 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
304                  uint64_t app_error_code, void *user_data,
305                  void *stream_user_data) {
306   auto upstream = static_cast<Http3Upstream *>(user_data);
307 
308   if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
309     app_error_code = NGHTTP3_H3_NO_ERROR;
310   }
311 
312   if (upstream->stream_close(stream_id, app_error_code) != 0) {
313     return NGTCP2_ERR_CALLBACK_FAILURE;
314   }
315 
316   return 0;
317 }
318 } // namespace
319 
stream_close(int64_t stream_id,uint64_t app_error_code)320 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
321   if (!httpconn_) {
322     return 0;
323   }
324 
325   auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
326   switch (rv) {
327   case 0:
328     break;
329   case NGHTTP3_ERR_STREAM_NOT_FOUND:
330     if (ngtcp2_is_bidi_stream(stream_id)) {
331       ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
332     }
333     break;
334   default:
335     ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
336     ngtcp2_ccerr_set_application_error(
337         &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
338     return -1;
339   }
340 
341   return 0;
342 }
343 
344 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)345 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
346                              uint64_t offset, uint64_t datalen, void *user_data,
347                              void *stream_user_data) {
348   auto upstream = static_cast<Http3Upstream *>(user_data);
349 
350   if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
351     return NGTCP2_ERR_CALLBACK_FAILURE;
352   }
353 
354   return 0;
355 }
356 } // namespace
357 
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)358 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
359                                             uint64_t datalen) {
360   if (!httpconn_) {
361     return 0;
362   }
363 
364   auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
365   if (rv != 0) {
366     ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
367                       << nghttp3_strerror(rv);
368     return -1;
369   }
370 
371   return 0;
372 }
373 
374 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)375 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
376                            uint64_t max_data, void *user_data,
377                            void *stream_user_data) {
378   auto upstream = static_cast<Http3Upstream *>(user_data);
379 
380   if (upstream->extend_max_stream_data(stream_id) != 0) {
381     return NGTCP2_ERR_CALLBACK_FAILURE;
382   }
383 
384   return 0;
385 }
386 } // namespace
387 
extend_max_stream_data(int64_t stream_id)388 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
389   if (!httpconn_) {
390     return 0;
391   }
392 
393   auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
394   if (rv != 0) {
395     ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
396                       << nghttp3_strerror(rv);
397     return -1;
398   }
399 
400   return 0;
401 }
402 
403 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)404 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
405                                    void *user_data) {
406   auto upstream = static_cast<Http3Upstream *>(user_data);
407 
408   upstream->extend_max_remote_streams_bidi(max_streams);
409 
410   return 0;
411 }
412 } // namespace
413 
extend_max_remote_streams_bidi(uint64_t max_streams)414 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
415   nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
416 }
417 
418 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)419 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
420                  uint64_t app_error_code, void *user_data,
421                  void *stream_user_data) {
422   auto upstream = static_cast<Http3Upstream *>(user_data);
423 
424   if (upstream->stream_reset(stream_id) != 0) {
425     return NGTCP2_ERR_CALLBACK_FAILURE;
426   }
427 
428   return 0;
429 }
430 } // namespace
431 
stream_reset(int64_t stream_id)432 int Http3Upstream::stream_reset(int64_t stream_id) {
433   if (http_shutdown_stream_read(stream_id) != 0) {
434     return -1;
435   }
436 
437   if (ngtcp2_is_bidi_stream(stream_id)) {
438     auto rv = ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id,
439                                                 NGHTTP3_H3_NO_ERROR);
440     if (rv != 0) {
441       ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
442                         << ngtcp2_strerror(rv);
443       return -1;
444     }
445   }
446 
447   return 0;
448 }
449 
http_shutdown_stream_read(int64_t stream_id)450 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
451   if (!httpconn_) {
452     return 0;
453   }
454 
455   auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
456   if (rv != 0) {
457     ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
458                       << nghttp3_strerror(rv);
459     return -1;
460   }
461 
462   return 0;
463 }
464 
465 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)466 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
467                         uint64_t app_error_code, void *user_data,
468                         void *stream_user_data) {
469   auto upstream = static_cast<Http3Upstream *>(user_data);
470 
471   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
472     return NGTCP2_ERR_CALLBACK_FAILURE;
473   }
474 
475   return 0;
476 }
477 } // namespace
478 
479 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)480 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
481   auto upstream = static_cast<Http3Upstream *>(user_data);
482 
483   if (upstream->handshake_completed() != 0) {
484     return NGTCP2_ERR_CALLBACK_FAILURE;
485   }
486 
487   return 0;
488 }
489 } // namespace
490 
handshake_completed()491 int Http3Upstream::handshake_completed() {
492   handler_->set_alpn_from_conn();
493 
494   auto alpn = handler_->get_alpn();
495   if (alpn.empty()) {
496     ULOG(ERROR, this) << "NO ALPN was negotiated";
497     return -1;
498   }
499 
500   auto path = ngtcp2_conn_get_path(conn_);
501 
502   return send_new_token(&path->remote);
503 }
504 
505 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)506 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
507                     const ngtcp2_path *old_path,
508                     ngtcp2_path_validation_result res, void *user_data) {
509   if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
510       !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
511     return 0;
512   }
513 
514   auto upstream = static_cast<Http3Upstream *>(user_data);
515   if (upstream->send_new_token(&path->remote) != 0) {
516     return NGTCP2_ERR_CALLBACK_FAILURE;
517   }
518 
519   return 0;
520 }
521 } // namespace
522 
send_new_token(const ngtcp2_addr * remote_addr)523 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
524   std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> token;
525   size_t tokenlen;
526 
527   auto worker = handler_->get_worker();
528   auto conn_handler = worker->get_connection_handler();
529   auto &qkms = conn_handler->get_quic_keying_materials();
530   auto &qkm = qkms->keying_materials.front();
531 
532   if (generate_token(token.data(), tokenlen, remote_addr->addr,
533                      remote_addr->addrlen, qkm.secret.data(),
534                      qkm.secret.size()) != 0) {
535     return -1;
536   }
537 
538   assert(tokenlen == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN);
539 
540   token[tokenlen++] = qkm.id;
541 
542   auto rv = ngtcp2_conn_submit_new_token(conn_, token.data(), tokenlen);
543   if (rv != 0) {
544     ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
545                       << ngtcp2_strerror(rv);
546     return -1;
547   }
548 
549   return 0;
550 }
551 
552 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)553 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
554                 void *user_data) {
555   if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
556     return 0;
557   }
558 
559   auto upstream = static_cast<Http3Upstream *>(user_data);
560   if (upstream->setup_httpconn() != 0) {
561     return NGTCP2_ERR_CALLBACK_FAILURE;
562   }
563 
564   return 0;
565 }
566 } // namespace
567 
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,const uint8_t * token,size_t tokenlen,ngtcp2_token_type token_type)568 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
569                         const Address &local_addr,
570                         const ngtcp2_pkt_hd &initial_hd,
571                         const ngtcp2_cid *odcid, const uint8_t *token,
572                         size_t tokenlen, ngtcp2_token_type token_type) {
573   int rv;
574 
575   auto worker = handler_->get_worker();
576   auto conn_handler = worker->get_connection_handler();
577 
578   auto callbacks = ngtcp2_callbacks{
579       nullptr, // client_initial
580       ngtcp2_crypto_recv_client_initial_cb,
581       ngtcp2_crypto_recv_crypto_data_cb,
582       shrpx::handshake_completed,
583       nullptr, // recv_version_negotiation
584       ngtcp2_crypto_encrypt_cb,
585       ngtcp2_crypto_decrypt_cb,
586       ngtcp2_crypto_hp_mask_cb,
587       shrpx::recv_stream_data,
588       shrpx::acked_stream_data_offset,
589       nullptr, // stream_open
590       shrpx::stream_close,
591       nullptr, // recv_stateless_reset
592       nullptr, // recv_retry
593       nullptr, // extend_max_local_streams_bidi
594       nullptr, // extend_max_local_streams_uni
595       rand,
596       get_new_connection_id,
597       remove_connection_id,
598       ngtcp2_crypto_update_key_cb,
599       shrpx::path_validation,
600       nullptr, // select_preferred_addr
601       shrpx::stream_reset,
602       shrpx::extend_max_remote_streams_bidi,
603       nullptr, // extend_max_remote_streams_uni
604       shrpx::extend_max_stream_data,
605       nullptr, // dcid_status
606       nullptr, // handshake_confirmed
607       nullptr, // recv_new_token
608       ngtcp2_crypto_delete_crypto_aead_ctx_cb,
609       ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
610       nullptr, // recv_datagram
611       nullptr, // ack_datagram
612       nullptr, // lost_datagram
613       ngtcp2_crypto_get_path_challenge_data_cb,
614       shrpx::stream_stop_sending,
615       nullptr, // version_negotiation
616       nullptr, // recv_rx_key
617       shrpx::recv_tx_key,
618   };
619 
620   auto config = get_config();
621   auto &quicconf = config->quic;
622   auto &http3conf = config->http3;
623 
624   auto &qkms = conn_handler->get_quic_keying_materials();
625   auto &qkm = qkms->keying_materials.front();
626 
627   ngtcp2_cid scid;
628 
629   if (generate_quic_connection_id(scid, SHRPX_QUIC_SCIDLEN,
630                                   worker->get_cid_prefix(), qkm.id,
631                                   qkm.cid_encryption_key.data()) != 0) {
632     return -1;
633   }
634 
635   ngtcp2_settings settings;
636   ngtcp2_settings_default(&settings);
637   if (quicconf.upstream.debug.log) {
638     settings.log_printf = log_printf;
639   }
640 
641   if (!quicconf.upstream.qlog.dir.empty()) {
642     auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
643     if (fd != -1) {
644       qlog_fd_ = fd;
645       settings.qlog_write = shrpx::qlog_write;
646     }
647   }
648 
649   settings.initial_ts = quic_timestamp();
650   settings.initial_rtt = static_cast<ngtcp2_tstamp>(
651       quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
652   settings.cc_algo = quicconf.upstream.congestion_controller;
653   settings.max_window = http3conf.upstream.max_connection_window_size;
654   settings.max_stream_window = http3conf.upstream.max_window_size;
655   settings.max_tx_udp_payload_size = SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE;
656   settings.rand_ctx.native_handle = &worker->get_randgen();
657   settings.token = token;
658   settings.tokenlen = tokenlen;
659   settings.token_type = token_type;
660   settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
661       0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
662 
663   ngtcp2_transport_params params;
664   ngtcp2_transport_params_default(&params);
665   params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
666   // The minimum number of unidirectional streams required for HTTP/3.
667   params.initial_max_streams_uni = 3;
668   params.initial_max_data = http3conf.upstream.connection_window_size;
669   params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
670   params.initial_max_stream_data_uni = http3conf.upstream.window_size;
671   params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
672       quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
673 
674 #ifdef OPENSSL_IS_BORINGSSL
675   if (quicconf.upstream.early_data) {
676     ngtcp2_transport_params early_data_params;
677 
678     ngtcp2_transport_params_default(&early_data_params);
679 
680     early_data_params.initial_max_stream_data_bidi_local =
681         params.initial_max_stream_data_bidi_local;
682     early_data_params.initial_max_stream_data_bidi_remote =
683         params.initial_max_stream_data_bidi_remote;
684     early_data_params.initial_max_stream_data_uni =
685         params.initial_max_stream_data_uni;
686     early_data_params.initial_max_data = params.initial_max_data;
687     early_data_params.initial_max_streams_bidi =
688         params.initial_max_streams_bidi;
689     early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
690 
691     // TODO include HTTP/3 SETTINGS
692 
693     std::array<uint8_t, 128> quic_early_data_ctx;
694 
695     auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
696         quic_early_data_ctx.data(), quic_early_data_ctx.size(),
697         &early_data_params);
698 
699     assert(quic_early_data_ctxlen > 0);
700     assert(static_cast<size_t>(quic_early_data_ctxlen) <=
701            quic_early_data_ctx.size());
702 
703     if (SSL_set_quic_early_data_context(handler_->get_ssl(),
704                                         quic_early_data_ctx.data(),
705                                         quic_early_data_ctxlen) != 1) {
706       ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
707       return -1;
708     }
709   }
710 #endif // OPENSSL_IS_BORINGSSL
711 
712   if (odcid) {
713     params.original_dcid = *odcid;
714     params.retry_scid = initial_hd.dcid;
715     params.retry_scid_present = 1;
716   } else {
717     params.original_dcid = initial_hd.dcid;
718   }
719 
720   params.original_dcid_present = 1;
721 
722   rv = generate_quic_stateless_reset_token(
723       params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
724   if (rv != 0) {
725     ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
726     return -1;
727   }
728   params.stateless_reset_token_present = 1;
729 
730   auto path = ngtcp2_path{
731       {
732           const_cast<sockaddr *>(&local_addr.su.sa),
733           static_cast<socklen_t>(local_addr.len),
734       },
735       {
736           const_cast<sockaddr *>(&remote_addr.su.sa),
737           static_cast<socklen_t>(remote_addr.len),
738       },
739       const_cast<UpstreamAddr *>(faddr),
740   };
741 
742   rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
743                               initial_hd.version, &callbacks, &settings,
744                               &params, nullptr, this);
745   if (rv != 0) {
746     ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
747     return -1;
748   }
749 
750   ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
751 
752   auto quic_connection_handler = worker->get_quic_connection_handler();
753 
754   if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
755                                          initial_hd.dcid) != 0) {
756     return -1;
757   }
758 
759   quic_connection_handler->add_connection_id(hashed_scid_, handler_);
760   quic_connection_handler->add_connection_id(scid, handler_);
761 
762   return 0;
763 }
764 
on_read()765 int Http3Upstream::on_read() { return 0; }
766 
on_write()767 int Http3Upstream::on_write() {
768   int rv;
769 
770   if (tx_.send_blocked) {
771     rv = send_blocked_packet();
772     if (rv != 0) {
773       return -1;
774     }
775 
776     if (tx_.send_blocked) {
777       return 0;
778     }
779   }
780 
781   handler_->get_connection()->wlimit.stopw();
782 
783   if (write_streams() != 0) {
784     return -1;
785   }
786 
787   if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
788     return -1;
789   }
790 
791   reset_timer();
792 
793   return 0;
794 }
795 
write_streams()796 int Http3Upstream::write_streams() {
797   std::array<nghttp3_vec, 16> vec;
798   auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
799 #ifdef UDP_SEGMENT
800   auto path_max_udp_payload_size =
801       ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
802 #endif // UDP_SEGMENT
803   auto max_pktcnt = ngtcp2_conn_get_send_quantum(conn_) / max_udp_payload_size;
804   ngtcp2_pkt_info pi, prev_pi;
805   uint8_t *bufpos = tx_.data.get();
806   ngtcp2_path_storage ps, prev_ps;
807   size_t pktcnt = 0;
808   int rv;
809   size_t gso_size = 0;
810   auto ts = quic_timestamp();
811 
812   ngtcp2_path_storage_zero(&ps);
813   ngtcp2_path_storage_zero(&prev_ps);
814 
815   for (;;) {
816     int64_t stream_id = -1;
817     int fin = 0;
818     nghttp3_ssize sveccnt = 0;
819 
820     if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
821       sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
822                                            vec.data(), vec.size());
823       if (sveccnt < 0) {
824         ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
825                           << nghttp3_strerror(sveccnt);
826         ngtcp2_ccerr_set_application_error(
827             &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
828             nullptr, 0);
829         return handle_error();
830       }
831     }
832 
833     ngtcp2_ssize ndatalen;
834     auto v = vec.data();
835     auto vcnt = static_cast<size_t>(sveccnt);
836 
837     uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
838     if (fin) {
839       flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
840     }
841 
842     auto nwrite = ngtcp2_conn_writev_stream(
843         conn_, &ps.path, &pi, bufpos, max_udp_payload_size, &ndatalen, flags,
844         stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
845     if (nwrite < 0) {
846       switch (nwrite) {
847       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
848         assert(ndatalen == -1);
849         nghttp3_conn_block_stream(httpconn_, stream_id);
850         continue;
851       case NGTCP2_ERR_STREAM_SHUT_WR:
852         assert(ndatalen == -1);
853         nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
854         continue;
855       case NGTCP2_ERR_WRITE_MORE:
856         assert(ndatalen >= 0);
857         rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
858         if (rv != 0) {
859           ULOG(ERROR, this)
860               << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
861           ngtcp2_ccerr_set_application_error(
862               &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
863               0);
864           return handle_error();
865         }
866         continue;
867       }
868 
869       assert(ndatalen == -1);
870 
871       ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
872                         << ngtcp2_strerror(nwrite);
873 
874       ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
875 
876       return handle_error();
877     } else if (ndatalen >= 0) {
878       rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
879       if (rv != 0) {
880         ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
881                           << nghttp3_strerror(rv);
882         ngtcp2_ccerr_set_application_error(
883             &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
884             0);
885         return handle_error();
886       }
887     }
888 
889     if (nwrite == 0) {
890       if (bufpos - tx_.data.get()) {
891         auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
892         auto data = tx_.data.get();
893         auto datalen = bufpos - data;
894 
895         rv = send_packet(faddr, prev_ps.path.remote.addr,
896                          prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
897                          prev_ps.path.local.addrlen, prev_pi, data, datalen,
898                          gso_size);
899         if (rv == SHRPX_ERR_SEND_BLOCKED) {
900           on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
901                           prev_pi, data, datalen, gso_size);
902 
903           signal_write_upstream_addr(faddr);
904         }
905       }
906 
907       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
908 
909       return 0;
910     }
911 
912     bufpos += nwrite;
913 
914 #ifdef UDP_SEGMENT
915     if (pktcnt == 0) {
916       ngtcp2_path_copy(&prev_ps.path, &ps.path);
917       prev_pi = pi;
918       gso_size = nwrite;
919     } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
920                prev_pi.ecn != pi.ecn ||
921                static_cast<size_t>(nwrite) > gso_size ||
922                (gso_size > path_max_udp_payload_size &&
923                 static_cast<size_t>(nwrite) != gso_size)) {
924       auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
925       auto data = tx_.data.get();
926       auto datalen = bufpos - data - nwrite;
927 
928       rv = send_packet(faddr, prev_ps.path.remote.addr,
929                        prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
930                        prev_ps.path.local.addrlen, prev_pi, data, datalen,
931                        gso_size);
932       switch (rv) {
933       case SHRPX_ERR_SEND_BLOCKED:
934         on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
935                         data, datalen, gso_size);
936 
937         on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
938                         ps.path.remote, ps.path.local, pi, bufpos - nwrite,
939                         nwrite, 0);
940 
941         signal_write_upstream_addr(faddr);
942 
943         break;
944       default: {
945         auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
946         auto data = bufpos - nwrite;
947 
948         rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
949                          ps.path.local.addr, ps.path.local.addrlen, pi, data,
950                          nwrite, 0);
951         if (rv == SHRPX_ERR_SEND_BLOCKED) {
952           on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
953                           nwrite, 0);
954 
955           signal_write_upstream_addr(faddr);
956         }
957       }
958       }
959 
960       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
961 
962       return 0;
963     }
964 
965     if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
966       auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
967       auto data = tx_.data.get();
968       auto datalen = bufpos - data;
969 
970       rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
971                        ps.path.local.addr, ps.path.local.addrlen, pi, data,
972                        datalen, gso_size);
973       if (rv == SHRPX_ERR_SEND_BLOCKED) {
974         on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
975                         gso_size);
976 
977         signal_write_upstream_addr(faddr);
978       }
979 
980       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
981 
982       return 0;
983     }
984 #else  // !UDP_SEGMENT
985     auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
986     auto data = tx_.data.get();
987     auto datalen = bufpos - data;
988 
989     rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
990                      ps.path.local.addr, ps.path.local.addrlen, pi, data,
991                      datalen, 0);
992     if (rv == SHRPX_ERR_SEND_BLOCKED) {
993       on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
994                       0);
995 
996       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
997 
998       signal_write_upstream_addr(faddr);
999 
1000       return 0;
1001     }
1002 
1003     if (++pktcnt == max_pktcnt) {
1004       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
1005 
1006       return 0;
1007     }
1008 
1009     bufpos = tx_.data.get();
1010 #endif // !UDP_SEGMENT
1011   }
1012 
1013   return 0;
1014 }
1015 
on_timeout(Downstream * downstream)1016 int Http3Upstream::on_timeout(Downstream *downstream) { return 0; }
1017 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)1018 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
1019                                                unsigned int status_code) {
1020   int rv;
1021 
1022   rv = error_reply(downstream, status_code);
1023 
1024   if (rv != 0) {
1025     return -1;
1026   }
1027 
1028   handler_->signal_write();
1029 
1030   return 0;
1031 }
1032 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1033 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1034     Downstream *downstream) {
1035   assert(0);
1036   abort();
1037 }
1038 
1039 namespace {
1040 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1041 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1042   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1043   // client to retry.
1044   switch (downstream_error_code) {
1045   case NGHTTP2_NO_ERROR:
1046     return NGHTTP3_H3_NO_ERROR;
1047   case NGHTTP2_REFUSED_STREAM:
1048     return NGHTTP3_H3_REQUEST_REJECTED;
1049   default:
1050     return NGHTTP3_H3_INTERNAL_ERROR;
1051   }
1052 }
1053 } // namespace
1054 
downstream_read(DownstreamConnection * dconn)1055 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1056   auto downstream = dconn->get_downstream();
1057 
1058   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1059     // The downstream stream was reset (canceled). In this case,
1060     // RST_STREAM to the upstream and delete downstream connection
1061     // here. Deleting downstream will be taken place at
1062     // on_stream_close_callback.
1063     shutdown_stream(downstream,
1064                     infer_upstream_shutdown_stream_error_code(
1065                         downstream->get_response_rst_stream_error_code()));
1066     downstream->pop_downstream_connection();
1067     // dconn was deleted
1068     dconn = nullptr;
1069   } else if (downstream->get_response_state() ==
1070              DownstreamState::MSG_BAD_HEADER) {
1071     if (error_reply(downstream, 502) != 0) {
1072       return -1;
1073     }
1074     downstream->pop_downstream_connection();
1075     // dconn was deleted
1076     dconn = nullptr;
1077   } else {
1078     auto rv = downstream->on_read();
1079     if (rv == SHRPX_ERR_EOF) {
1080       if (downstream->get_request_header_sent()) {
1081         return downstream_eof(dconn);
1082       }
1083       return SHRPX_ERR_RETRY;
1084     }
1085     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1086       downstream->pop_downstream_connection();
1087       handler_->signal_write();
1088       return 0;
1089     }
1090     if (rv != 0) {
1091       if (rv != SHRPX_ERR_NETWORK) {
1092         if (LOG_ENABLED(INFO)) {
1093           DCLOG(INFO, dconn) << "HTTP parser failure";
1094         }
1095       }
1096       return downstream_error(dconn, Downstream::EVENT_ERROR);
1097     }
1098 
1099     if (downstream->can_detach_downstream_connection()) {
1100       // Keep-alive
1101       downstream->detach_downstream_connection();
1102     }
1103   }
1104 
1105   handler_->signal_write();
1106 
1107   // At this point, downstream may be deleted.
1108 
1109   return 0;
1110 }
1111 
downstream_write(DownstreamConnection * dconn)1112 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1113   int rv;
1114   rv = dconn->on_write();
1115   if (rv == SHRPX_ERR_NETWORK) {
1116     return downstream_error(dconn, Downstream::EVENT_ERROR);
1117   }
1118   if (rv != 0) {
1119     return rv;
1120   }
1121   return 0;
1122 }
1123 
downstream_eof(DownstreamConnection * dconn)1124 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1125   auto downstream = dconn->get_downstream();
1126 
1127   if (LOG_ENABLED(INFO)) {
1128     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1129   }
1130 
1131   // Delete downstream connection. If we don't delete it here, it will
1132   // be pooled in on_stream_close_callback.
1133   downstream->pop_downstream_connection();
1134   // dconn was deleted
1135   dconn = nullptr;
1136   // downstream will be deleted in on_stream_close_callback.
1137   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1138     // Server may indicate the end of the request by EOF
1139     if (LOG_ENABLED(INFO)) {
1140       ULOG(INFO, this) << "Downstream body was ended by EOF";
1141     }
1142     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1143 
1144     // For tunneled connection, MSG_COMPLETE signals
1145     // downstream_read_data_callback to send RST_STREAM after pending
1146     // response body is sent. This is needed to ensure that RST_STREAM
1147     // is sent after all pending data are sent.
1148     if (on_downstream_body_complete(downstream) != 0) {
1149       return -1;
1150     }
1151   } else if (downstream->get_response_state() !=
1152              DownstreamState::MSG_COMPLETE) {
1153     // If stream was not closed, then we set MSG_COMPLETE and let
1154     // on_stream_close_callback delete downstream.
1155     if (error_reply(downstream, 502) != 0) {
1156       return -1;
1157     }
1158   }
1159   handler_->signal_write();
1160   // At this point, downstream may be deleted.
1161   return 0;
1162 }
1163 
downstream_error(DownstreamConnection * dconn,int events)1164 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1165   auto downstream = dconn->get_downstream();
1166 
1167   if (LOG_ENABLED(INFO)) {
1168     if (events & Downstream::EVENT_ERROR) {
1169       DCLOG(INFO, dconn) << "Downstream network/general error";
1170     } else {
1171       DCLOG(INFO, dconn) << "Timeout";
1172     }
1173     if (downstream->get_upgraded()) {
1174       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1175     }
1176   }
1177 
1178   // Delete downstream connection. If we don't delete it here, it will
1179   // be pooled in on_stream_close_callback.
1180   downstream->pop_downstream_connection();
1181   // dconn was deleted
1182   dconn = nullptr;
1183 
1184   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1185     // For SSL tunneling, we issue RST_STREAM. For other types of
1186     // stream, we don't have to do anything since response was
1187     // complete.
1188     if (downstream->get_upgraded()) {
1189       shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1190     }
1191   } else {
1192     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1193       if (downstream->get_upgraded()) {
1194         if (on_downstream_body_complete(downstream) != 0) {
1195           return -1;
1196         }
1197       } else {
1198         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1199       }
1200     } else {
1201       unsigned int status;
1202       if (events & Downstream::EVENT_TIMEOUT) {
1203         if (downstream->get_request_header_sent()) {
1204           status = 504;
1205         } else {
1206           status = 408;
1207         }
1208       } else {
1209         status = 502;
1210       }
1211       if (error_reply(downstream, status) != 0) {
1212         return -1;
1213       }
1214     }
1215     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1216   }
1217   handler_->signal_write();
1218   // At this point, downstream may be deleted.
1219   return 0;
1220 }
1221 
get_client_handler() const1222 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1223 
1224 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1225 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1226                                             int64_t stream_id, nghttp3_vec *vec,
1227                                             size_t veccnt, uint32_t *pflags,
1228                                             void *conn_user_data,
1229                                             void *stream_user_data) {
1230   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1231   auto downstream = static_cast<Downstream *>(stream_user_data);
1232 
1233   assert(downstream);
1234 
1235   auto body = downstream->get_response_buf();
1236 
1237   assert(body);
1238 
1239   if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1240       body->rleft_mark() == 0) {
1241     downstream->disable_upstream_wtimer();
1242     return NGHTTP3_ERR_WOULDBLOCK;
1243   }
1244 
1245   downstream->reset_upstream_wtimer();
1246 
1247   veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1248 
1249   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1250       body->rleft_mark() == 0) {
1251     *pflags |= NGHTTP3_DATA_FLAG_EOF;
1252   }
1253 
1254   assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1255 
1256   downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1257 
1258   if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1259       upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1260     return NGHTTP3_ERR_CALLBACK_FAILURE;
1261   }
1262 
1263   return veccnt;
1264 }
1265 } // namespace
1266 
on_downstream_header_complete(Downstream * downstream)1267 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1268   int rv;
1269 
1270   const auto &req = downstream->request();
1271   auto &resp = downstream->response();
1272 
1273   auto &balloc = downstream->get_block_allocator();
1274 
1275   if (LOG_ENABLED(INFO)) {
1276     if (downstream->get_non_final_response()) {
1277       DLOG(INFO, downstream) << "HTTP non-final response header";
1278     } else {
1279       DLOG(INFO, downstream) << "HTTP response header completed";
1280     }
1281   }
1282 
1283   auto config = get_config();
1284   auto &httpconf = config->http;
1285 
1286   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1287     downstream->rewrite_location_response_header(req.scheme);
1288   }
1289 
1290 #ifdef HAVE_MRUBY
1291   if (!downstream->get_non_final_response()) {
1292     auto dconn = downstream->get_downstream_connection();
1293     const auto &group = dconn->get_downstream_addr_group();
1294     if (group) {
1295       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1296 
1297       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1298         if (error_reply(downstream, 500) != 0) {
1299           return -1;
1300         }
1301         // Returning -1 will signal deletion of dconn.
1302         return -1;
1303       }
1304 
1305       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1306         return -1;
1307       }
1308     }
1309 
1310     auto worker = handler_->get_worker();
1311     auto mruby_ctx = worker->get_mruby_context();
1312 
1313     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1314       if (error_reply(downstream, 500) != 0) {
1315         return -1;
1316       }
1317       // Returning -1 will signal deletion of dconn.
1318       return -1;
1319     }
1320 
1321     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1322       return -1;
1323     }
1324   }
1325 #endif // HAVE_MRUBY
1326 
1327   auto nva = std::vector<nghttp3_nv>();
1328   // 4 means :status and possible server, via, and set-cookie (for
1329   // affinity cookie) header field.
1330   nva.reserve(resp.fs.headers().size() + 4 +
1331               httpconf.add_response_headers.size());
1332 
1333   if (downstream->get_non_final_response()) {
1334     auto response_status = http2::stringify_status(balloc, resp.http_status);
1335 
1336     nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1337 
1338     http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1339                                       http2::HDOP_STRIP_ALL);
1340 
1341     if (LOG_ENABLED(INFO)) {
1342       log_response_headers(downstream, nva);
1343     }
1344 
1345     rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1346                                   nva.data(), nva.size());
1347 
1348     resp.fs.clear_headers();
1349 
1350     if (rv != 0) {
1351       ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1352       return -1;
1353     }
1354 
1355     return 0;
1356   }
1357 
1358   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1359   StringRef response_status;
1360 
1361   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1362     response_status = http2::stringify_status(balloc, 200);
1363     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1364   } else {
1365     response_status = http2::stringify_status(balloc, resp.http_status);
1366   }
1367 
1368   nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1369 
1370   http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1371 
1372   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1373     nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
1374   } else {
1375     auto server = resp.fs.header(http2::HD_SERVER);
1376     if (server) {
1377       nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value));
1378     }
1379   }
1380 
1381   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1382     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1383     if (affinity_cookie) {
1384       auto dconn = downstream->get_downstream_connection();
1385       assert(dconn);
1386       auto &group = dconn->get_downstream_addr_group();
1387       auto &shared_addr = group->shared_addr;
1388       auto &cookieconf = shared_addr->affinity.cookie;
1389       auto secure =
1390           http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1391       auto cookie_str = http::create_affinity_cookie(
1392           balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1393       nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str));
1394     }
1395   }
1396 
1397   auto via = resp.fs.header(http2::HD_VIA);
1398   if (httpconf.no_via) {
1399     if (via) {
1400       nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value));
1401     }
1402   } else {
1403     // we don't create more than 16 bytes in
1404     // http::create_via_header_value.
1405     size_t len = 16;
1406     if (via) {
1407       len += via->value.size() + 2;
1408     }
1409 
1410     auto iov = make_byte_ref(balloc, len + 1);
1411     auto p = iov.base;
1412     if (via) {
1413       p = std::copy(std::begin(via->value), std::end(via->value), p);
1414       p = util::copy_lit(p, ", ");
1415     }
1416     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1417     *p = '\0';
1418 
1419     nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
1420   }
1421 
1422   for (auto &p : httpconf.add_response_headers) {
1423     nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1424   }
1425 
1426   if (LOG_ENABLED(INFO)) {
1427     log_response_headers(downstream, nva);
1428   }
1429 
1430   nghttp3_data_reader data_read;
1431   data_read.read_data = downstream_read_data_callback;
1432 
1433   nghttp3_data_reader *data_readptr;
1434 
1435   if (downstream->expect_response_body() ||
1436       downstream->expect_response_trailer()) {
1437     data_readptr = &data_read;
1438   } else {
1439     data_readptr = nullptr;
1440   }
1441 
1442   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1443                                     nva.data(), nva.size(), data_readptr);
1444   if (rv != 0) {
1445     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1446     return -1;
1447   }
1448 
1449   if (data_readptr) {
1450     downstream->reset_upstream_wtimer();
1451   } else if (shutdown_stream_read(downstream->get_stream_id(),
1452                                   NGHTTP3_H3_NO_ERROR) != 0) {
1453     return -1;
1454   }
1455 
1456   return 0;
1457 }
1458 
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1459 int Http3Upstream::on_downstream_body(Downstream *downstream,
1460                                       const uint8_t *data, size_t len,
1461                                       bool flush) {
1462   auto body = downstream->get_response_buf();
1463   body->append(data, len);
1464 
1465   if (flush) {
1466     nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1467 
1468     downstream->ensure_upstream_wtimer();
1469   }
1470 
1471   return 0;
1472 }
1473 
on_downstream_body_complete(Downstream * downstream)1474 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1475   if (LOG_ENABLED(INFO)) {
1476     DLOG(INFO, downstream) << "HTTP response completed";
1477   }
1478 
1479   auto &resp = downstream->response();
1480 
1481   if (!downstream->validate_response_recv_body_length()) {
1482     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1483     resp.connection_close = true;
1484     return 0;
1485   }
1486 
1487   if (!downstream->get_upgraded()) {
1488     const auto &trailers = resp.fs.trailers();
1489     if (!trailers.empty()) {
1490       std::vector<nghttp3_nv> nva;
1491       nva.reserve(trailers.size());
1492       http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1493       if (!nva.empty()) {
1494         auto rv = nghttp3_conn_submit_trailers(
1495             httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1496         if (rv != 0) {
1497           ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1498                             << nghttp3_strerror(rv);
1499           return -1;
1500         }
1501       }
1502     }
1503   }
1504 
1505   nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1506   downstream->ensure_upstream_wtimer();
1507 
1508   return 0;
1509 }
1510 
on_handler_delete()1511 void Http3Upstream::on_handler_delete() {
1512   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1513     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1514         d->accesslog_ready()) {
1515       handler_->write_accesslog(d);
1516     }
1517   }
1518 
1519   auto worker = handler_->get_worker();
1520   auto quic_conn_handler = worker->get_quic_connection_handler();
1521 
1522   std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1523   ngtcp2_conn_get_scid(conn_, scids.data());
1524   scids.back() = hashed_scid_;
1525 
1526   for (auto &cid : scids) {
1527     quic_conn_handler->remove_connection_id(cid);
1528   }
1529 
1530   if (retry_close_ || last_error_.type == NGTCP2_CCERR_TYPE_IDLE_CLOSE) {
1531     return;
1532   }
1533 
1534   // If this is not idle close, send CONNECTION_CLOSE.
1535   if (!ngtcp2_conn_in_closing_period(conn_) &&
1536       !ngtcp2_conn_in_draining_period(conn_)) {
1537     ngtcp2_path_storage ps;
1538     ngtcp2_pkt_info pi;
1539     conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1540 
1541     ngtcp2_path_storage_zero(&ps);
1542 
1543     ngtcp2_ccerr ccerr;
1544     ngtcp2_ccerr_default(&ccerr);
1545 
1546     if (worker->get_graceful_shutdown() &&
1547         !ngtcp2_conn_get_handshake_completed(conn_)) {
1548       ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1549     }
1550 
1551     auto nwrite = ngtcp2_conn_write_connection_close(
1552         conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1553         quic_timestamp());
1554     if (nwrite < 0) {
1555       if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1556         ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1557                           << ngtcp2_strerror(nwrite);
1558       }
1559 
1560       return;
1561     }
1562 
1563     conn_close_.resize(nwrite);
1564 
1565     send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1566                 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1567                 ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1568   }
1569 
1570   auto d =
1571       static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1572 
1573   if (LOG_ENABLED(INFO)) {
1574     ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1575                      << conn_close_.size() << " bytes sentinel packet";
1576   }
1577 
1578   auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1579                                         std::move(conn_close_), d);
1580 
1581   quic_conn_handler->add_close_wait(cw.release());
1582 }
1583 
on_downstream_reset(Downstream * downstream,bool no_retry)1584 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1585   int rv;
1586 
1587   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1588     // This is error condition when we failed push_request_headers()
1589     // in initiate_downstream().  Otherwise, we have
1590     // DispatchState::ACTIVE state, or we did not set
1591     // DownstreamConnection.
1592     downstream->pop_downstream_connection();
1593     handler_->signal_write();
1594 
1595     return 0;
1596   }
1597 
1598   if (!downstream->request_submission_ready()) {
1599     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1600       // We have got all response body already.  Send it off.
1601       downstream->pop_downstream_connection();
1602       return 0;
1603     }
1604     // pushed stream is handled here
1605     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1606     downstream->pop_downstream_connection();
1607 
1608     handler_->signal_write();
1609 
1610     return 0;
1611   }
1612 
1613   downstream->pop_downstream_connection();
1614 
1615   downstream->add_retry();
1616 
1617   std::unique_ptr<DownstreamConnection> dconn;
1618 
1619   rv = 0;
1620 
1621   if (no_retry || downstream->no_more_retry()) {
1622     goto fail;
1623   }
1624 
1625   // downstream connection is clean; we can retry with new
1626   // downstream connection.
1627 
1628   for (;;) {
1629     auto dconn = handler_->get_downstream_connection(rv, downstream);
1630     if (!dconn) {
1631       goto fail;
1632     }
1633 
1634     rv = downstream->attach_downstream_connection(std::move(dconn));
1635     if (rv == 0) {
1636       break;
1637     }
1638   }
1639 
1640   rv = downstream->push_request_headers();
1641   if (rv != 0) {
1642     goto fail;
1643   }
1644 
1645   return 0;
1646 
1647 fail:
1648   if (rv == SHRPX_ERR_TLS_REQUIRED) {
1649     assert(0);
1650     abort();
1651   }
1652 
1653   rv = on_downstream_abort_request(downstream, 502);
1654   if (rv != 0) {
1655     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1656   }
1657   downstream->pop_downstream_connection();
1658 
1659   handler_->signal_write();
1660 
1661   return 0;
1662 }
1663 
pause_read(IOCtrlReason reason)1664 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1665 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1666 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1667                                size_t consumed) {
1668   consume(downstream->get_stream_id(), consumed);
1669 
1670   auto &req = downstream->request();
1671 
1672   req.consume(consumed);
1673 
1674   handler_->signal_write();
1675 
1676   return 0;
1677 }
1678 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1679 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1680                               size_t bodylen) {
1681   int rv;
1682 
1683   nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1684 
1685   if (bodylen) {
1686     data_read.read_data = downstream_read_data_callback;
1687     data_read_ptr = &data_read;
1688   }
1689 
1690   const auto &resp = downstream->response();
1691   auto config = get_config();
1692   auto &httpconf = config->http;
1693 
1694   auto &balloc = downstream->get_block_allocator();
1695 
1696   const auto &headers = resp.fs.headers();
1697   auto nva = std::vector<nghttp3_nv>();
1698   // 2 for :status and server
1699   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1700 
1701   auto response_status = http2::stringify_status(balloc, resp.http_status);
1702 
1703   nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1704 
1705   for (auto &kv : headers) {
1706     if (kv.name.empty() || kv.name[0] == ':') {
1707       continue;
1708     }
1709     switch (kv.token) {
1710     case http2::HD_CONNECTION:
1711     case http2::HD_KEEP_ALIVE:
1712     case http2::HD_PROXY_CONNECTION:
1713     case http2::HD_TE:
1714     case http2::HD_TRANSFER_ENCODING:
1715     case http2::HD_UPGRADE:
1716       continue;
1717     }
1718     nva.push_back(http3::make_nv_nocopy(kv.name, kv.value, kv.no_index));
1719   }
1720 
1721   if (!resp.fs.header(http2::HD_SERVER)) {
1722     nva.push_back(http3::make_nv_ls_nocopy("server", config->http.server_name));
1723   }
1724 
1725   for (auto &p : httpconf.add_response_headers) {
1726     nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1727   }
1728 
1729   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1730                                     nva.data(), nva.size(), data_read_ptr);
1731   if (nghttp3_err_is_fatal(rv)) {
1732     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1733                       << nghttp3_strerror(rv);
1734     return -1;
1735   }
1736 
1737   auto buf = downstream->get_response_buf();
1738 
1739   buf->append(body, bodylen);
1740 
1741   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1742 
1743   if (data_read_ptr) {
1744     downstream->reset_upstream_wtimer();
1745   }
1746 
1747   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1748       0) {
1749     return -1;
1750   }
1751 
1752   return 0;
1753 }
1754 
initiate_push(Downstream * downstream,const StringRef & uri)1755 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1756   return 0;
1757 }
1758 
response_riovec(struct iovec * iov,int iovcnt) const1759 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1760   return 0;
1761 }
1762 
response_drain(size_t n)1763 void Http3Upstream::response_drain(size_t n) {}
1764 
response_empty() const1765 bool Http3Upstream::response_empty() const { return false; }
1766 
1767 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1768 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1769                                           int32_t promised_stream_id) {
1770   return nullptr;
1771 }
1772 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1773 int Http3Upstream::on_downstream_push_promise_complete(
1774     Downstream *downstream, Downstream *promised_downstream) {
1775   return 0;
1776 }
1777 
push_enabled() const1778 bool Http3Upstream::push_enabled() const { return false; }
1779 
cancel_premature_downstream(Downstream * promised_downstream)1780 void Http3Upstream::cancel_premature_downstream(
1781     Downstream *promised_downstream) {}
1782 
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen)1783 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1784                            const Address &remote_addr,
1785                            const Address &local_addr, const ngtcp2_pkt_info &pi,
1786                            const uint8_t *data, size_t datalen) {
1787   int rv;
1788 
1789   auto path = ngtcp2_path{
1790       {
1791           const_cast<sockaddr *>(&local_addr.su.sa),
1792           static_cast<socklen_t>(local_addr.len),
1793       },
1794       {
1795           const_cast<sockaddr *>(&remote_addr.su.sa),
1796           static_cast<socklen_t>(remote_addr.len),
1797       },
1798       const_cast<UpstreamAddr *>(faddr),
1799   };
1800 
1801   rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data, datalen, quic_timestamp());
1802   if (rv != 0) {
1803     switch (rv) {
1804     case NGTCP2_ERR_DRAINING:
1805       return -1;
1806     case NGTCP2_ERR_RETRY: {
1807       auto worker = handler_->get_worker();
1808       auto quic_conn_handler = worker->get_quic_connection_handler();
1809 
1810       if (worker->get_graceful_shutdown()) {
1811         ngtcp2_ccerr_set_transport_error(&last_error_,
1812                                          NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1813 
1814         return handle_error();
1815       }
1816 
1817       ngtcp2_version_cid vc;
1818 
1819       rv =
1820           ngtcp2_pkt_decode_version_cid(&vc, data, datalen, SHRPX_QUIC_SCIDLEN);
1821       if (rv != 0) {
1822         return -1;
1823       }
1824 
1825       retry_close_ = true;
1826 
1827       quic_conn_handler->send_retry(handler_->get_upstream_addr(), vc.version,
1828                                     vc.dcid, vc.dcidlen, vc.scid, vc.scidlen,
1829                                     remote_addr, local_addr, datalen * 3);
1830 
1831       return -1;
1832     }
1833     case NGTCP2_ERR_CRYPTO:
1834       if (!last_error_.error_code) {
1835         ngtcp2_ccerr_set_tls_alert(
1836             &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1837       }
1838       break;
1839     case NGTCP2_ERR_DROP_CONN:
1840       return -1;
1841     default:
1842       if (!last_error_.error_code) {
1843         ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1844       }
1845     }
1846 
1847     ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1848 
1849     return handle_error();
1850   }
1851 
1852   return 0;
1853 }
1854 
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1855 int Http3Upstream::send_packet(const UpstreamAddr *faddr,
1856                                const sockaddr *remote_sa, size_t remote_salen,
1857                                const sockaddr *local_sa, size_t local_salen,
1858                                const ngtcp2_pkt_info &pi, const uint8_t *data,
1859                                size_t datalen, size_t gso_size) {
1860   auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1861                              local_salen, pi, data, datalen, gso_size);
1862   switch (rv) {
1863   case 0:
1864     return 0;
1865     // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1866     // large.
1867   case -EINVAL:
1868   case -EMSGSIZE:
1869     // Let the packet lost.
1870     break;
1871   case -EAGAIN:
1872 #if EAGAIN != EWOULDBLOCK
1873   case -EWOULDBLOCK:
1874 #endif // EAGAIN != EWOULDBLOCK
1875     return SHRPX_ERR_SEND_BLOCKED;
1876   default:
1877     break;
1878   }
1879 
1880   return -1;
1881 }
1882 
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1883 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1884                                     const ngtcp2_addr &remote_addr,
1885                                     const ngtcp2_addr &local_addr,
1886                                     const ngtcp2_pkt_info &pi,
1887                                     const uint8_t *data, size_t datalen,
1888                                     size_t gso_size) {
1889   assert(tx_.num_blocked || !tx_.send_blocked);
1890   assert(tx_.num_blocked < 2);
1891 
1892   tx_.send_blocked = true;
1893 
1894   auto &p = tx_.blocked[tx_.num_blocked++];
1895 
1896   memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1897   memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1898 
1899   p.local_addr.len = local_addr.addrlen;
1900   p.remote_addr.len = remote_addr.addrlen;
1901   p.faddr = faddr;
1902   p.pi = pi;
1903   p.data = data;
1904   p.datalen = datalen;
1905   p.gso_size = gso_size;
1906 }
1907 
send_blocked_packet()1908 int Http3Upstream::send_blocked_packet() {
1909   int rv;
1910 
1911   assert(tx_.send_blocked);
1912 
1913   for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1914     auto &p = tx_.blocked[tx_.num_blocked_sent];
1915 
1916     rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len,
1917                      &p.local_addr.su.sa, p.local_addr.len, p.pi, p.data,
1918                      p.datalen, p.gso_size);
1919     if (rv == SHRPX_ERR_SEND_BLOCKED) {
1920       signal_write_upstream_addr(p.faddr);
1921 
1922       return 0;
1923     }
1924   }
1925 
1926   tx_.send_blocked = false;
1927   tx_.num_blocked = 0;
1928   tx_.num_blocked_sent = 0;
1929 
1930   return 0;
1931 }
1932 
signal_write_upstream_addr(const UpstreamAddr * faddr)1933 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1934   auto conn = handler_->get_connection();
1935 
1936   if (faddr->fd != conn->wev.fd) {
1937     if (ev_is_active(&conn->wev)) {
1938       ev_io_stop(handler_->get_loop(), &conn->wev);
1939     }
1940 
1941     ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1942   }
1943 
1944   conn->wlimit.startw();
1945 }
1946 
handle_error()1947 int Http3Upstream::handle_error() {
1948   if (ngtcp2_conn_in_closing_period(conn_) ||
1949       ngtcp2_conn_in_draining_period(conn_)) {
1950     return -1;
1951   }
1952 
1953   ngtcp2_path_storage ps;
1954   ngtcp2_pkt_info pi;
1955 
1956   ngtcp2_path_storage_zero(&ps);
1957 
1958   auto ts = quic_timestamp();
1959 
1960   conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1961 
1962   auto nwrite = ngtcp2_conn_write_connection_close(
1963       conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
1964       &last_error_, ts);
1965   if (nwrite < 0) {
1966     ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1967                       << ngtcp2_strerror(nwrite);
1968     return -1;
1969   }
1970 
1971   conn_close_.resize(nwrite);
1972 
1973   if (nwrite == 0) {
1974     return -1;
1975   }
1976 
1977   send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1978               ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1979               ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1980 
1981   return -1;
1982 }
1983 
handle_expiry()1984 int Http3Upstream::handle_expiry() {
1985   int rv;
1986 
1987   auto ts = quic_timestamp();
1988 
1989   rv = ngtcp2_conn_handle_expiry(conn_, ts);
1990   if (rv != 0) {
1991     if (rv == NGTCP2_ERR_IDLE_CLOSE) {
1992       ULOG(INFO, this) << "Idle connection timeout";
1993     } else {
1994       ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
1995     }
1996     ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1997     return handle_error();
1998   }
1999 
2000   return 0;
2001 }
2002 
reset_timer()2003 void Http3Upstream::reset_timer() {
2004   auto ts = quic_timestamp();
2005   auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
2006   auto loop = handler_->get_loop();
2007 
2008   if (expiry_ts <= ts) {
2009     ev_feed_event(loop, &timer_, EV_TIMER);
2010     return;
2011   }
2012 
2013   timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
2014 
2015   ev_timer_again(loop, &timer_);
2016 }
2017 
2018 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2019 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2020                           size_t nconsumed, void *user_data,
2021                           void *stream_user_data) {
2022   auto upstream = static_cast<Http3Upstream *>(user_data);
2023 
2024   upstream->consume(stream_id, nconsumed);
2025 
2026   return 0;
2027 }
2028 } // namespace
2029 
2030 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2031 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2032                            uint64_t datalen, void *user_data,
2033                            void *stream_user_data) {
2034   auto upstream = static_cast<Http3Upstream *>(user_data);
2035   auto downstream = static_cast<Downstream *>(stream_user_data);
2036 
2037   assert(downstream);
2038 
2039   if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2040     return NGHTTP3_ERR_CALLBACK_FAILURE;
2041   }
2042 
2043   return 0;
2044 }
2045 } // namespace
2046 
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2047 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2048                                           uint64_t datalen) {
2049   if (LOG_ENABLED(INFO)) {
2050     ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2051                      << datalen << " bytes acknowledged";
2052   }
2053 
2054   auto body = downstream->get_response_buf();
2055   auto drained = body->drain_mark(datalen);
2056   (void)drained;
2057 
2058   assert(datalen == drained);
2059 
2060   if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2061     return -1;
2062   }
2063 
2064   return 0;
2065 }
2066 
2067 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2068 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2069                                void *user_data, void *stream_user_data) {
2070   if (!ngtcp2_is_bidi_stream(stream_id)) {
2071     return 0;
2072   }
2073 
2074   auto upstream = static_cast<Http3Upstream *>(user_data);
2075   upstream->http_begin_request_headers(stream_id);
2076 
2077   return 0;
2078 }
2079 } // namespace
2080 
2081 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2082 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2083                              int32_t token, nghttp3_rcbuf *name,
2084                              nghttp3_rcbuf *value, uint8_t flags,
2085                              void *user_data, void *stream_user_data) {
2086   auto upstream = static_cast<Http3Upstream *>(user_data);
2087   auto downstream = static_cast<Downstream *>(stream_user_data);
2088 
2089   if (!downstream || downstream->get_stop_reading()) {
2090     return 0;
2091   }
2092 
2093   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2094                                          /* trailer = */ false) != 0) {
2095     return NGHTTP3_ERR_CALLBACK_FAILURE;
2096   }
2097 
2098   return 0;
2099 }
2100 } // namespace
2101 
2102 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2103 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2104                               int32_t token, nghttp3_rcbuf *name,
2105                               nghttp3_rcbuf *value, uint8_t flags,
2106                               void *user_data, void *stream_user_data) {
2107   auto upstream = static_cast<Http3Upstream *>(user_data);
2108   auto downstream = static_cast<Downstream *>(stream_user_data);
2109 
2110   if (!downstream || downstream->get_stop_reading()) {
2111     return 0;
2112   }
2113 
2114   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2115                                          /* trailer = */ true) != 0) {
2116     return NGHTTP3_ERR_CALLBACK_FAILURE;
2117   }
2118 
2119   return 0;
2120 }
2121 } // namespace
2122 
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2123 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2124                                             int32_t h3token,
2125                                             nghttp3_rcbuf *name,
2126                                             nghttp3_rcbuf *value, uint8_t flags,
2127                                             bool trailer) {
2128   auto namebuf = nghttp3_rcbuf_get_buf(name);
2129   auto valuebuf = nghttp3_rcbuf_get_buf(value);
2130   auto &req = downstream->request();
2131   auto config = get_config();
2132   auto &httpconf = config->http;
2133 
2134   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2135           httpconf.request_header_field_buffer ||
2136       req.fs.num_fields() >= httpconf.max_request_header_fields) {
2137     downstream->set_stop_reading(true);
2138 
2139     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2140       return 0;
2141     }
2142 
2143     if (LOG_ENABLED(INFO)) {
2144       ULOG(INFO, this) << "Too large or many header field size="
2145                        << req.fs.buffer_size() + namebuf.len + valuebuf.len
2146                        << ", num=" << req.fs.num_fields() + 1;
2147     }
2148 
2149     // just ignore if this is a trailer part.
2150     if (trailer) {
2151       return 0;
2152     }
2153 
2154     if (error_reply(downstream, 431) != 0) {
2155       return -1;
2156     }
2157 
2158     return 0;
2159   }
2160 
2161   auto token = http2::lookup_token(namebuf.base, namebuf.len);
2162   auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2163 
2164   downstream->add_rcbuf(name);
2165   downstream->add_rcbuf(value);
2166 
2167   if (trailer) {
2168     req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
2169                              StringRef{valuebuf.base, valuebuf.len}, no_index,
2170                              token);
2171     return 0;
2172   }
2173 
2174   req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
2175                           StringRef{valuebuf.base, valuebuf.len}, no_index,
2176                           token);
2177   return 0;
2178 }
2179 
2180 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2181 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2182                              void *user_data, void *stream_user_data) {
2183   auto upstream = static_cast<Http3Upstream *>(user_data);
2184   auto handler = upstream->get_client_handler();
2185   auto downstream = static_cast<Downstream *>(stream_user_data);
2186 
2187   if (!downstream || downstream->get_stop_reading()) {
2188     return 0;
2189   }
2190 
2191   if (upstream->http_end_request_headers(downstream, fin) != 0) {
2192     return NGHTTP3_ERR_CALLBACK_FAILURE;
2193   }
2194 
2195   downstream->reset_upstream_rtimer();
2196   handler->stop_read_timer();
2197 
2198   return 0;
2199 }
2200 } // namespace
2201 
http_end_request_headers(Downstream * downstream,int fin)2202 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2203   auto lgconf = log_config();
2204   lgconf->update_tstamp(std::chrono::system_clock::now());
2205   auto &req = downstream->request();
2206   req.tstamp = lgconf->tstamp;
2207 
2208   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2209     return 0;
2210   }
2211 
2212   auto &nva = req.fs.headers();
2213 
2214   if (LOG_ENABLED(INFO)) {
2215     std::stringstream ss;
2216     for (auto &nv : nva) {
2217       if (nv.name == "authorization") {
2218         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2219         continue;
2220       }
2221       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2222     }
2223     ULOG(INFO, this) << "HTTP request headers. stream_id="
2224                      << downstream->get_stream_id() << "\n"
2225                      << ss.str();
2226   }
2227 
2228   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2229   if (content_length) {
2230     // libnghttp3 guarantees this can be parsed
2231     req.fs.content_length = util::parse_uint(content_length->value);
2232   }
2233 
2234   // presence of mandatory header fields are guaranteed by libnghttp3.
2235   auto authority = req.fs.header(http2::HD__AUTHORITY);
2236   auto path = req.fs.header(http2::HD__PATH);
2237   auto method = req.fs.header(http2::HD__METHOD);
2238   auto scheme = req.fs.header(http2::HD__SCHEME);
2239 
2240   auto method_token = http2::lookup_method_token(method->value);
2241   if (method_token == -1) {
2242     if (error_reply(downstream, 501) != 0) {
2243       return -1;
2244     }
2245     return 0;
2246   }
2247 
2248   auto faddr = handler_->get_upstream_addr();
2249 
2250   auto config = get_config();
2251 
2252   // For HTTP/2 proxy, we require :authority.
2253   if (method_token != HTTP_CONNECT && config->http2_proxy &&
2254       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2255     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2256     return 0;
2257   }
2258 
2259   req.method = method_token;
2260   if (scheme) {
2261     req.scheme = scheme->value;
2262   }
2263 
2264   // nghttp2 library guarantees either :authority or host exist
2265   if (!authority) {
2266     req.no_authority = true;
2267     authority = req.fs.header(http2::HD_HOST);
2268   }
2269 
2270   if (authority) {
2271     req.authority = authority->value;
2272   }
2273 
2274   if (path) {
2275     if (method_token == HTTP_OPTIONS &&
2276         path->value == StringRef::from_lit("*")) {
2277       // Server-wide OPTIONS request.  Path is empty.
2278     } else if (config->http2_proxy &&
2279                faddr->alt_mode == UpstreamAltMode::NONE) {
2280       req.path = path->value;
2281     } else {
2282       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2283                                            path->value);
2284     }
2285   }
2286 
2287   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2288   if (connect_proto) {
2289     if (connect_proto->value != "websocket") {
2290       if (error_reply(downstream, 400) != 0) {
2291         return -1;
2292       }
2293       return 0;
2294     }
2295     req.connect_proto = ConnectProto::WEBSOCKET;
2296   }
2297 
2298   if (!fin) {
2299     req.http2_expect_body = true;
2300   } else if (req.fs.content_length == -1) {
2301     req.fs.content_length = 0;
2302   }
2303 
2304   downstream->inspect_http2_request();
2305 
2306   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2307 
2308   if (config->http.require_http_scheme &&
2309       !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2310     if (error_reply(downstream, 400) != 0) {
2311       return -1;
2312     }
2313   }
2314 
2315 #ifdef HAVE_MRUBY
2316   auto worker = handler_->get_worker();
2317   auto mruby_ctx = worker->get_mruby_context();
2318 
2319   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2320     if (error_reply(downstream, 500) != 0) {
2321       return -1;
2322     }
2323     return 0;
2324   }
2325 #endif // HAVE_MRUBY
2326 
2327   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2328     return 0;
2329   }
2330 
2331   start_downstream(downstream);
2332 
2333   return 0;
2334 }
2335 
start_downstream(Downstream * downstream)2336 void Http3Upstream::start_downstream(Downstream *downstream) {
2337   if (downstream_queue_.can_activate(downstream->request().authority)) {
2338     initiate_downstream(downstream);
2339     return;
2340   }
2341 
2342   downstream_queue_.mark_blocked(downstream);
2343 }
2344 
initiate_downstream(Downstream * downstream)2345 void Http3Upstream::initiate_downstream(Downstream *downstream) {
2346   int rv;
2347 
2348 #ifdef HAVE_MRUBY
2349   DownstreamConnection *dconn_ptr;
2350 #endif // HAVE_MRUBY
2351 
2352   for (;;) {
2353     auto dconn = handler_->get_downstream_connection(rv, downstream);
2354     if (!dconn) {
2355       if (rv == SHRPX_ERR_TLS_REQUIRED) {
2356         assert(0);
2357         abort();
2358       }
2359 
2360       rv = error_reply(downstream, 502);
2361       if (rv != 0) {
2362         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2363       }
2364 
2365       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
2366       downstream_queue_.mark_failure(downstream);
2367 
2368       return;
2369     }
2370 
2371 #ifdef HAVE_MRUBY
2372     dconn_ptr = dconn.get();
2373 #endif // HAVE_MRUBY
2374     rv = downstream->attach_downstream_connection(std::move(dconn));
2375     if (rv == 0) {
2376       break;
2377     }
2378   }
2379 
2380 #ifdef HAVE_MRUBY
2381   const auto &group = dconn_ptr->get_downstream_addr_group();
2382   if (group) {
2383     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
2384     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2385       if (error_reply(downstream, 500) != 0) {
2386         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2387       }
2388 
2389       downstream_queue_.mark_failure(downstream);
2390 
2391       return;
2392     }
2393 
2394     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2395       return;
2396     }
2397   }
2398 #endif // HAVE_MRUBY
2399 
2400   rv = downstream->push_request_headers();
2401   if (rv != 0) {
2402 
2403     if (error_reply(downstream, 502) != 0) {
2404       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2405     }
2406 
2407     downstream_queue_.mark_failure(downstream);
2408 
2409     return;
2410   }
2411 
2412   downstream_queue_.mark_active(downstream);
2413 
2414   auto &req = downstream->request();
2415   if (!req.http2_expect_body) {
2416     rv = downstream->end_upload_data();
2417     if (rv != 0) {
2418       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2419     }
2420   }
2421 }
2422 
2423 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2424 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2425                    size_t datalen, void *user_data, void *stream_user_data) {
2426   auto upstream = static_cast<Http3Upstream *>(user_data);
2427   auto downstream = static_cast<Downstream *>(stream_user_data);
2428 
2429   if (upstream->http_recv_data(downstream, data, datalen) != 0) {
2430     return NGHTTP3_ERR_CALLBACK_FAILURE;
2431   }
2432 
2433   return 0;
2434 }
2435 } // namespace
2436 
http_recv_data(Downstream * downstream,const uint8_t * data,size_t datalen)2437 int Http3Upstream::http_recv_data(Downstream *downstream, const uint8_t *data,
2438                                   size_t datalen) {
2439   downstream->reset_upstream_rtimer();
2440 
2441   if (downstream->push_upload_data_chunk(data, datalen) != 0) {
2442     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2443       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2444     }
2445 
2446     consume(downstream->get_stream_id(), datalen);
2447 
2448     return 0;
2449   }
2450 
2451   return 0;
2452 }
2453 
2454 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2455 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2456                     void *stream_user_data) {
2457   auto upstream = static_cast<Http3Upstream *>(user_data);
2458   auto downstream = static_cast<Downstream *>(stream_user_data);
2459 
2460   if (!downstream || downstream->get_stop_reading()) {
2461     return 0;
2462   }
2463 
2464   if (upstream->http_end_stream(downstream) != 0) {
2465     return NGHTTP3_ERR_CALLBACK_FAILURE;
2466   }
2467 
2468   return 0;
2469 }
2470 } // namespace
2471 
http_end_stream(Downstream * downstream)2472 int Http3Upstream::http_end_stream(Downstream *downstream) {
2473   downstream->disable_upstream_rtimer();
2474 
2475   if (downstream->end_upload_data() != 0) {
2476     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2477       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2478     }
2479   }
2480 
2481   downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2482 
2483   return 0;
2484 }
2485 
2486 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2487 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2488                       uint64_t app_error_code, void *conn_user_data,
2489                       void *stream_user_data) {
2490   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2491   auto downstream = static_cast<Downstream *>(stream_user_data);
2492 
2493   if (!downstream) {
2494     return 0;
2495   }
2496 
2497   if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2498     return NGHTTP3_ERR_CALLBACK_FAILURE;
2499   }
2500 
2501   return 0;
2502 }
2503 } // namespace
2504 
http_stream_close(Downstream * downstream,uint64_t app_error_code)2505 int Http3Upstream::http_stream_close(Downstream *downstream,
2506                                      uint64_t app_error_code) {
2507   auto stream_id = downstream->get_stream_id();
2508 
2509   if (LOG_ENABLED(INFO)) {
2510     ULOG(INFO, this) << "Stream stream_id=" << stream_id
2511                      << " is being closed with app_error_code="
2512                      << app_error_code;
2513 
2514     auto body = downstream->get_response_buf();
2515 
2516     ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2517                      << " not_sent=" << body->rleft_mark();
2518   }
2519 
2520   auto &req = downstream->request();
2521 
2522   consume(stream_id, req.unconsumed_body_length);
2523 
2524   req.unconsumed_body_length = 0;
2525 
2526   ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2527 
2528   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2529     remove_downstream(downstream);
2530     // downstream was deleted
2531 
2532     return 0;
2533   }
2534 
2535   if (downstream->can_detach_downstream_connection()) {
2536     // Keep-alive
2537     downstream->detach_downstream_connection();
2538   }
2539 
2540   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2541 
2542   // At this point, downstream read may be paused.
2543 
2544   // If shrpx_downstream::push_request_headers() failed, the
2545   // error is handled here.
2546   remove_downstream(downstream);
2547   // downstream was deleted
2548 
2549   return 0;
2550 }
2551 
2552 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2553 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2554                       uint64_t app_error_code, void *user_data,
2555                       void *stream_user_data) {
2556   auto upstream = static_cast<Http3Upstream *>(user_data);
2557 
2558   if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2559     return NGHTTP3_ERR_CALLBACK_FAILURE;
2560   }
2561 
2562   return 0;
2563 }
2564 } // namespace
2565 
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2566 int Http3Upstream::http_stop_sending(int64_t stream_id,
2567                                      uint64_t app_error_code) {
2568   auto rv =
2569       ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2570   if (ngtcp2_err_is_fatal(rv)) {
2571     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2572                       << ngtcp2_strerror(rv);
2573     return -1;
2574   }
2575 
2576   return 0;
2577 }
2578 
2579 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2580 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2581                       uint64_t app_error_code, void *user_data,
2582                       void *stream_user_data) {
2583   auto upstream = static_cast<Http3Upstream *>(user_data);
2584 
2585   if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2586     return NGHTTP3_ERR_CALLBACK_FAILURE;
2587   }
2588 
2589   return 0;
2590 }
2591 } // namespace
2592 
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2593 int Http3Upstream::http_reset_stream(int64_t stream_id,
2594                                      uint64_t app_error_code) {
2595   auto rv =
2596       ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2597   if (ngtcp2_err_is_fatal(rv)) {
2598     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2599                       << ngtcp2_strerror(rv);
2600     return -1;
2601   }
2602 
2603   return 0;
2604 }
2605 
setup_httpconn()2606 int Http3Upstream::setup_httpconn() {
2607   int rv;
2608 
2609   if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2610     return -1;
2611   }
2612 
2613   nghttp3_callbacks callbacks{
2614       shrpx::http_acked_stream_data,
2615       shrpx::http_stream_close,
2616       shrpx::http_recv_data,
2617       http_deferred_consume,
2618       shrpx::http_begin_request_headers,
2619       shrpx::http_recv_request_header,
2620       shrpx::http_end_request_headers,
2621       nullptr, // begin_trailers
2622       shrpx::http_recv_request_trailer,
2623       nullptr, // end_trailers
2624       shrpx::http_stop_sending,
2625       shrpx::http_end_stream,
2626       shrpx::http_reset_stream,
2627   };
2628 
2629   auto config = get_config();
2630 
2631   nghttp3_settings settings;
2632   nghttp3_settings_default(&settings);
2633   settings.qpack_max_dtable_capacity = 4_k;
2634 
2635   if (!config->http2_proxy) {
2636     settings.enable_connect_protocol = 1;
2637   }
2638 
2639   auto mem = nghttp3_mem_default();
2640 
2641   rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2642   if (rv != 0) {
2643     ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2644     return -1;
2645   }
2646 
2647   auto params = ngtcp2_conn_get_local_transport_params(conn_);
2648 
2649   nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2650                                            params->initial_max_streams_bidi);
2651 
2652   int64_t ctrl_stream_id;
2653 
2654   rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2655   if (rv != 0) {
2656     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2657     return -1;
2658   }
2659 
2660   rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2661   if (rv != 0) {
2662     ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2663                       << nghttp3_strerror(rv);
2664     return -1;
2665   }
2666 
2667   int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2668 
2669   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2670   if (rv != 0) {
2671     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2672     return -1;
2673   }
2674 
2675   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2676   if (rv != 0) {
2677     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2678     return -1;
2679   }
2680 
2681   rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2682                                        qpack_dec_stream_id);
2683   if (rv != 0) {
2684     ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2685                       << nghttp3_strerror(rv);
2686     return -1;
2687   }
2688 
2689   return 0;
2690 }
2691 
error_reply(Downstream * downstream,unsigned int status_code)2692 int Http3Upstream::error_reply(Downstream *downstream,
2693                                unsigned int status_code) {
2694   int rv;
2695   auto &resp = downstream->response();
2696 
2697   auto &balloc = downstream->get_block_allocator();
2698 
2699   auto html = http::create_error_html(balloc, status_code);
2700   resp.http_status = status_code;
2701   auto body = downstream->get_response_buf();
2702   body->append(html);
2703   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2704 
2705   nghttp3_data_reader data_read;
2706   data_read.read_data = downstream_read_data_callback;
2707 
2708   auto lgconf = log_config();
2709   lgconf->update_tstamp(std::chrono::system_clock::now());
2710 
2711   auto response_status = http2::stringify_status(balloc, status_code);
2712   auto content_length = util::make_string_ref_uint(balloc, html.size());
2713   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2714 
2715   auto nva = std::array<nghttp3_nv, 5>{
2716       {http3::make_nv_ls_nocopy(":status", response_status),
2717        http3::make_nv_ll("content-type", "text/html; charset=UTF-8"),
2718        http3::make_nv_ls_nocopy("server", get_config()->http.server_name),
2719        http3::make_nv_ls_nocopy("content-length", content_length),
2720        http3::make_nv_ls_nocopy("date", date)}};
2721 
2722   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2723                                     nva.data(), nva.size(), &data_read);
2724   if (nghttp3_err_is_fatal(rv)) {
2725     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2726                       << nghttp3_strerror(rv);
2727     return -1;
2728   }
2729 
2730   downstream->reset_upstream_wtimer();
2731 
2732   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2733       0) {
2734     return -1;
2735   }
2736 
2737   return 0;
2738 }
2739 
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2740 int Http3Upstream::shutdown_stream(Downstream *downstream,
2741                                    uint64_t app_error_code) {
2742   auto stream_id = downstream->get_stream_id();
2743 
2744   if (LOG_ENABLED(INFO)) {
2745     ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2746                      << " with app_error_code=" << app_error_code;
2747   }
2748 
2749   auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2750   if (rv != 0) {
2751     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2752                       << ngtcp2_strerror(rv);
2753     return -1;
2754   }
2755 
2756   return 0;
2757 }
2758 
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2759 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2760                                         uint64_t app_error_code) {
2761   auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
2762                                              NGHTTP3_H3_NO_ERROR);
2763   if (ngtcp2_err_is_fatal(rv)) {
2764     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2765                       << ngtcp2_strerror(rv);
2766     return -1;
2767   }
2768 
2769   return 0;
2770 }
2771 
consume(int64_t stream_id,size_t nconsumed)2772 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2773   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2774   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2775 }
2776 
remove_downstream(Downstream * downstream)2777 void Http3Upstream::remove_downstream(Downstream *downstream) {
2778   if (downstream->accesslog_ready()) {
2779     handler_->write_accesslog(downstream);
2780   }
2781 
2782   nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2783                                     nullptr);
2784 
2785   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2786 
2787   if (next_downstream) {
2788     initiate_downstream(next_downstream);
2789   }
2790 
2791   if (downstream_queue_.get_downstreams() == nullptr) {
2792     // There is no downstream at the moment.  Start idle timer now.
2793     handler_->repeat_read_timer();
2794   }
2795 }
2796 
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2797 void Http3Upstream::log_response_headers(
2798     Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2799   std::stringstream ss;
2800   for (auto &nv : nva) {
2801     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2802        << StringRef{nv.value, nv.valuelen} << "\n";
2803   }
2804   ULOG(INFO, this) << "HTTP response headers. stream_id="
2805                    << downstream->get_stream_id() << "\n"
2806                    << ss.str();
2807 }
2808 
check_shutdown()2809 int Http3Upstream::check_shutdown() {
2810   auto worker = handler_->get_worker();
2811 
2812   if (!worker->get_graceful_shutdown()) {
2813     return 0;
2814   }
2815 
2816   ev_prepare_stop(handler_->get_loop(), &prep_);
2817 
2818   return start_graceful_shutdown();
2819 }
2820 
start_graceful_shutdown()2821 int Http3Upstream::start_graceful_shutdown() {
2822   int rv;
2823 
2824   if (ev_is_active(&shutdown_timer_)) {
2825     return 0;
2826   }
2827 
2828   if (!httpconn_) {
2829     return -1;
2830   }
2831 
2832   rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2833   if (rv != 0) {
2834     ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2835                       << nghttp3_strerror(rv);
2836     return -1;
2837   }
2838 
2839   handler_->signal_write();
2840 
2841   auto t = ngtcp2_conn_get_pto(conn_);
2842 
2843   ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2844                0.);
2845   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2846 
2847   return 0;
2848 }
2849 
submit_goaway()2850 int Http3Upstream::submit_goaway() {
2851   int rv;
2852 
2853   rv = nghttp3_conn_shutdown(httpconn_);
2854   if (rv != 0) {
2855     ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2856     return -1;
2857   }
2858 
2859   handler_->signal_write();
2860 
2861   return 0;
2862 }
2863 
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2864 int Http3Upstream::open_qlog_file(const StringRef &dir,
2865                                   const ngtcp2_cid &scid) const {
2866   std::array<char, sizeof("20141115T125824.741+0900")> buf;
2867 
2868   auto path = dir.str();
2869   path += '/';
2870   path +=
2871       util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2872   path += '-';
2873   path += util::format_hex(scid.data, scid.datalen);
2874   path += ".sqlog";
2875 
2876   int fd;
2877 
2878 #ifdef O_CLOEXEC
2879   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2880                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2881          errno == EINTR)
2882     ;
2883 #else  // !O_CLOEXEC
2884   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2885                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2886          errno == EINTR)
2887     ;
2888 
2889   if (fd != -1) {
2890     util::make_socket_closeonexec(fd);
2891   }
2892 #endif // !O_CLOEXEC
2893 
2894   if (fd == -1) {
2895     auto error = errno;
2896     ULOG(ERROR, this) << "Failed to open qlog file " << path
2897                       << ": errno=" << error;
2898     return -1;
2899   }
2900 
2901   return fd;
2902 }
2903 
get_conn() const2904 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2905 
2906 } // namespace shrpx
2907