• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2021 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http3_upstream.h"
26 
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31 
32 #include <cstdio>
33 
34 #include <ngtcp2/ngtcp2_crypto.h>
35 
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 #  include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49 #include "ssl_compat.h"
50 
51 namespace shrpx {
52 
53 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)54 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
55   auto upstream = static_cast<Http3Upstream *>(w->data);
56 
57   if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
58     goto fail;
59   }
60 
61   return;
62 
63 fail:
64   auto handler = upstream->get_client_handler();
65 
66   delete handler;
67 }
68 } // namespace
69 
70 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)71 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
72   auto upstream = static_cast<Http3Upstream *>(w->data);
73   auto handler = upstream->get_client_handler();
74 
75   if (upstream->submit_goaway() != 0) {
76     delete handler;
77   }
78 }
79 } // namespace
80 
81 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)82 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
83   auto upstream = static_cast<Http3Upstream *>(w->data);
84   auto handler = upstream->get_client_handler();
85 
86   if (upstream->check_shutdown() != 0) {
87     delete handler;
88   }
89 }
90 } // namespace
91 
92 namespace {
downstream_queue_size(Worker * worker)93 size_t downstream_queue_size(Worker *worker) {
94   auto &downstreamconf = *worker->get_downstream_config();
95 
96   if (get_config()->http2_proxy) {
97     return downstreamconf.connections_per_host;
98   }
99 
100   return downstreamconf.connections_per_frontend;
101 }
102 } // namespace
103 
104 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)105 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
106   auto conn = static_cast<Connection *>(conn_ref->user_data);
107   auto handler = static_cast<ClientHandler *>(conn->data);
108   auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
109   return upstream->get_conn();
110 }
111 } // namespace
112 
Http3Upstream(ClientHandler * handler)113 Http3Upstream::Http3Upstream(ClientHandler *handler)
114     : handler_{handler},
115       qlog_fd_{-1},
116       hashed_scid_{},
117       conn_{nullptr},
118       httpconn_{nullptr},
119       downstream_queue_{downstream_queue_size(handler->get_worker()),
120                         !get_config()->http2_proxy},
121       tx_{
122           .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123 #ifndef UDP_SEGMENT
124           .no_gso = true,
125 #endif // UDP_SEGMENT
126       } {
127   auto conn = handler_->get_connection();
128   conn->conn_ref.get_conn = shrpx::get_conn;
129 
130   ev_timer_init(&timer_, timeoutcb, 0., 0.);
131   timer_.data = this;
132 
133   ngtcp2_ccerr_default(&last_error_);
134 
135   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
136   shutdown_timer_.data = this;
137 
138   ev_prepare_init(&prep_, prepare_cb);
139   prep_.data = this;
140   ev_prepare_start(handler_->get_loop(), &prep_);
141 }
142 
~Http3Upstream()143 Http3Upstream::~Http3Upstream() {
144   auto loop = handler_->get_loop();
145 
146   ev_prepare_stop(loop, &prep_);
147   ev_timer_stop(loop, &shutdown_timer_);
148   ev_timer_stop(loop, &timer_);
149 
150   nghttp3_conn_del(httpconn_);
151 
152   ngtcp2_conn_del(conn_);
153 
154   if (qlog_fd_ != -1) {
155     close(qlog_fd_);
156   }
157 }
158 
159 namespace {
log_printf(void * user_data,const char * fmt,...)160 void log_printf(void *user_data, const char *fmt, ...) {
161   va_list ap;
162   std::array<char, 4096> buf;
163 
164   va_start(ap, fmt);
165   auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
166   va_end(ap);
167 
168   if (static_cast<size_t>(nwrite) >= buf.size()) {
169     nwrite = buf.size() - 1;
170   }
171 
172   buf[nwrite++] = '\n';
173 
174   while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
175     ;
176 }
177 } // namespace
178 
179 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)180 void qlog_write(void *user_data, uint32_t flags, const void *data,
181                 size_t datalen) {
182   auto upstream = static_cast<Http3Upstream *>(user_data);
183 
184   upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
185 }
186 } // namespace
187 
qlog_write(const void * data,size_t datalen,bool fin)188 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
189   assert(qlog_fd_ != -1);
190 
191   while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
192     ;
193 
194   if (fin) {
195     close(qlog_fd_);
196     qlog_fd_ = -1;
197   }
198 }
199 
200 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)201 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
202   util::random_bytes(dest, dest + destlen,
203                      *static_cast<std::mt19937 *>(rand_ctx->native_handle));
204 }
205 } // namespace
206 
207 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)208 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
209                           size_t cidlen, void *user_data) {
210   auto upstream = static_cast<Http3Upstream *>(user_data);
211   auto handler = upstream->get_client_handler();
212   auto worker = handler->get_worker();
213   auto conn_handler = worker->get_connection_handler();
214   auto &qkms = conn_handler->get_quic_keying_materials();
215   auto &qkm = qkms->keying_materials.front();
216 
217   assert(SHRPX_QUIC_SCIDLEN == cidlen);
218 
219   if (generate_quic_connection_id(*cid, worker->get_worker_id(), qkm.id,
220                                   qkm.cid_encryption_ctx) != 0) {
221     return NGTCP2_ERR_CALLBACK_FAILURE;
222   }
223 
224   if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
225                                           qkm.secret.size()) != 0) {
226     return NGTCP2_ERR_CALLBACK_FAILURE;
227   }
228 
229   auto quic_connection_handler = worker->get_quic_connection_handler();
230 
231   quic_connection_handler->add_connection_id(*cid, handler);
232 
233   return 0;
234 }
235 } // namespace
236 
237 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)238 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
239                          void *user_data) {
240   auto upstream = static_cast<Http3Upstream *>(user_data);
241   auto handler = upstream->get_client_handler();
242   auto worker = handler->get_worker();
243   auto quic_conn_handler = worker->get_quic_connection_handler();
244 
245   quic_conn_handler->remove_connection_id(*cid);
246 
247   return 0;
248 }
249 } // namespace
250 
http_begin_request_headers(int64_t stream_id)251 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
252   auto downstream =
253       std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
254   nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
255 
256   downstream->reset_upstream_rtimer();
257   downstream->repeat_header_timer();
258 
259   handler_->stop_read_timer();
260 
261   auto &req = downstream->request();
262   req.http_major = 3;
263   req.http_minor = 0;
264 
265   add_pending_downstream(std::move(downstream));
266 }
267 
add_pending_downstream(std::unique_ptr<Downstream> downstream)268 void Http3Upstream::add_pending_downstream(
269     std::unique_ptr<Downstream> downstream) {
270   downstream_queue_.add_pending(std::move(downstream));
271 }
272 
273 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)274 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
275                      uint64_t offset, const uint8_t *data, size_t datalen,
276                      void *user_data, void *stream_user_data) {
277   auto upstream = static_cast<Http3Upstream *>(user_data);
278 
279   if (upstream->recv_stream_data(flags, stream_id, {data, datalen}) != 0) {
280     return NGTCP2_ERR_CALLBACK_FAILURE;
281   }
282 
283   return 0;
284 }
285 } // namespace
286 
recv_stream_data(uint32_t flags,int64_t stream_id,std::span<const uint8_t> data)287 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
288                                     std::span<const uint8_t> data) {
289   assert(httpconn_);
290 
291   auto nconsumed =
292       nghttp3_conn_read_stream(httpconn_, stream_id, data.data(), data.size(),
293                                flags & NGTCP2_STREAM_DATA_FLAG_FIN);
294   if (nconsumed < 0) {
295     ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
296                       << nghttp3_strerror(nconsumed);
297     ngtcp2_ccerr_set_application_error(
298         &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
299         0);
300     return -1;
301   }
302 
303   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
304   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
305 
306   return 0;
307 }
308 
309 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)310 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
311                  uint64_t app_error_code, void *user_data,
312                  void *stream_user_data) {
313   auto upstream = static_cast<Http3Upstream *>(user_data);
314 
315   if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
316     app_error_code = NGHTTP3_H3_NO_ERROR;
317   }
318 
319   if (upstream->stream_close(stream_id, app_error_code) != 0) {
320     return NGTCP2_ERR_CALLBACK_FAILURE;
321   }
322 
323   return 0;
324 }
325 } // namespace
326 
stream_close(int64_t stream_id,uint64_t app_error_code)327 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
328   if (!httpconn_) {
329     return 0;
330   }
331 
332   auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
333   switch (rv) {
334   case 0:
335     break;
336   case NGHTTP3_ERR_STREAM_NOT_FOUND:
337     if (ngtcp2_is_bidi_stream(stream_id)) {
338       ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
339     }
340     break;
341   default:
342     ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
343     ngtcp2_ccerr_set_application_error(
344         &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
345     return -1;
346   }
347 
348   return 0;
349 }
350 
351 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)352 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
353                              uint64_t offset, uint64_t datalen, void *user_data,
354                              void *stream_user_data) {
355   auto upstream = static_cast<Http3Upstream *>(user_data);
356 
357   if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
358     return NGTCP2_ERR_CALLBACK_FAILURE;
359   }
360 
361   return 0;
362 }
363 } // namespace
364 
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)365 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
366                                             uint64_t datalen) {
367   if (!httpconn_) {
368     return 0;
369   }
370 
371   auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
372   if (rv != 0) {
373     ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
374                       << nghttp3_strerror(rv);
375     return -1;
376   }
377 
378   return 0;
379 }
380 
381 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)382 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
383                            uint64_t max_data, void *user_data,
384                            void *stream_user_data) {
385   auto upstream = static_cast<Http3Upstream *>(user_data);
386 
387   if (upstream->extend_max_stream_data(stream_id) != 0) {
388     return NGTCP2_ERR_CALLBACK_FAILURE;
389   }
390 
391   return 0;
392 }
393 } // namespace
394 
extend_max_stream_data(int64_t stream_id)395 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
396   if (!httpconn_) {
397     return 0;
398   }
399 
400   auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
401   if (rv != 0) {
402     ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
403                       << nghttp3_strerror(rv);
404     return -1;
405   }
406 
407   return 0;
408 }
409 
410 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)411 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
412                                    void *user_data) {
413   auto upstream = static_cast<Http3Upstream *>(user_data);
414 
415   upstream->extend_max_remote_streams_bidi(max_streams);
416 
417   return 0;
418 }
419 } // namespace
420 
extend_max_remote_streams_bidi(uint64_t max_streams)421 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
422   nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
423 }
424 
425 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)426 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
427                  uint64_t app_error_code, void *user_data,
428                  void *stream_user_data) {
429   auto upstream = static_cast<Http3Upstream *>(user_data);
430 
431   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
432     return NGTCP2_ERR_CALLBACK_FAILURE;
433   }
434 
435   return 0;
436 }
437 } // namespace
438 
http_shutdown_stream_read(int64_t stream_id)439 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
440   if (!httpconn_) {
441     return 0;
442   }
443 
444   auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
445   if (rv != 0) {
446     ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
447                       << nghttp3_strerror(rv);
448     return -1;
449   }
450 
451   return 0;
452 }
453 
454 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)455 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
456                         uint64_t app_error_code, void *user_data,
457                         void *stream_user_data) {
458   auto upstream = static_cast<Http3Upstream *>(user_data);
459 
460   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
461     return NGTCP2_ERR_CALLBACK_FAILURE;
462   }
463 
464   return 0;
465 }
466 } // namespace
467 
468 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)469 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
470   auto upstream = static_cast<Http3Upstream *>(user_data);
471 
472   if (upstream->handshake_completed() != 0) {
473     return NGTCP2_ERR_CALLBACK_FAILURE;
474   }
475 
476   return 0;
477 }
478 } // namespace
479 
handshake_completed()480 int Http3Upstream::handshake_completed() {
481   handler_->set_alpn_from_conn();
482 
483   auto alpn = handler_->get_alpn();
484   if (alpn.empty()) {
485     ULOG(ERROR, this) << "NO ALPN was negotiated";
486     return -1;
487   }
488 
489   auto path = ngtcp2_conn_get_path(conn_);
490 
491   return send_new_token(&path->remote);
492 }
493 
494 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)495 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
496                     const ngtcp2_path *old_path,
497                     ngtcp2_path_validation_result res, void *user_data) {
498   if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
499       !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
500     return 0;
501   }
502 
503   auto upstream = static_cast<Http3Upstream *>(user_data);
504   if (upstream->send_new_token(&path->remote) != 0) {
505     return NGTCP2_ERR_CALLBACK_FAILURE;
506   }
507 
508   return 0;
509 }
510 } // namespace
511 
send_new_token(const ngtcp2_addr * remote_addr)512 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
513   auto worker = handler_->get_worker();
514   auto conn_handler = worker->get_connection_handler();
515   auto &qkms = conn_handler->get_quic_keying_materials();
516   auto &qkm = qkms->keying_materials.front();
517 
518   std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> tokenbuf;
519 
520   auto token = generate_token(tokenbuf, remote_addr->addr, remote_addr->addrlen,
521                               qkm.secret, qkm.id);
522   if (!token) {
523     return -1;
524   }
525 
526   assert(token->size() == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1);
527 
528   auto rv = ngtcp2_conn_submit_new_token(conn_, token->data(), token->size());
529   if (rv != 0) {
530     ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
531                       << ngtcp2_strerror(rv);
532     return -1;
533   }
534 
535   return 0;
536 }
537 
538 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)539 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
540                 void *user_data) {
541   if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
542     return 0;
543   }
544 
545   auto upstream = static_cast<Http3Upstream *>(user_data);
546   if (upstream->setup_httpconn() != 0) {
547     return NGTCP2_ERR_CALLBACK_FAILURE;
548   }
549 
550   return 0;
551 }
552 } // namespace
553 
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,std::span<const uint8_t> token,ngtcp2_token_type token_type)554 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
555                         const Address &local_addr,
556                         const ngtcp2_pkt_hd &initial_hd,
557                         const ngtcp2_cid *odcid, std::span<const uint8_t> token,
558                         ngtcp2_token_type token_type) {
559   int rv;
560 
561   auto worker = handler_->get_worker();
562   auto conn_handler = worker->get_connection_handler();
563 
564   auto callbacks = ngtcp2_callbacks{
565       nullptr, // client_initial
566       ngtcp2_crypto_recv_client_initial_cb,
567       ngtcp2_crypto_recv_crypto_data_cb,
568       shrpx::handshake_completed,
569       nullptr, // recv_version_negotiation
570       ngtcp2_crypto_encrypt_cb,
571       ngtcp2_crypto_decrypt_cb,
572       ngtcp2_crypto_hp_mask_cb,
573       shrpx::recv_stream_data,
574       shrpx::acked_stream_data_offset,
575       nullptr, // stream_open
576       shrpx::stream_close,
577       nullptr, // recv_stateless_reset
578       nullptr, // recv_retry
579       nullptr, // extend_max_local_streams_bidi
580       nullptr, // extend_max_local_streams_uni
581       rand,
582       get_new_connection_id,
583       remove_connection_id,
584       ngtcp2_crypto_update_key_cb,
585       shrpx::path_validation,
586       nullptr, // select_preferred_addr
587       shrpx::stream_reset,
588       shrpx::extend_max_remote_streams_bidi,
589       nullptr, // extend_max_remote_streams_uni
590       shrpx::extend_max_stream_data,
591       nullptr, // dcid_status
592       nullptr, // handshake_confirmed
593       nullptr, // recv_new_token
594       ngtcp2_crypto_delete_crypto_aead_ctx_cb,
595       ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
596       nullptr, // recv_datagram
597       nullptr, // ack_datagram
598       nullptr, // lost_datagram
599       ngtcp2_crypto_get_path_challenge_data_cb,
600       shrpx::stream_stop_sending,
601       nullptr, // version_negotiation
602       nullptr, // recv_rx_key
603       shrpx::recv_tx_key,
604   };
605 
606   auto config = get_config();
607   auto &quicconf = config->quic;
608   auto &http3conf = config->http3;
609 
610   auto &qkms = conn_handler->get_quic_keying_materials();
611   auto &qkm = qkms->keying_materials.front();
612 
613   ngtcp2_cid scid;
614 
615   if (generate_quic_connection_id(scid, worker->get_worker_id(), qkm.id,
616                                   qkm.cid_encryption_ctx) != 0) {
617     return -1;
618   }
619 
620   ngtcp2_settings settings;
621   ngtcp2_settings_default(&settings);
622   if (quicconf.upstream.debug.log) {
623     settings.log_printf = log_printf;
624   }
625 
626   if (!quicconf.upstream.qlog.dir.empty()) {
627     auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
628     if (fd != -1) {
629       qlog_fd_ = fd;
630       settings.qlog_write = shrpx::qlog_write;
631     }
632   }
633 
634   settings.initial_ts = quic_timestamp();
635   settings.initial_rtt = static_cast<ngtcp2_tstamp>(
636       quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
637   settings.cc_algo = quicconf.upstream.congestion_controller;
638   settings.max_window = http3conf.upstream.max_connection_window_size;
639   settings.max_stream_window = http3conf.upstream.max_window_size;
640   settings.rand_ctx.native_handle = &worker->get_randgen();
641   settings.token = token.data();
642   settings.tokenlen = token.size();
643   settings.token_type = token_type;
644   settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
645       0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
646 
647   ngtcp2_transport_params params;
648   ngtcp2_transport_params_default(&params);
649   params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
650   // The minimum number of unidirectional streams required for HTTP/3.
651   params.initial_max_streams_uni = 3;
652   params.initial_max_data = http3conf.upstream.connection_window_size;
653   params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
654   params.initial_max_stream_data_uni = http3conf.upstream.window_size;
655   params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
656       quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
657 
658 #ifdef NGHTTP2_OPENSSL_IS_BORINGSSL
659   if (quicconf.upstream.early_data) {
660     ngtcp2_transport_params early_data_params;
661 
662     ngtcp2_transport_params_default(&early_data_params);
663 
664     early_data_params.initial_max_stream_data_bidi_local =
665         params.initial_max_stream_data_bidi_local;
666     early_data_params.initial_max_stream_data_bidi_remote =
667         params.initial_max_stream_data_bidi_remote;
668     early_data_params.initial_max_stream_data_uni =
669         params.initial_max_stream_data_uni;
670     early_data_params.initial_max_data = params.initial_max_data;
671     early_data_params.initial_max_streams_bidi =
672         params.initial_max_streams_bidi;
673     early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
674 
675     // TODO include HTTP/3 SETTINGS
676 
677     std::array<uint8_t, 128> quic_early_data_ctx;
678 
679     auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
680         quic_early_data_ctx.data(), quic_early_data_ctx.size(),
681         &early_data_params);
682 
683     assert(quic_early_data_ctxlen > 0);
684     assert(static_cast<size_t>(quic_early_data_ctxlen) <=
685            quic_early_data_ctx.size());
686 
687     if (SSL_set_quic_early_data_context(handler_->get_ssl(),
688                                         quic_early_data_ctx.data(),
689                                         quic_early_data_ctxlen) != 1) {
690       ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
691       return -1;
692     }
693   }
694 #endif // NGHTTP2_OPENSSL_IS_BORINGSSL
695 
696   if (odcid) {
697     params.original_dcid = *odcid;
698     params.retry_scid = initial_hd.dcid;
699     params.retry_scid_present = 1;
700   } else {
701     params.original_dcid = initial_hd.dcid;
702   }
703 
704   params.original_dcid_present = 1;
705 
706   rv = generate_quic_stateless_reset_token(
707       params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
708   if (rv != 0) {
709     ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
710     return -1;
711   }
712   params.stateless_reset_token_present = 1;
713 
714   auto path = ngtcp2_path{
715       {
716           const_cast<sockaddr *>(&local_addr.su.sa),
717           static_cast<socklen_t>(local_addr.len),
718       },
719       {
720           const_cast<sockaddr *>(&remote_addr.su.sa),
721           static_cast<socklen_t>(remote_addr.len),
722       },
723       const_cast<UpstreamAddr *>(faddr),
724   };
725 
726   rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
727                               initial_hd.version, &callbacks, &settings,
728                               &params, nullptr, this);
729   if (rv != 0) {
730     ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
731     return -1;
732   }
733 
734   ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
735 
736   auto quic_connection_handler = worker->get_quic_connection_handler();
737 
738   if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
739                                          initial_hd.dcid) != 0) {
740     return -1;
741   }
742 
743   quic_connection_handler->add_connection_id(hashed_scid_, handler_);
744   quic_connection_handler->add_connection_id(scid, handler_);
745 
746   return 0;
747 }
748 
on_read()749 int Http3Upstream::on_read() { return 0; }
750 
on_write()751 int Http3Upstream::on_write() {
752   int rv;
753 
754   if (tx_.send_blocked) {
755     rv = send_blocked_packet();
756     if (rv != 0) {
757       return -1;
758     }
759 
760     if (tx_.send_blocked) {
761       return 0;
762     }
763   }
764 
765   handler_->get_connection()->wlimit.stopw();
766 
767   if (write_streams() != 0) {
768     return -1;
769   }
770 
771   if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
772     return -1;
773   }
774 
775   reset_timer();
776 
777   return 0;
778 }
779 
write_streams()780 int Http3Upstream::write_streams() {
781   std::array<nghttp3_vec, 16> vec;
782   auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
783   auto path_max_udp_payload_size =
784       ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
785   ngtcp2_pkt_info pi, prev_pi;
786   auto txbuf =
787       std::span{tx_.data.get(), std::max(ngtcp2_conn_get_send_quantum(conn_),
788                                          path_max_udp_payload_size)};
789   auto buf = txbuf;
790   ngtcp2_path_storage ps, prev_ps;
791   int rv;
792   size_t gso_size = 0;
793   auto ts = quic_timestamp();
794 
795   ngtcp2_path_storage_zero(&ps);
796   ngtcp2_path_storage_zero(&prev_ps);
797 
798   for (;;) {
799     int64_t stream_id = -1;
800     int fin = 0;
801     nghttp3_ssize sveccnt = 0;
802 
803     if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
804       sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
805                                            vec.data(), vec.size());
806       if (sveccnt < 0) {
807         ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
808                           << nghttp3_strerror(sveccnt);
809         ngtcp2_ccerr_set_application_error(
810             &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
811             nullptr, 0);
812         return handle_error();
813       }
814     }
815 
816     ngtcp2_ssize ndatalen;
817     auto v = vec.data();
818     auto vcnt = static_cast<size_t>(sveccnt);
819 
820     uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
821     if (fin) {
822       flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
823     }
824 
825     auto buflen = buf.size() >= max_udp_payload_size
826                       ? max_udp_payload_size
827                       : path_max_udp_payload_size;
828     auto nwrite = ngtcp2_conn_writev_stream(
829         conn_, &ps.path, &pi, buf.data(), buflen, &ndatalen, flags, stream_id,
830         reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
831     if (nwrite < 0) {
832       switch (nwrite) {
833       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
834         assert(ndatalen == -1);
835         nghttp3_conn_block_stream(httpconn_, stream_id);
836         continue;
837       case NGTCP2_ERR_STREAM_SHUT_WR:
838         assert(ndatalen == -1);
839         nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
840         continue;
841       case NGTCP2_ERR_WRITE_MORE:
842         assert(ndatalen >= 0);
843         rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
844         if (rv != 0) {
845           ULOG(ERROR, this)
846               << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
847           ngtcp2_ccerr_set_application_error(
848               &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
849               0);
850           return handle_error();
851         }
852         continue;
853       }
854 
855       assert(ndatalen == -1);
856 
857       ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
858                         << ngtcp2_strerror(nwrite);
859 
860       ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
861 
862       return handle_error();
863     } else if (ndatalen >= 0) {
864       rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
865       if (rv != 0) {
866         ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
867                           << nghttp3_strerror(rv);
868         ngtcp2_ccerr_set_application_error(
869             &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
870             0);
871         return handle_error();
872       }
873     }
874 
875     if (nwrite == 0) {
876       auto data = std::span{std::begin(txbuf), std::begin(buf)};
877       if (!data.empty()) {
878         auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
879 
880         auto [rest, rv] =
881             send_packet(faddr, prev_ps.path.remote.addr,
882                         prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
883                         prev_ps.path.local.addrlen, prev_pi, data, gso_size);
884         if (rv == SHRPX_ERR_SEND_BLOCKED) {
885           on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
886                           prev_pi, rest, gso_size);
887 
888           signal_write_upstream_addr(faddr);
889         }
890       }
891 
892       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
893 
894       return 0;
895     }
896 
897     auto last_pkt = std::begin(buf);
898 
899     buf = buf.subspan(nwrite);
900 
901     if (last_pkt == std::begin(txbuf)) {
902       ngtcp2_path_copy(&prev_ps.path, &ps.path);
903       prev_pi = pi;
904       gso_size = nwrite;
905     } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
906                prev_pi.ecn != pi.ecn ||
907                static_cast<size_t>(nwrite) > gso_size ||
908                (gso_size > path_max_udp_payload_size &&
909                 static_cast<size_t>(nwrite) != gso_size)) {
910       auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
911       auto data = std::span{std::begin(txbuf), last_pkt};
912 
913       auto [rest, rv] =
914           send_packet(faddr, prev_ps.path.remote.addr,
915                       prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
916                       prev_ps.path.local.addrlen, prev_pi, data, gso_size);
917       switch (rv) {
918       case SHRPX_ERR_SEND_BLOCKED:
919         on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
920                         rest, gso_size);
921 
922         data = std::span{last_pkt, std::begin(buf)};
923         on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
924                         ps.path.remote, ps.path.local, pi, data, data.size());
925 
926         signal_write_upstream_addr(faddr);
927 
928         break;
929       default: {
930         auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
931         auto data = std::span{last_pkt, std::begin(buf)};
932 
933         auto [rest, rv] = send_packet(
934             faddr, ps.path.remote.addr, ps.path.remote.addrlen,
935             ps.path.local.addr, ps.path.local.addrlen, pi, data, data.size());
936         if (rv == SHRPX_ERR_SEND_BLOCKED) {
937           assert(rest.size() == data.size());
938 
939           on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
940                           rest.size());
941 
942           signal_write_upstream_addr(faddr);
943         }
944       }
945       }
946 
947       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
948 
949       return 0;
950     }
951 
952     if (buf.size() < path_max_udp_payload_size ||
953         static_cast<size_t>(nwrite) < gso_size) {
954       auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
955       auto data = std::span{std::begin(txbuf), std::begin(buf)};
956 
957       auto [rest, rv] = send_packet(faddr, ps.path.remote.addr,
958                                     ps.path.remote.addrlen, ps.path.local.addr,
959                                     ps.path.local.addrlen, pi, data, gso_size);
960       if (rv == SHRPX_ERR_SEND_BLOCKED) {
961         on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
962                         gso_size);
963 
964         signal_write_upstream_addr(faddr);
965       }
966 
967       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
968 
969       return 0;
970     }
971   }
972 
973   return 0;
974 }
975 
on_timeout(Downstream * downstream)976 int Http3Upstream::on_timeout(Downstream *downstream) {
977   if (LOG_ENABLED(INFO)) {
978     ULOG(INFO, this) << "Stream timeout stream_id="
979                      << downstream->get_stream_id();
980   }
981 
982   shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
983 
984   handler_->signal_write();
985 
986   return 0;
987 }
988 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)989 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
990                                                unsigned int status_code) {
991   int rv;
992 
993   rv = error_reply(downstream, status_code);
994 
995   if (rv != 0) {
996     return -1;
997   }
998 
999   handler_->signal_write();
1000 
1001   return 0;
1002 }
1003 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1004 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1005     Downstream *downstream) {
1006   assert(0);
1007   abort();
1008 }
1009 
1010 namespace {
1011 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1012 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1013   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1014   // client to retry.
1015   switch (downstream_error_code) {
1016   case NGHTTP2_NO_ERROR:
1017     return NGHTTP3_H3_NO_ERROR;
1018   case NGHTTP2_REFUSED_STREAM:
1019     return NGHTTP3_H3_REQUEST_REJECTED;
1020   default:
1021     return NGHTTP3_H3_INTERNAL_ERROR;
1022   }
1023 }
1024 } // namespace
1025 
downstream_read(DownstreamConnection * dconn)1026 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1027   auto downstream = dconn->get_downstream();
1028 
1029   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1030     // The downstream stream was reset (canceled). In this case,
1031     // RST_STREAM to the upstream and delete downstream connection
1032     // here. Deleting downstream will be taken place at
1033     // on_stream_close_callback.
1034     shutdown_stream(downstream,
1035                     infer_upstream_shutdown_stream_error_code(
1036                         downstream->get_response_rst_stream_error_code()));
1037     downstream->pop_downstream_connection();
1038     // dconn was deleted
1039     dconn = nullptr;
1040   } else if (downstream->get_response_state() ==
1041              DownstreamState::MSG_BAD_HEADER) {
1042     if (error_reply(downstream, 502) != 0) {
1043       return -1;
1044     }
1045     downstream->pop_downstream_connection();
1046     // dconn was deleted
1047     dconn = nullptr;
1048   } else {
1049     auto rv = downstream->on_read();
1050     if (rv == SHRPX_ERR_EOF) {
1051       if (downstream->get_request_header_sent()) {
1052         return downstream_eof(dconn);
1053       }
1054       return SHRPX_ERR_RETRY;
1055     }
1056     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1057       downstream->pop_downstream_connection();
1058       handler_->signal_write();
1059       return 0;
1060     }
1061     if (rv != 0) {
1062       if (rv != SHRPX_ERR_NETWORK) {
1063         if (LOG_ENABLED(INFO)) {
1064           DCLOG(INFO, dconn) << "HTTP parser failure";
1065         }
1066       }
1067       return downstream_error(dconn, Downstream::EVENT_ERROR);
1068     }
1069 
1070     if (downstream->can_detach_downstream_connection()) {
1071       // Keep-alive
1072       downstream->detach_downstream_connection();
1073     }
1074   }
1075 
1076   handler_->signal_write();
1077 
1078   // At this point, downstream may be deleted.
1079 
1080   return 0;
1081 }
1082 
downstream_write(DownstreamConnection * dconn)1083 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1084   int rv;
1085   rv = dconn->on_write();
1086   if (rv == SHRPX_ERR_NETWORK) {
1087     return downstream_error(dconn, Downstream::EVENT_ERROR);
1088   }
1089   if (rv != 0) {
1090     return rv;
1091   }
1092   return 0;
1093 }
1094 
downstream_eof(DownstreamConnection * dconn)1095 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1096   auto downstream = dconn->get_downstream();
1097 
1098   if (LOG_ENABLED(INFO)) {
1099     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1100   }
1101 
1102   // Delete downstream connection. If we don't delete it here, it will
1103   // be pooled in on_stream_close_callback.
1104   downstream->pop_downstream_connection();
1105   // dconn was deleted
1106   dconn = nullptr;
1107   // downstream will be deleted in on_stream_close_callback.
1108   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1109     // Server may indicate the end of the request by EOF
1110     if (LOG_ENABLED(INFO)) {
1111       ULOG(INFO, this) << "Downstream body was ended by EOF";
1112     }
1113     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1114 
1115     // For tunneled connection, MSG_COMPLETE signals
1116     // downstream_read_data_callback to send RST_STREAM after pending
1117     // response body is sent. This is needed to ensure that RST_STREAM
1118     // is sent after all pending data are sent.
1119     if (on_downstream_body_complete(downstream) != 0) {
1120       return -1;
1121     }
1122   } else if (downstream->get_response_state() !=
1123              DownstreamState::MSG_COMPLETE) {
1124     // If stream was not closed, then we set MSG_COMPLETE and let
1125     // on_stream_close_callback delete downstream.
1126     if (error_reply(downstream, 502) != 0) {
1127       return -1;
1128     }
1129   }
1130   handler_->signal_write();
1131   // At this point, downstream may be deleted.
1132   return 0;
1133 }
1134 
downstream_error(DownstreamConnection * dconn,int events)1135 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1136   auto downstream = dconn->get_downstream();
1137 
1138   if (LOG_ENABLED(INFO)) {
1139     if (events & Downstream::EVENT_ERROR) {
1140       DCLOG(INFO, dconn) << "Downstream network/general error";
1141     } else {
1142       DCLOG(INFO, dconn) << "Timeout";
1143     }
1144     if (downstream->get_upgraded()) {
1145       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1146     }
1147   }
1148 
1149   // Delete downstream connection. If we don't delete it here, it will
1150   // be pooled in on_stream_close_callback.
1151   downstream->pop_downstream_connection();
1152   // dconn was deleted
1153   dconn = nullptr;
1154 
1155   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1156     // For SSL tunneling, we issue RST_STREAM. For other types of
1157     // stream, we don't have to do anything since response was
1158     // complete.
1159     if (downstream->get_upgraded()) {
1160       shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1161     }
1162   } else {
1163     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1164       if (downstream->get_upgraded()) {
1165         if (on_downstream_body_complete(downstream) != 0) {
1166           return -1;
1167         }
1168       } else {
1169         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1170       }
1171     } else {
1172       unsigned int status;
1173       if (events & Downstream::EVENT_TIMEOUT) {
1174         if (downstream->get_request_header_sent()) {
1175           status = 504;
1176         } else {
1177           status = 408;
1178         }
1179       } else {
1180         status = 502;
1181       }
1182       if (error_reply(downstream, status) != 0) {
1183         return -1;
1184       }
1185     }
1186     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1187   }
1188   handler_->signal_write();
1189   // At this point, downstream may be deleted.
1190   return 0;
1191 }
1192 
get_client_handler() const1193 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1194 
1195 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1196 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1197                                             int64_t stream_id, nghttp3_vec *vec,
1198                                             size_t veccnt, uint32_t *pflags,
1199                                             void *conn_user_data,
1200                                             void *stream_user_data) {
1201   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1202   auto downstream = static_cast<Downstream *>(stream_user_data);
1203 
1204   assert(downstream);
1205 
1206   auto body = downstream->get_response_buf();
1207 
1208   assert(body);
1209 
1210   if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1211       body->rleft_mark() == 0) {
1212     downstream->disable_upstream_wtimer();
1213     return NGHTTP3_ERR_WOULDBLOCK;
1214   }
1215 
1216   downstream->reset_upstream_wtimer();
1217 
1218   veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1219 
1220   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1221       body->rleft_mark() == 0) {
1222     *pflags |= NGHTTP3_DATA_FLAG_EOF;
1223   }
1224 
1225   assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1226 
1227   downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1228 
1229   if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1230       upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1231     return NGHTTP3_ERR_CALLBACK_FAILURE;
1232   }
1233 
1234   return veccnt;
1235 }
1236 } // namespace
1237 
on_downstream_header_complete(Downstream * downstream)1238 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1239   int rv;
1240 
1241   const auto &req = downstream->request();
1242   auto &resp = downstream->response();
1243 
1244   auto &balloc = downstream->get_block_allocator();
1245 
1246   if (LOG_ENABLED(INFO)) {
1247     if (downstream->get_non_final_response()) {
1248       DLOG(INFO, downstream) << "HTTP non-final response header";
1249     } else {
1250       DLOG(INFO, downstream) << "HTTP response header completed";
1251     }
1252   }
1253 
1254   auto config = get_config();
1255   auto &httpconf = config->http;
1256 
1257   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1258     downstream->rewrite_location_response_header(req.scheme);
1259   }
1260 
1261 #ifdef HAVE_MRUBY
1262   if (!downstream->get_non_final_response()) {
1263     auto dconn = downstream->get_downstream_connection();
1264     const auto &group = dconn->get_downstream_addr_group();
1265     if (group) {
1266       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1267 
1268       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1269         if (error_reply(downstream, 500) != 0) {
1270           return -1;
1271         }
1272         // Returning -1 will signal deletion of dconn.
1273         return -1;
1274       }
1275 
1276       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1277         return -1;
1278       }
1279     }
1280 
1281     auto worker = handler_->get_worker();
1282     auto mruby_ctx = worker->get_mruby_context();
1283 
1284     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1285       if (error_reply(downstream, 500) != 0) {
1286         return -1;
1287       }
1288       // Returning -1 will signal deletion of dconn.
1289       return -1;
1290     }
1291 
1292     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1293       return -1;
1294     }
1295   }
1296 #endif // HAVE_MRUBY
1297 
1298   auto nva = std::vector<nghttp3_nv>();
1299   // 4 means :status and possible server, via, and set-cookie (for
1300   // affinity cookie) header field.
1301   nva.reserve(resp.fs.headers().size() + 4 +
1302               httpconf.add_response_headers.size());
1303 
1304   if (downstream->get_non_final_response()) {
1305     auto response_status = http2::stringify_status(balloc, resp.http_status);
1306 
1307     nva.push_back(http3::make_field(":status"_sr, response_status));
1308 
1309     http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1310                                       http2::HDOP_STRIP_ALL);
1311 
1312     if (LOG_ENABLED(INFO)) {
1313       log_response_headers(downstream, nva);
1314     }
1315 
1316     rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1317                                   nva.data(), nva.size());
1318 
1319     resp.fs.clear_headers();
1320 
1321     if (rv != 0) {
1322       ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1323       return -1;
1324     }
1325 
1326     return 0;
1327   }
1328 
1329   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1330   StringRef response_status;
1331 
1332   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1333     response_status = http2::stringify_status(balloc, 200);
1334     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1335   } else {
1336     response_status = http2::stringify_status(balloc, resp.http_status);
1337   }
1338 
1339   nva.push_back(http3::make_field(":status"_sr, response_status));
1340 
1341   http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1342 
1343   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1344     nva.push_back(http3::make_field("server"_sr, httpconf.server_name));
1345   } else {
1346     auto server = resp.fs.header(http2::HD_SERVER);
1347     if (server) {
1348       nva.push_back(http3::make_field("server"_sr, (*server).value));
1349     }
1350   }
1351 
1352   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1353     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1354     if (affinity_cookie) {
1355       auto dconn = downstream->get_downstream_connection();
1356       assert(dconn);
1357       auto &group = dconn->get_downstream_addr_group();
1358       auto &shared_addr = group->shared_addr;
1359       auto &cookieconf = shared_addr->affinity.cookie;
1360       auto secure =
1361           http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1362       auto cookie_str = http::create_affinity_cookie(
1363           balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1364       nva.push_back(http3::make_field("set-cookie"_sr, cookie_str));
1365     }
1366   }
1367 
1368   auto via = resp.fs.header(http2::HD_VIA);
1369   if (httpconf.no_via) {
1370     if (via) {
1371       nva.push_back(http3::make_field("via"_sr, (*via).value));
1372     }
1373   } else {
1374     // we don't create more than 16 bytes in
1375     // http::create_via_header_value.
1376     size_t len = 16;
1377     if (via) {
1378       len += via->value.size() + 2;
1379     }
1380 
1381     auto iov = make_byte_ref(balloc, len + 1);
1382     auto p = std::begin(iov);
1383     if (via) {
1384       p = std::copy(std::begin(via->value), std::end(via->value), p);
1385       p = util::copy_lit(p, ", ");
1386     }
1387     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1388     *p = '\0';
1389 
1390     nva.push_back(
1391         http3::make_field("via"_sr, StringRef{std::span{std::begin(iov), p}}));
1392   }
1393 
1394   for (auto &p : httpconf.add_response_headers) {
1395     nva.push_back(http3::make_field(p.name, p.value));
1396   }
1397 
1398   if (LOG_ENABLED(INFO)) {
1399     log_response_headers(downstream, nva);
1400   }
1401 
1402   auto priority = resp.fs.header(http2::HD_PRIORITY);
1403   if (priority) {
1404     nghttp3_pri pri;
1405 
1406     if (nghttp3_conn_get_stream_priority(httpconn_, &pri,
1407                                          downstream->get_stream_id()) == 0 &&
1408         nghttp3_pri_parse_priority(&pri, priority->value.byte(),
1409                                    priority->value.size()) == 0) {
1410       rv = nghttp3_conn_set_server_stream_priority(
1411           httpconn_, downstream->get_stream_id(), &pri);
1412       if (rv != 0) {
1413         ULOG(ERROR, this) << "nghttp3_conn_set_server_stream_priority: "
1414                           << nghttp3_strerror(rv);
1415       }
1416     }
1417   }
1418 
1419   nghttp3_data_reader data_read;
1420   data_read.read_data = downstream_read_data_callback;
1421 
1422   nghttp3_data_reader *data_readptr;
1423 
1424   if (downstream->expect_response_body() ||
1425       downstream->expect_response_trailer()) {
1426     data_readptr = &data_read;
1427   } else {
1428     data_readptr = nullptr;
1429   }
1430 
1431   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1432                                     nva.data(), nva.size(), data_readptr);
1433   if (rv != 0) {
1434     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1435     return -1;
1436   }
1437 
1438   if (data_readptr) {
1439     downstream->reset_upstream_wtimer();
1440   } else if (shutdown_stream_read(downstream->get_stream_id(),
1441                                   NGHTTP3_H3_NO_ERROR) != 0) {
1442     return -1;
1443   }
1444 
1445   return 0;
1446 }
1447 
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1448 int Http3Upstream::on_downstream_body(Downstream *downstream,
1449                                       const uint8_t *data, size_t len,
1450                                       bool flush) {
1451   auto body = downstream->get_response_buf();
1452   body->append(data, len);
1453 
1454   if (flush) {
1455     nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1456 
1457     downstream->ensure_upstream_wtimer();
1458   }
1459 
1460   return 0;
1461 }
1462 
on_downstream_body_complete(Downstream * downstream)1463 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1464   if (LOG_ENABLED(INFO)) {
1465     DLOG(INFO, downstream) << "HTTP response completed";
1466   }
1467 
1468   auto &resp = downstream->response();
1469 
1470   if (!downstream->validate_response_recv_body_length()) {
1471     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1472     resp.connection_close = true;
1473     return 0;
1474   }
1475 
1476   if (!downstream->get_upgraded()) {
1477     const auto &trailers = resp.fs.trailers();
1478     if (!trailers.empty()) {
1479       std::vector<nghttp3_nv> nva;
1480       nva.reserve(trailers.size());
1481       http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1482       if (!nva.empty()) {
1483         auto rv = nghttp3_conn_submit_trailers(
1484             httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1485         if (rv != 0) {
1486           ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1487                             << nghttp3_strerror(rv);
1488           return -1;
1489         }
1490       }
1491     }
1492   }
1493 
1494   nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1495   downstream->ensure_upstream_wtimer();
1496 
1497   return 0;
1498 }
1499 
on_handler_delete()1500 void Http3Upstream::on_handler_delete() {
1501   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1502     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1503         d->accesslog_ready()) {
1504       handler_->write_accesslog(d);
1505     }
1506   }
1507 
1508   auto worker = handler_->get_worker();
1509   auto quic_conn_handler = worker->get_quic_connection_handler();
1510 
1511   std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1512   ngtcp2_conn_get_scid(conn_, scids.data());
1513   scids.back() = hashed_scid_;
1514 
1515   for (auto &cid : scids) {
1516     quic_conn_handler->remove_connection_id(cid);
1517   }
1518 
1519   switch (last_error_.type) {
1520   case NGTCP2_CCERR_TYPE_IDLE_CLOSE:
1521   case NGTCP2_CCERR_TYPE_DROP_CONN:
1522   case NGTCP2_CCERR_TYPE_RETRY:
1523     return;
1524   default:
1525     break;
1526   }
1527 
1528   // If this is not idle close, send CONNECTION_CLOSE.
1529   if (!ngtcp2_conn_in_closing_period(conn_) &&
1530       !ngtcp2_conn_in_draining_period(conn_)) {
1531     ngtcp2_path_storage ps;
1532     ngtcp2_pkt_info pi;
1533     conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1534 
1535     ngtcp2_path_storage_zero(&ps);
1536 
1537     ngtcp2_ccerr ccerr;
1538     ngtcp2_ccerr_default(&ccerr);
1539 
1540     if (worker->get_graceful_shutdown() &&
1541         !ngtcp2_conn_get_handshake_completed(conn_)) {
1542       ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1543     }
1544 
1545     auto nwrite = ngtcp2_conn_write_connection_close(
1546         conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1547         quic_timestamp());
1548     if (nwrite < 0) {
1549       if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1550         ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1551                           << ngtcp2_strerror(nwrite);
1552       }
1553 
1554       return;
1555     }
1556 
1557     conn_close_.resize(nwrite);
1558 
1559     send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1560                 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1561                 ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
1562   }
1563 
1564   auto d =
1565       static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1566 
1567   if (LOG_ENABLED(INFO)) {
1568     ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1569                      << conn_close_.size() << " bytes sentinel packet";
1570   }
1571 
1572   auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1573                                         std::move(conn_close_), d);
1574 
1575   quic_conn_handler->add_close_wait(cw.release());
1576 }
1577 
on_downstream_reset(Downstream * downstream,bool no_retry)1578 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1579   int rv;
1580 
1581   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1582     // This is error condition when we failed push_request_headers()
1583     // in initiate_downstream().  Otherwise, we have
1584     // DispatchState::ACTIVE state, or we did not set
1585     // DownstreamConnection.
1586     downstream->pop_downstream_connection();
1587     handler_->signal_write();
1588 
1589     return 0;
1590   }
1591 
1592   if (!downstream->request_submission_ready()) {
1593     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1594       // We have got all response body already.  Send it off.
1595       downstream->pop_downstream_connection();
1596       return 0;
1597     }
1598     // pushed stream is handled here
1599     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1600     downstream->pop_downstream_connection();
1601 
1602     handler_->signal_write();
1603 
1604     return 0;
1605   }
1606 
1607   downstream->pop_downstream_connection();
1608 
1609   downstream->add_retry();
1610 
1611   std::unique_ptr<DownstreamConnection> dconn;
1612 
1613   rv = 0;
1614 
1615   if (no_retry || downstream->no_more_retry()) {
1616     goto fail;
1617   }
1618 
1619   // downstream connection is clean; we can retry with new
1620   // downstream connection.
1621 
1622   for (;;) {
1623     auto dconn = handler_->get_downstream_connection(rv, downstream);
1624     if (!dconn) {
1625       goto fail;
1626     }
1627 
1628     rv = downstream->attach_downstream_connection(std::move(dconn));
1629     if (rv == 0) {
1630       break;
1631     }
1632   }
1633 
1634   rv = downstream->push_request_headers();
1635   if (rv != 0) {
1636     goto fail;
1637   }
1638 
1639   return 0;
1640 
1641 fail:
1642   if (rv == SHRPX_ERR_TLS_REQUIRED) {
1643     assert(0);
1644     abort();
1645   }
1646 
1647   rv = on_downstream_abort_request(downstream, 502);
1648   if (rv != 0) {
1649     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1650   }
1651   downstream->pop_downstream_connection();
1652 
1653   handler_->signal_write();
1654 
1655   return 0;
1656 }
1657 
pause_read(IOCtrlReason reason)1658 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1659 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1660 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1661                                size_t consumed) {
1662   consume(downstream->get_stream_id(), consumed);
1663 
1664   auto &req = downstream->request();
1665 
1666   req.consume(consumed);
1667 
1668   handler_->signal_write();
1669 
1670   return 0;
1671 }
1672 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1673 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1674                               size_t bodylen) {
1675   int rv;
1676 
1677   nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1678 
1679   const auto &req = downstream->request();
1680 
1681   if (req.method != HTTP_HEAD && bodylen) {
1682     data_read.read_data = downstream_read_data_callback;
1683     data_read_ptr = &data_read;
1684 
1685     auto buf = downstream->get_response_buf();
1686 
1687     buf->append(body, bodylen);
1688   }
1689 
1690   const auto &resp = downstream->response();
1691   auto config = get_config();
1692   auto &httpconf = config->http;
1693 
1694   auto &balloc = downstream->get_block_allocator();
1695 
1696   const auto &headers = resp.fs.headers();
1697   auto nva = std::vector<nghttp3_nv>();
1698   // 2 for :status and server
1699   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1700 
1701   auto response_status = http2::stringify_status(balloc, resp.http_status);
1702 
1703   nva.push_back(http3::make_field(":status"_sr, response_status));
1704 
1705   for (auto &kv : headers) {
1706     if (kv.name.empty() || kv.name[0] == ':') {
1707       continue;
1708     }
1709     switch (kv.token) {
1710     case http2::HD_CONNECTION:
1711     case http2::HD_KEEP_ALIVE:
1712     case http2::HD_PROXY_CONNECTION:
1713     case http2::HD_TE:
1714     case http2::HD_TRANSFER_ENCODING:
1715     case http2::HD_UPGRADE:
1716       continue;
1717     }
1718     nva.push_back(
1719         http3::make_field(kv.name, kv.value, http3::never_index(kv.no_index)));
1720   }
1721 
1722   if (!resp.fs.header(http2::HD_SERVER)) {
1723     nva.push_back(http3::make_field("server"_sr, config->http.server_name));
1724   }
1725 
1726   for (auto &p : httpconf.add_response_headers) {
1727     nva.push_back(http3::make_field(p.name, p.value));
1728   }
1729 
1730   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1731                                     nva.data(), nva.size(), data_read_ptr);
1732   if (nghttp3_err_is_fatal(rv)) {
1733     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1734                       << nghttp3_strerror(rv);
1735     return -1;
1736   }
1737 
1738   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1739 
1740   if (data_read_ptr) {
1741     downstream->reset_upstream_wtimer();
1742   }
1743 
1744   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1745       0) {
1746     return -1;
1747   }
1748 
1749   return 0;
1750 }
1751 
initiate_push(Downstream * downstream,const StringRef & uri)1752 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1753   return 0;
1754 }
1755 
response_riovec(struct iovec * iov,int iovcnt) const1756 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1757   return 0;
1758 }
1759 
response_drain(size_t n)1760 void Http3Upstream::response_drain(size_t n) {}
1761 
response_empty() const1762 bool Http3Upstream::response_empty() const { return false; }
1763 
1764 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1765 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1766                                           int32_t promised_stream_id) {
1767   return nullptr;
1768 }
1769 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1770 int Http3Upstream::on_downstream_push_promise_complete(
1771     Downstream *downstream, Downstream *promised_downstream) {
1772   return 0;
1773 }
1774 
push_enabled() const1775 bool Http3Upstream::push_enabled() const { return false; }
1776 
cancel_premature_downstream(Downstream * promised_downstream)1777 void Http3Upstream::cancel_premature_downstream(
1778     Downstream *promised_downstream) {}
1779 
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data)1780 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1781                            const Address &remote_addr,
1782                            const Address &local_addr, const ngtcp2_pkt_info &pi,
1783                            std::span<const uint8_t> data) {
1784   int rv;
1785 
1786   auto path = ngtcp2_path{
1787       {
1788           const_cast<sockaddr *>(&local_addr.su.sa),
1789           static_cast<socklen_t>(local_addr.len),
1790       },
1791       {
1792           const_cast<sockaddr *>(&remote_addr.su.sa),
1793           static_cast<socklen_t>(remote_addr.len),
1794       },
1795       const_cast<UpstreamAddr *>(faddr),
1796   };
1797 
1798   rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data.data(), data.size(),
1799                             quic_timestamp());
1800   if (rv != 0) {
1801     switch (rv) {
1802     case NGTCP2_ERR_DRAINING:
1803       return -1;
1804     case NGTCP2_ERR_RETRY: {
1805       auto worker = handler_->get_worker();
1806       auto quic_conn_handler = worker->get_quic_connection_handler();
1807 
1808       if (worker->get_graceful_shutdown()) {
1809         ngtcp2_ccerr_set_transport_error(&last_error_,
1810                                          NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1811 
1812         return handle_error();
1813       }
1814 
1815       ngtcp2_version_cid vc;
1816 
1817       rv = ngtcp2_pkt_decode_version_cid(&vc, data.data(), data.size(),
1818                                          SHRPX_QUIC_SCIDLEN);
1819       if (rv != 0) {
1820         return -1;
1821       }
1822 
1823       // Overwrite error if any is set
1824       ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1825 
1826       quic_conn_handler->send_retry(
1827           handler_->get_upstream_addr(), vc.version, {vc.dcid, vc.dcidlen},
1828           {vc.scid, vc.scidlen}, remote_addr, local_addr, data.size() * 3);
1829 
1830       return -1;
1831     }
1832     case NGTCP2_ERR_CRYPTO:
1833       if (!last_error_.error_code) {
1834         ngtcp2_ccerr_set_tls_alert(
1835             &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1836       }
1837       break;
1838     case NGTCP2_ERR_DROP_CONN:
1839       // Overwrite error if any is set
1840       ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1841 
1842       return -1;
1843     default:
1844       if (!last_error_.error_code) {
1845         ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1846       }
1847     }
1848 
1849     ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1850 
1851     return handle_error();
1852   }
1853 
1854   return 0;
1855 }
1856 
1857 std::pair<std::span<const uint8_t>, int>
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1858 Http3Upstream::send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
1859                            size_t remote_salen, const sockaddr *local_sa,
1860                            size_t local_salen, const ngtcp2_pkt_info &pi,
1861                            std::span<const uint8_t> data, size_t gso_size) {
1862   if (tx_.no_gso) {
1863     for (; !data.empty();) {
1864       auto len = std::min(gso_size, data.size());
1865       auto rv =
1866           quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1867                            local_salen, pi, {std::begin(data), len}, gso_size);
1868       if (rv != 0) {
1869         switch (rv) {
1870         case -EAGAIN:
1871 #if EAGAIN != EWOULDBLOCK
1872         case -EWOULDBLOCK:
1873 #endif // EAGAIN != EWOULDBLOCK
1874           return {data, SHRPX_ERR_SEND_BLOCKED};
1875         default:
1876           return {data, -1};
1877         }
1878       }
1879 
1880       data = data.subspan(len);
1881     }
1882 
1883     return {{}, 0};
1884   }
1885 
1886   auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1887                              local_salen, pi, data, gso_size);
1888   switch (rv) {
1889   case 0:
1890     return {{}, 0};
1891     // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1892     // large.
1893   case -EINVAL:
1894   case -EMSGSIZE:
1895     // Let the packet lost.
1896     break;
1897   case -EAGAIN:
1898 #if EAGAIN != EWOULDBLOCK
1899   case -EWOULDBLOCK:
1900 #endif // EAGAIN != EWOULDBLOCK
1901     return {data, SHRPX_ERR_SEND_BLOCKED};
1902   case -EIO:
1903     if (tx_.no_gso) {
1904       break;
1905     }
1906 
1907     tx_.no_gso = true;
1908 
1909     return send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
1910                        pi, data, gso_size);
1911   default:
1912     break;
1913   }
1914 
1915   return {{}, -1};
1916 }
1917 
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1918 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1919                                     const ngtcp2_addr &remote_addr,
1920                                     const ngtcp2_addr &local_addr,
1921                                     const ngtcp2_pkt_info &pi,
1922                                     std::span<const uint8_t> data,
1923                                     size_t gso_size) {
1924   assert(tx_.num_blocked || !tx_.send_blocked);
1925   assert(tx_.num_blocked < 2);
1926   assert(gso_size);
1927 
1928   tx_.send_blocked = true;
1929 
1930   auto &p = tx_.blocked[tx_.num_blocked++];
1931 
1932   memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1933   memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1934 
1935   p.local_addr.len = local_addr.addrlen;
1936   p.remote_addr.len = remote_addr.addrlen;
1937   p.faddr = faddr;
1938   p.pi = pi;
1939   p.data = data;
1940   p.gso_size = gso_size;
1941 }
1942 
send_blocked_packet()1943 int Http3Upstream::send_blocked_packet() {
1944   assert(tx_.send_blocked);
1945 
1946   for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1947     auto &p = tx_.blocked[tx_.num_blocked_sent];
1948 
1949     auto [rest, rv] = send_packet(p.faddr, &p.remote_addr.su.sa,
1950                                   p.remote_addr.len, &p.local_addr.su.sa,
1951                                   p.local_addr.len, p.pi, p.data, p.gso_size);
1952     if (rv == SHRPX_ERR_SEND_BLOCKED) {
1953       p.data = rest;
1954 
1955       signal_write_upstream_addr(p.faddr);
1956 
1957       return 0;
1958     }
1959   }
1960 
1961   tx_.send_blocked = false;
1962   tx_.num_blocked = 0;
1963   tx_.num_blocked_sent = 0;
1964 
1965   return 0;
1966 }
1967 
signal_write_upstream_addr(const UpstreamAddr * faddr)1968 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1969   auto conn = handler_->get_connection();
1970 
1971   if (faddr->fd != conn->wev.fd) {
1972     if (ev_is_active(&conn->wev)) {
1973       ev_io_stop(handler_->get_loop(), &conn->wev);
1974     }
1975 
1976     ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1977   }
1978 
1979   conn->wlimit.startw();
1980 }
1981 
handle_error()1982 int Http3Upstream::handle_error() {
1983   if (ngtcp2_conn_in_closing_period(conn_) ||
1984       ngtcp2_conn_in_draining_period(conn_)) {
1985     return -1;
1986   }
1987 
1988   ngtcp2_path_storage ps;
1989   ngtcp2_pkt_info pi;
1990 
1991   ngtcp2_path_storage_zero(&ps);
1992 
1993   auto ts = quic_timestamp();
1994 
1995   conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1996 
1997   auto nwrite = ngtcp2_conn_write_connection_close(
1998       conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
1999       &last_error_, ts);
2000   if (nwrite < 0) {
2001     ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
2002                       << ngtcp2_strerror(nwrite);
2003     return -1;
2004   }
2005 
2006   conn_close_.resize(nwrite);
2007 
2008   if (nwrite == 0) {
2009     return -1;
2010   }
2011 
2012   send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
2013               ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
2014               ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
2015 
2016   return -1;
2017 }
2018 
handle_expiry()2019 int Http3Upstream::handle_expiry() {
2020   int rv;
2021 
2022   auto ts = quic_timestamp();
2023 
2024   rv = ngtcp2_conn_handle_expiry(conn_, ts);
2025   if (rv != 0) {
2026     if (rv == NGTCP2_ERR_IDLE_CLOSE) {
2027       ULOG(INFO, this) << "Idle connection timeout";
2028     } else {
2029       ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
2030     }
2031     ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
2032     return handle_error();
2033   }
2034 
2035   return 0;
2036 }
2037 
reset_timer()2038 void Http3Upstream::reset_timer() {
2039   auto ts = quic_timestamp();
2040   auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
2041   auto loop = handler_->get_loop();
2042 
2043   if (expiry_ts <= ts) {
2044     ev_feed_event(loop, &timer_, EV_TIMER);
2045     return;
2046   }
2047 
2048   timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
2049 
2050   ev_timer_again(loop, &timer_);
2051 }
2052 
2053 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2054 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2055                           size_t nconsumed, void *user_data,
2056                           void *stream_user_data) {
2057   auto upstream = static_cast<Http3Upstream *>(user_data);
2058 
2059   upstream->consume(stream_id, nconsumed);
2060 
2061   return 0;
2062 }
2063 } // namespace
2064 
2065 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2066 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2067                            uint64_t datalen, void *user_data,
2068                            void *stream_user_data) {
2069   auto upstream = static_cast<Http3Upstream *>(user_data);
2070   auto downstream = static_cast<Downstream *>(stream_user_data);
2071 
2072   assert(downstream);
2073 
2074   if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2075     return NGHTTP3_ERR_CALLBACK_FAILURE;
2076   }
2077 
2078   return 0;
2079 }
2080 } // namespace
2081 
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2082 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2083                                           uint64_t datalen) {
2084   if (LOG_ENABLED(INFO)) {
2085     ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2086                      << datalen << " bytes acknowledged";
2087   }
2088 
2089   auto body = downstream->get_response_buf();
2090   auto drained = body->drain_mark(datalen);
2091   (void)drained;
2092 
2093   assert(datalen == drained);
2094 
2095   if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2096     return -1;
2097   }
2098 
2099   return 0;
2100 }
2101 
2102 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2103 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2104                                void *user_data, void *stream_user_data) {
2105   if (!ngtcp2_is_bidi_stream(stream_id)) {
2106     return 0;
2107   }
2108 
2109   auto upstream = static_cast<Http3Upstream *>(user_data);
2110   upstream->http_begin_request_headers(stream_id);
2111 
2112   return 0;
2113 }
2114 } // namespace
2115 
2116 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2117 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2118                              int32_t token, nghttp3_rcbuf *name,
2119                              nghttp3_rcbuf *value, uint8_t flags,
2120                              void *user_data, void *stream_user_data) {
2121   auto upstream = static_cast<Http3Upstream *>(user_data);
2122   auto downstream = static_cast<Downstream *>(stream_user_data);
2123 
2124   if (!downstream || downstream->get_stop_reading()) {
2125     return 0;
2126   }
2127 
2128   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2129                                          /* trailer = */ false) != 0) {
2130     return NGHTTP3_ERR_CALLBACK_FAILURE;
2131   }
2132 
2133   return 0;
2134 }
2135 } // namespace
2136 
2137 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2138 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2139                               int32_t token, nghttp3_rcbuf *name,
2140                               nghttp3_rcbuf *value, uint8_t flags,
2141                               void *user_data, void *stream_user_data) {
2142   auto upstream = static_cast<Http3Upstream *>(user_data);
2143   auto downstream = static_cast<Downstream *>(stream_user_data);
2144 
2145   if (!downstream || downstream->get_stop_reading()) {
2146     return 0;
2147   }
2148 
2149   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2150                                          /* trailer = */ true) != 0) {
2151     return NGHTTP3_ERR_CALLBACK_FAILURE;
2152   }
2153 
2154   return 0;
2155 }
2156 } // namespace
2157 
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2158 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2159                                             int32_t h3token,
2160                                             nghttp3_rcbuf *name,
2161                                             nghttp3_rcbuf *value, uint8_t flags,
2162                                             bool trailer) {
2163   auto namebuf = nghttp3_rcbuf_get_buf(name);
2164   auto valuebuf = nghttp3_rcbuf_get_buf(value);
2165   auto &req = downstream->request();
2166   auto config = get_config();
2167   auto &httpconf = config->http;
2168 
2169   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2170           httpconf.request_header_field_buffer ||
2171       req.fs.num_fields() >= httpconf.max_request_header_fields) {
2172     downstream->set_stop_reading(true);
2173 
2174     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2175       return 0;
2176     }
2177 
2178     if (LOG_ENABLED(INFO)) {
2179       ULOG(INFO, this) << "Too large or many header field size="
2180                        << req.fs.buffer_size() + namebuf.len + valuebuf.len
2181                        << ", num=" << req.fs.num_fields() + 1;
2182     }
2183 
2184     // just ignore if this is a trailer part.
2185     if (trailer) {
2186       if (shutdown_stream_read(downstream->get_stream_id(),
2187                                NGHTTP3_H3_NO_ERROR) != 0) {
2188         return -1;
2189       }
2190 
2191       return 0;
2192     }
2193 
2194     if (error_reply(downstream, 431) != 0) {
2195       return -1;
2196     }
2197 
2198     return 0;
2199   }
2200 
2201   auto nameref = StringRef{namebuf.base, namebuf.len};
2202   auto valueref = StringRef{valuebuf.base, valuebuf.len};
2203   auto token = http2::lookup_token(nameref);
2204   auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2205 
2206   downstream->add_rcbuf(name);
2207   downstream->add_rcbuf(value);
2208 
2209   if (trailer) {
2210     req.fs.add_trailer_token(nameref, valueref, no_index, token);
2211     return 0;
2212   }
2213 
2214   req.fs.add_header_token(nameref, valueref, no_index, token);
2215   return 0;
2216 }
2217 
2218 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2219 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2220                              void *user_data, void *stream_user_data) {
2221   auto upstream = static_cast<Http3Upstream *>(user_data);
2222   auto downstream = static_cast<Downstream *>(stream_user_data);
2223 
2224   if (!downstream || downstream->get_stop_reading()) {
2225     return 0;
2226   }
2227 
2228   if (upstream->http_end_request_headers(downstream, fin) != 0) {
2229     return NGHTTP3_ERR_CALLBACK_FAILURE;
2230   }
2231 
2232   downstream->reset_upstream_rtimer();
2233   downstream->stop_header_timer();
2234 
2235   return 0;
2236 }
2237 } // namespace
2238 
http_end_request_headers(Downstream * downstream,int fin)2239 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2240   auto lgconf = log_config();
2241   lgconf->update_tstamp(std::chrono::system_clock::now());
2242   auto &req = downstream->request();
2243   req.tstamp = lgconf->tstamp;
2244 
2245   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2246     return 0;
2247   }
2248 
2249   auto &nva = req.fs.headers();
2250 
2251   if (LOG_ENABLED(INFO)) {
2252     std::stringstream ss;
2253     for (auto &nv : nva) {
2254       if (nv.name == "authorization"_sr) {
2255         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2256         continue;
2257       }
2258       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2259     }
2260     ULOG(INFO, this) << "HTTP request headers. stream_id="
2261                      << downstream->get_stream_id() << "\n"
2262                      << ss.str();
2263   }
2264 
2265   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2266   if (content_length) {
2267     // libnghttp3 guarantees this can be parsed
2268     req.fs.content_length =
2269         util::parse_uint(content_length->value).value_or(-1);
2270   }
2271 
2272   // presence of mandatory header fields are guaranteed by libnghttp3.
2273   auto authority = req.fs.header(http2::HD__AUTHORITY);
2274   auto path = req.fs.header(http2::HD__PATH);
2275   auto method = req.fs.header(http2::HD__METHOD);
2276   auto scheme = req.fs.header(http2::HD__SCHEME);
2277 
2278   auto method_token = http2::lookup_method_token(method->value);
2279   if (method_token == -1) {
2280     if (error_reply(downstream, 501) != 0) {
2281       return -1;
2282     }
2283     return 0;
2284   }
2285 
2286   auto faddr = handler_->get_upstream_addr();
2287 
2288   auto config = get_config();
2289 
2290   // For HTTP/2 proxy, we require :authority.
2291   if (method_token != HTTP_CONNECT && config->http2_proxy &&
2292       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2293     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2294     return 0;
2295   }
2296 
2297   req.method = method_token;
2298   if (scheme) {
2299     req.scheme = scheme->value;
2300   }
2301 
2302   // nghttp2 library guarantees either :authority or host exist
2303   if (!authority) {
2304     req.no_authority = true;
2305     authority = req.fs.header(http2::HD_HOST);
2306   }
2307 
2308   if (authority) {
2309     req.authority = authority->value;
2310   }
2311 
2312   if (path) {
2313     if (method_token == HTTP_OPTIONS && path->value == "*"_sr) {
2314       // Server-wide OPTIONS request.  Path is empty.
2315     } else if (config->http2_proxy &&
2316                faddr->alt_mode == UpstreamAltMode::NONE) {
2317       req.path = path->value;
2318     } else {
2319       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2320                                            path->value);
2321     }
2322   }
2323 
2324   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2325   if (connect_proto) {
2326     if (connect_proto->value != "websocket"_sr) {
2327       if (error_reply(downstream, 400) != 0) {
2328         return -1;
2329       }
2330       return 0;
2331     }
2332     req.connect_proto = ConnectProto::WEBSOCKET;
2333   }
2334 
2335   if (!fin) {
2336     req.http2_expect_body = true;
2337   } else if (req.fs.content_length == -1) {
2338     req.fs.content_length = 0;
2339   }
2340 
2341   downstream->inspect_http2_request();
2342 
2343   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2344 
2345   if (config->http.require_http_scheme &&
2346       !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2347     if (error_reply(downstream, 400) != 0) {
2348       return -1;
2349     }
2350   }
2351 
2352 #ifdef HAVE_MRUBY
2353   auto worker = handler_->get_worker();
2354   auto mruby_ctx = worker->get_mruby_context();
2355 
2356   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2357     if (error_reply(downstream, 500) != 0) {
2358       return -1;
2359     }
2360     return 0;
2361   }
2362 #endif // HAVE_MRUBY
2363 
2364   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2365     return 0;
2366   }
2367 
2368   start_downstream(downstream);
2369 
2370   return 0;
2371 }
2372 
start_downstream(Downstream * downstream)2373 void Http3Upstream::start_downstream(Downstream *downstream) {
2374   if (downstream_queue_.can_activate(downstream->request().authority)) {
2375     initiate_downstream(downstream);
2376     return;
2377   }
2378 
2379   downstream_queue_.mark_blocked(downstream);
2380 }
2381 
initiate_downstream(Downstream * downstream)2382 void Http3Upstream::initiate_downstream(Downstream *downstream) {
2383   int rv;
2384 
2385 #ifdef HAVE_MRUBY
2386   DownstreamConnection *dconn_ptr;
2387 #endif // HAVE_MRUBY
2388 
2389   for (;;) {
2390     auto dconn = handler_->get_downstream_connection(rv, downstream);
2391     if (!dconn) {
2392       if (rv == SHRPX_ERR_TLS_REQUIRED) {
2393         assert(0);
2394         abort();
2395       }
2396 
2397       rv = error_reply(downstream, 502);
2398       if (rv != 0) {
2399         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2400       }
2401 
2402       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
2403       downstream_queue_.mark_failure(downstream);
2404 
2405       return;
2406     }
2407 
2408 #ifdef HAVE_MRUBY
2409     dconn_ptr = dconn.get();
2410 #endif // HAVE_MRUBY
2411     rv = downstream->attach_downstream_connection(std::move(dconn));
2412     if (rv == 0) {
2413       break;
2414     }
2415   }
2416 
2417 #ifdef HAVE_MRUBY
2418   const auto &group = dconn_ptr->get_downstream_addr_group();
2419   if (group) {
2420     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
2421     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2422       if (error_reply(downstream, 500) != 0) {
2423         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2424       }
2425 
2426       downstream_queue_.mark_failure(downstream);
2427 
2428       return;
2429     }
2430 
2431     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2432       return;
2433     }
2434   }
2435 #endif // HAVE_MRUBY
2436 
2437   rv = downstream->push_request_headers();
2438   if (rv != 0) {
2439 
2440     if (error_reply(downstream, 502) != 0) {
2441       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2442     }
2443 
2444     downstream_queue_.mark_failure(downstream);
2445 
2446     return;
2447   }
2448 
2449   downstream_queue_.mark_active(downstream);
2450 
2451   auto &req = downstream->request();
2452   if (!req.http2_expect_body) {
2453     rv = downstream->end_upload_data();
2454     if (rv != 0) {
2455       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2456     }
2457   }
2458 }
2459 
2460 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2461 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2462                    size_t datalen, void *user_data, void *stream_user_data) {
2463   auto upstream = static_cast<Http3Upstream *>(user_data);
2464   auto downstream = static_cast<Downstream *>(stream_user_data);
2465 
2466   if (upstream->http_recv_data(downstream, {data, datalen}) != 0) {
2467     return NGHTTP3_ERR_CALLBACK_FAILURE;
2468   }
2469 
2470   return 0;
2471 }
2472 } // namespace
2473 
http_recv_data(Downstream * downstream,std::span<const uint8_t> data)2474 int Http3Upstream::http_recv_data(Downstream *downstream,
2475                                   std::span<const uint8_t> data) {
2476   downstream->reset_upstream_rtimer();
2477 
2478   if (downstream->push_upload_data_chunk(data.data(), data.size()) != 0) {
2479     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2480       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2481     }
2482 
2483     consume(downstream->get_stream_id(), data.size());
2484 
2485     return 0;
2486   }
2487 
2488   return 0;
2489 }
2490 
2491 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2492 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2493                     void *stream_user_data) {
2494   auto upstream = static_cast<Http3Upstream *>(user_data);
2495   auto downstream = static_cast<Downstream *>(stream_user_data);
2496 
2497   if (!downstream || downstream->get_stop_reading()) {
2498     return 0;
2499   }
2500 
2501   if (upstream->http_end_stream(downstream) != 0) {
2502     return NGHTTP3_ERR_CALLBACK_FAILURE;
2503   }
2504 
2505   return 0;
2506 }
2507 } // namespace
2508 
http_end_stream(Downstream * downstream)2509 int Http3Upstream::http_end_stream(Downstream *downstream) {
2510   downstream->disable_upstream_rtimer();
2511 
2512   if (downstream->end_upload_data() != 0) {
2513     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2514       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2515     }
2516   }
2517 
2518   downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2519 
2520   return 0;
2521 }
2522 
2523 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2524 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2525                       uint64_t app_error_code, void *conn_user_data,
2526                       void *stream_user_data) {
2527   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2528   auto downstream = static_cast<Downstream *>(stream_user_data);
2529 
2530   if (!downstream) {
2531     return 0;
2532   }
2533 
2534   if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2535     return NGHTTP3_ERR_CALLBACK_FAILURE;
2536   }
2537 
2538   return 0;
2539 }
2540 } // namespace
2541 
http_stream_close(Downstream * downstream,uint64_t app_error_code)2542 int Http3Upstream::http_stream_close(Downstream *downstream,
2543                                      uint64_t app_error_code) {
2544   auto stream_id = downstream->get_stream_id();
2545 
2546   if (LOG_ENABLED(INFO)) {
2547     ULOG(INFO, this) << "Stream stream_id=" << stream_id
2548                      << " is being closed with app_error_code="
2549                      << app_error_code;
2550 
2551     auto body = downstream->get_response_buf();
2552 
2553     ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2554                      << " not_sent=" << body->rleft_mark();
2555   }
2556 
2557   auto &req = downstream->request();
2558 
2559   consume(stream_id, req.unconsumed_body_length);
2560 
2561   req.unconsumed_body_length = 0;
2562 
2563   ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2564 
2565   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2566     remove_downstream(downstream);
2567     // downstream was deleted
2568 
2569     return 0;
2570   }
2571 
2572   if (downstream->can_detach_downstream_connection()) {
2573     // Keep-alive
2574     downstream->detach_downstream_connection();
2575   }
2576 
2577   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2578 
2579   // At this point, downstream read may be paused.
2580 
2581   // If shrpx_downstream::push_request_headers() failed, the
2582   // error is handled here.
2583   remove_downstream(downstream);
2584   // downstream was deleted
2585 
2586   return 0;
2587 }
2588 
2589 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2590 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2591                       uint64_t app_error_code, void *user_data,
2592                       void *stream_user_data) {
2593   auto upstream = static_cast<Http3Upstream *>(user_data);
2594 
2595   if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2596     return NGHTTP3_ERR_CALLBACK_FAILURE;
2597   }
2598 
2599   return 0;
2600 }
2601 } // namespace
2602 
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2603 int Http3Upstream::http_stop_sending(int64_t stream_id,
2604                                      uint64_t app_error_code) {
2605   auto rv =
2606       ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2607   if (ngtcp2_err_is_fatal(rv)) {
2608     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2609                       << ngtcp2_strerror(rv);
2610     return -1;
2611   }
2612 
2613   return 0;
2614 }
2615 
2616 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2617 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2618                       uint64_t app_error_code, void *user_data,
2619                       void *stream_user_data) {
2620   auto upstream = static_cast<Http3Upstream *>(user_data);
2621 
2622   if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2623     return NGHTTP3_ERR_CALLBACK_FAILURE;
2624   }
2625 
2626   return 0;
2627 }
2628 } // namespace
2629 
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2630 int Http3Upstream::http_reset_stream(int64_t stream_id,
2631                                      uint64_t app_error_code) {
2632   auto rv =
2633       ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2634   if (ngtcp2_err_is_fatal(rv)) {
2635     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2636                       << ngtcp2_strerror(rv);
2637     return -1;
2638   }
2639 
2640   return 0;
2641 }
2642 
setup_httpconn()2643 int Http3Upstream::setup_httpconn() {
2644   int rv;
2645 
2646   if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2647     return -1;
2648   }
2649 
2650   nghttp3_callbacks callbacks{
2651       shrpx::http_acked_stream_data,
2652       shrpx::http_stream_close,
2653       shrpx::http_recv_data,
2654       http_deferred_consume,
2655       shrpx::http_begin_request_headers,
2656       shrpx::http_recv_request_header,
2657       shrpx::http_end_request_headers,
2658       nullptr, // begin_trailers
2659       shrpx::http_recv_request_trailer,
2660       nullptr, // end_trailers
2661       shrpx::http_stop_sending,
2662       shrpx::http_end_stream,
2663       shrpx::http_reset_stream,
2664   };
2665 
2666   auto config = get_config();
2667 
2668   nghttp3_settings settings;
2669   nghttp3_settings_default(&settings);
2670   settings.qpack_max_dtable_capacity = 4_k;
2671 
2672   if (!config->http2_proxy) {
2673     settings.enable_connect_protocol = 1;
2674   }
2675 
2676   auto mem = nghttp3_mem_default();
2677 
2678   rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2679   if (rv != 0) {
2680     ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2681     return -1;
2682   }
2683 
2684   auto params = ngtcp2_conn_get_local_transport_params(conn_);
2685 
2686   nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2687                                            params->initial_max_streams_bidi);
2688 
2689   int64_t ctrl_stream_id;
2690 
2691   rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2692   if (rv != 0) {
2693     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2694     return -1;
2695   }
2696 
2697   rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2698   if (rv != 0) {
2699     ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2700                       << nghttp3_strerror(rv);
2701     return -1;
2702   }
2703 
2704   int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2705 
2706   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2707   if (rv != 0) {
2708     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2709     return -1;
2710   }
2711 
2712   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2713   if (rv != 0) {
2714     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2715     return -1;
2716   }
2717 
2718   rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2719                                        qpack_dec_stream_id);
2720   if (rv != 0) {
2721     ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2722                       << nghttp3_strerror(rv);
2723     return -1;
2724   }
2725 
2726   return 0;
2727 }
2728 
error_reply(Downstream * downstream,unsigned int status_code)2729 int Http3Upstream::error_reply(Downstream *downstream,
2730                                unsigned int status_code) {
2731   int rv;
2732   auto &resp = downstream->response();
2733 
2734   auto &balloc = downstream->get_block_allocator();
2735 
2736   auto html = http::create_error_html(balloc, status_code);
2737   resp.http_status = status_code;
2738 
2739   nghttp3_data_reader data_read, *data_read_ptr = nullptr;
2740 
2741   const auto &req = downstream->request();
2742 
2743   if (req.method != HTTP_HEAD) {
2744     data_read.read_data = downstream_read_data_callback;
2745     data_read_ptr = &data_read;
2746 
2747     auto body = downstream->get_response_buf();
2748 
2749     body->append(html);
2750   }
2751 
2752   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2753 
2754   auto lgconf = log_config();
2755   lgconf->update_tstamp(std::chrono::system_clock::now());
2756 
2757   auto response_status = http2::stringify_status(balloc, status_code);
2758   auto content_length = util::make_string_ref_uint(balloc, html.size());
2759   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2760 
2761   auto nva = std::to_array(
2762       {http3::make_field(":status"_sr, response_status),
2763        http3::make_field("content-type"_sr, "text/html; charset=UTF-8"_sr),
2764        http3::make_field("server"_sr, get_config()->http.server_name),
2765        http3::make_field("content-length"_sr, content_length),
2766        http3::make_field("date"_sr, date)});
2767 
2768   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2769                                     nva.data(), nva.size(), data_read_ptr);
2770   if (nghttp3_err_is_fatal(rv)) {
2771     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2772                       << nghttp3_strerror(rv);
2773     return -1;
2774   }
2775 
2776   downstream->reset_upstream_wtimer();
2777 
2778   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2779       0) {
2780     return -1;
2781   }
2782 
2783   return 0;
2784 }
2785 
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2786 int Http3Upstream::shutdown_stream(Downstream *downstream,
2787                                    uint64_t app_error_code) {
2788   auto stream_id = downstream->get_stream_id();
2789 
2790   if (LOG_ENABLED(INFO)) {
2791     ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2792                      << " with app_error_code=" << app_error_code;
2793   }
2794 
2795   auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2796   if (rv != 0) {
2797     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2798                       << ngtcp2_strerror(rv);
2799     return -1;
2800   }
2801 
2802   return 0;
2803 }
2804 
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2805 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2806                                         uint64_t app_error_code) {
2807   auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
2808                                              NGHTTP3_H3_NO_ERROR);
2809   if (ngtcp2_err_is_fatal(rv)) {
2810     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2811                       << ngtcp2_strerror(rv);
2812     return -1;
2813   }
2814 
2815   return 0;
2816 }
2817 
consume(int64_t stream_id,size_t nconsumed)2818 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2819   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2820   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2821 }
2822 
remove_downstream(Downstream * downstream)2823 void Http3Upstream::remove_downstream(Downstream *downstream) {
2824   if (downstream->accesslog_ready()) {
2825     handler_->write_accesslog(downstream);
2826   }
2827 
2828   nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2829                                     nullptr);
2830 
2831   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2832 
2833   if (next_downstream) {
2834     initiate_downstream(next_downstream);
2835   }
2836 
2837   if (downstream_queue_.get_downstreams() == nullptr) {
2838     // There is no downstream at the moment.  Start idle timer now.
2839     handler_->repeat_read_timer();
2840   }
2841 }
2842 
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2843 void Http3Upstream::log_response_headers(
2844     Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2845   std::stringstream ss;
2846   for (auto &nv : nva) {
2847     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2848        << StringRef{nv.value, nv.valuelen} << "\n";
2849   }
2850   ULOG(INFO, this) << "HTTP response headers. stream_id="
2851                    << downstream->get_stream_id() << "\n"
2852                    << ss.str();
2853 }
2854 
check_shutdown()2855 int Http3Upstream::check_shutdown() {
2856   auto worker = handler_->get_worker();
2857 
2858   if (!worker->get_graceful_shutdown()) {
2859     return 0;
2860   }
2861 
2862   ev_prepare_stop(handler_->get_loop(), &prep_);
2863 
2864   return start_graceful_shutdown();
2865 }
2866 
start_graceful_shutdown()2867 int Http3Upstream::start_graceful_shutdown() {
2868   int rv;
2869 
2870   if (ev_is_active(&shutdown_timer_)) {
2871     return 0;
2872   }
2873 
2874   if (!httpconn_) {
2875     return -1;
2876   }
2877 
2878   rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2879   if (rv != 0) {
2880     ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2881                       << nghttp3_strerror(rv);
2882     return -1;
2883   }
2884 
2885   handler_->signal_write();
2886 
2887   auto t = ngtcp2_conn_get_pto(conn_);
2888 
2889   ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2890                0.);
2891   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2892 
2893   return 0;
2894 }
2895 
submit_goaway()2896 int Http3Upstream::submit_goaway() {
2897   int rv;
2898 
2899   rv = nghttp3_conn_shutdown(httpconn_);
2900   if (rv != 0) {
2901     ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2902     return -1;
2903   }
2904 
2905   handler_->signal_write();
2906 
2907   return 0;
2908 }
2909 
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2910 int Http3Upstream::open_qlog_file(const StringRef &dir,
2911                                   const ngtcp2_cid &scid) const {
2912   std::array<char, sizeof("20141115T125824.741+0900")> buf;
2913 
2914   auto path = std::string{dir};
2915   path += '/';
2916   path +=
2917       util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2918   path += '-';
2919   path += util::format_hex(std::span{scid.data, scid.datalen});
2920   path += ".sqlog";
2921 
2922   int fd;
2923 
2924 #ifdef O_CLOEXEC
2925   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2926                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2927          errno == EINTR)
2928     ;
2929 #else  // !O_CLOEXEC
2930   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2931                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2932          errno == EINTR)
2933     ;
2934 
2935   if (fd != -1) {
2936     util::make_socket_closeonexec(fd);
2937   }
2938 #endif // !O_CLOEXEC
2939 
2940   if (fd == -1) {
2941     auto error = errno;
2942     ULOG(ERROR, this) << "Failed to open qlog file " << path
2943                       << ": errno=" << error;
2944     return -1;
2945   }
2946 
2947   return fd;
2948 }
2949 
get_conn() const2950 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2951 
2952 } // namespace shrpx
2953