• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2021 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http3_upstream.h"
26 
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31 
32 #include <cstdio>
33 
34 #include <ngtcp2/ngtcp2_crypto.h>
35 
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 #  include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49 #include "ssl_compat.h"
50 
51 namespace shrpx {
52 
53 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)54 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
55   auto upstream = static_cast<Http3Upstream *>(w->data);
56 
57   if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
58     goto fail;
59   }
60 
61   return;
62 
63 fail:
64   auto handler = upstream->get_client_handler();
65 
66   delete handler;
67 }
68 } // namespace
69 
70 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)71 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
72   auto upstream = static_cast<Http3Upstream *>(w->data);
73   auto handler = upstream->get_client_handler();
74 
75   if (upstream->submit_goaway() != 0) {
76     delete handler;
77   }
78 }
79 } // namespace
80 
81 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)82 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
83   auto upstream = static_cast<Http3Upstream *>(w->data);
84   auto handler = upstream->get_client_handler();
85 
86   if (upstream->check_shutdown() != 0) {
87     delete handler;
88   }
89 }
90 } // namespace
91 
92 namespace {
downstream_queue_size(Worker * worker)93 size_t downstream_queue_size(Worker *worker) {
94   auto &downstreamconf = *worker->get_downstream_config();
95 
96   if (get_config()->http2_proxy) {
97     return downstreamconf.connections_per_host;
98   }
99 
100   return downstreamconf.connections_per_frontend;
101 }
102 } // namespace
103 
104 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)105 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
106   auto conn = static_cast<Connection *>(conn_ref->user_data);
107   auto handler = static_cast<ClientHandler *>(conn->data);
108   auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
109   return upstream->get_conn();
110 }
111 } // namespace
112 
Http3Upstream(ClientHandler * handler)113 Http3Upstream::Http3Upstream(ClientHandler *handler)
114   : handler_{handler},
115     qlog_fd_{-1},
116     hashed_scid_{},
117     conn_{nullptr},
118     httpconn_{nullptr},
119     downstream_queue_{downstream_queue_size(handler->get_worker()),
120                       !get_config()->http2_proxy},
121     tx_{
122       .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123 #ifndef UDP_SEGMENT
124       .no_gso = true,
125 #endif // UDP_SEGMENT
126     } {
127   auto conn = handler_->get_connection();
128   conn->conn_ref.get_conn = shrpx::get_conn;
129 
130   ev_timer_init(&timer_, timeoutcb, 0., 0.);
131   timer_.data = this;
132 
133   ngtcp2_ccerr_default(&last_error_);
134 
135   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
136   shutdown_timer_.data = this;
137 
138   ev_prepare_init(&prep_, prepare_cb);
139   prep_.data = this;
140   ev_prepare_start(handler_->get_loop(), &prep_);
141 }
142 
~Http3Upstream()143 Http3Upstream::~Http3Upstream() {
144   auto loop = handler_->get_loop();
145 
146   ev_prepare_stop(loop, &prep_);
147   ev_timer_stop(loop, &shutdown_timer_);
148   ev_timer_stop(loop, &timer_);
149 
150   nghttp3_conn_del(httpconn_);
151 
152   ngtcp2_conn_del(conn_);
153 
154   if (qlog_fd_ != -1) {
155     close(qlog_fd_);
156   }
157 }
158 
namespace {
// printf-style log sink handed to ngtcp2 via settings.log_printf.
// Formats the message into a fixed 4096-byte buffer, appends a
// newline, and writes the result to stderr with a single write(2)
// call (retried on EINTR).  Messages that do not fit are truncated
// to 4095 characters plus the newline.
//
// |user_data| is unused.
void log_printf(void *user_data, const char *fmt, ...) {
  va_list ap;
  std::array<char, 4096> buf;

  va_start(ap, fmt);
  auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
  va_end(ap);

  if (nwrite < 0) {
    // Formatting failed; buf holds nothing meaningful.  Emitting it
    // would dump uninitialized stack bytes to stderr.
    return;
  }

  auto len = static_cast<size_t>(nwrite);
  if (len >= buf.size()) {
    // Output was truncated; keep room for the trailing newline, which
    // overwrites the terminating NUL.
    len = buf.size() - 1;
  }

  buf[len++] = '\n';

  while (write(fileno(stderr), buf.data(), len) == -1 && errno == EINTR)
    ;
}
} // namespace
178 
179 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)180 void qlog_write(void *user_data, uint32_t flags, const void *data,
181                 size_t datalen) {
182   auto upstream = static_cast<Http3Upstream *>(user_data);
183 
184   upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
185 }
186 } // namespace
187 
qlog_write(const void * data,size_t datalen,bool fin)188 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
189   assert(qlog_fd_ != -1);
190 
191   while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
192     ;
193 
194   if (fin) {
195     close(qlog_fd_);
196     qlog_fd_ = -1;
197   }
198 }
199 
200 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)201 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
202   util::random_bytes(dest, dest + destlen,
203                      *static_cast<std::mt19937 *>(rand_ctx->native_handle));
204 }
205 } // namespace
206 
207 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)208 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
209                           size_t cidlen, void *user_data) {
210   auto upstream = static_cast<Http3Upstream *>(user_data);
211   auto handler = upstream->get_client_handler();
212   auto worker = handler->get_worker();
213   auto conn_handler = worker->get_connection_handler();
214   auto &qkms = conn_handler->get_quic_keying_materials();
215   auto &qkm = qkms->keying_materials.front();
216 
217   assert(SHRPX_QUIC_SCIDLEN == cidlen);
218 
219   if (generate_quic_connection_id(*cid, worker->get_worker_id(), qkm.id,
220                                   qkm.cid_encryption_ctx) != 0) {
221     return NGTCP2_ERR_CALLBACK_FAILURE;
222   }
223 
224   if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
225                                           qkm.secret.size()) != 0) {
226     return NGTCP2_ERR_CALLBACK_FAILURE;
227   }
228 
229   auto quic_connection_handler = worker->get_quic_connection_handler();
230 
231   quic_connection_handler->add_connection_id(*cid, handler);
232 
233   return 0;
234 }
235 } // namespace
236 
237 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)238 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
239                          void *user_data) {
240   auto upstream = static_cast<Http3Upstream *>(user_data);
241   auto handler = upstream->get_client_handler();
242   auto worker = handler->get_worker();
243   auto quic_conn_handler = worker->get_quic_connection_handler();
244 
245   quic_conn_handler->remove_connection_id(*cid);
246 
247   return 0;
248 }
249 } // namespace
250 
http_begin_request_headers(int64_t stream_id)251 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
252   auto downstream =
253     std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
254   nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
255 
256   downstream->reset_upstream_rtimer();
257   downstream->repeat_header_timer();
258 
259   handler_->stop_read_timer();
260 
261   auto &req = downstream->request();
262   req.http_major = 3;
263   req.http_minor = 0;
264 
265   add_pending_downstream(std::move(downstream));
266 }
267 
add_pending_downstream(std::unique_ptr<Downstream> downstream)268 void Http3Upstream::add_pending_downstream(
269   std::unique_ptr<Downstream> downstream) {
270   downstream_queue_.add_pending(std::move(downstream));
271 }
272 
273 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)274 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
275                      uint64_t offset, const uint8_t *data, size_t datalen,
276                      void *user_data, void *stream_user_data) {
277   auto upstream = static_cast<Http3Upstream *>(user_data);
278 
279   if (upstream->recv_stream_data(flags, stream_id, {data, datalen}) != 0) {
280     return NGTCP2_ERR_CALLBACK_FAILURE;
281   }
282 
283   return 0;
284 }
285 } // namespace
286 
recv_stream_data(uint32_t flags,int64_t stream_id,std::span<const uint8_t> data)287 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
288                                     std::span<const uint8_t> data) {
289   assert(httpconn_);
290 
291   auto nconsumed =
292     nghttp3_conn_read_stream(httpconn_, stream_id, data.data(), data.size(),
293                              flags & NGTCP2_STREAM_DATA_FLAG_FIN);
294   if (nconsumed < 0) {
295     ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
296                       << nghttp3_strerror(nconsumed);
297     ngtcp2_ccerr_set_application_error(
298       &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
299       0);
300     return -1;
301   }
302 
303   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
304   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
305 
306   return 0;
307 }
308 
309 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)310 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
311                  uint64_t app_error_code, void *user_data,
312                  void *stream_user_data) {
313   auto upstream = static_cast<Http3Upstream *>(user_data);
314 
315   if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
316     app_error_code = NGHTTP3_H3_NO_ERROR;
317   }
318 
319   if (upstream->stream_close(stream_id, app_error_code) != 0) {
320     return NGTCP2_ERR_CALLBACK_FAILURE;
321   }
322 
323   return 0;
324 }
325 } // namespace
326 
stream_close(int64_t stream_id,uint64_t app_error_code)327 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
328   if (!httpconn_) {
329     return 0;
330   }
331 
332   auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
333   switch (rv) {
334   case 0:
335     break;
336   case NGHTTP3_ERR_STREAM_NOT_FOUND:
337     if (ngtcp2_is_bidi_stream(stream_id)) {
338       ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
339     }
340     break;
341   default:
342     ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
343     ngtcp2_ccerr_set_application_error(
344       &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
345     return -1;
346   }
347 
348   return 0;
349 }
350 
351 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)352 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
353                              uint64_t offset, uint64_t datalen, void *user_data,
354                              void *stream_user_data) {
355   auto upstream = static_cast<Http3Upstream *>(user_data);
356 
357   if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
358     return NGTCP2_ERR_CALLBACK_FAILURE;
359   }
360 
361   return 0;
362 }
363 } // namespace
364 
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)365 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
366                                             uint64_t datalen) {
367   if (!httpconn_) {
368     return 0;
369   }
370 
371   auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
372   if (rv != 0) {
373     ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
374                       << nghttp3_strerror(rv);
375     return -1;
376   }
377 
378   return 0;
379 }
380 
381 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)382 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
383                            uint64_t max_data, void *user_data,
384                            void *stream_user_data) {
385   auto upstream = static_cast<Http3Upstream *>(user_data);
386 
387   if (upstream->extend_max_stream_data(stream_id) != 0) {
388     return NGTCP2_ERR_CALLBACK_FAILURE;
389   }
390 
391   return 0;
392 }
393 } // namespace
394 
extend_max_stream_data(int64_t stream_id)395 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
396   if (!httpconn_) {
397     return 0;
398   }
399 
400   auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
401   if (rv != 0) {
402     ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
403                       << nghttp3_strerror(rv);
404     return -1;
405   }
406 
407   return 0;
408 }
409 
410 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)411 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
412                                    void *user_data) {
413   auto upstream = static_cast<Http3Upstream *>(user_data);
414 
415   upstream->extend_max_remote_streams_bidi(max_streams);
416 
417   return 0;
418 }
419 } // namespace
420 
extend_max_remote_streams_bidi(uint64_t max_streams)421 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
422   nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
423 }
424 
425 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)426 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
427                  uint64_t app_error_code, void *user_data,
428                  void *stream_user_data) {
429   auto upstream = static_cast<Http3Upstream *>(user_data);
430 
431   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
432     return NGTCP2_ERR_CALLBACK_FAILURE;
433   }
434 
435   return 0;
436 }
437 } // namespace
438 
http_shutdown_stream_read(int64_t stream_id)439 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
440   if (!httpconn_) {
441     return 0;
442   }
443 
444   auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
445   if (rv != 0) {
446     ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
447                       << nghttp3_strerror(rv);
448     return -1;
449   }
450 
451   return 0;
452 }
453 
454 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)455 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
456                         uint64_t app_error_code, void *user_data,
457                         void *stream_user_data) {
458   auto upstream = static_cast<Http3Upstream *>(user_data);
459 
460   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
461     return NGTCP2_ERR_CALLBACK_FAILURE;
462   }
463 
464   return 0;
465 }
466 } // namespace
467 
468 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)469 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
470   auto upstream = static_cast<Http3Upstream *>(user_data);
471 
472   if (upstream->handshake_completed() != 0) {
473     return NGTCP2_ERR_CALLBACK_FAILURE;
474   }
475 
476   return 0;
477 }
478 } // namespace
479 
handshake_completed()480 int Http3Upstream::handshake_completed() {
481   handler_->set_alpn_from_conn();
482 
483   auto alpn = handler_->get_alpn();
484   if (alpn.empty()) {
485     ULOG(ERROR, this) << "NO ALPN was negotiated";
486     return -1;
487   }
488 
489   auto path = ngtcp2_conn_get_path(conn_);
490 
491   return send_new_token(&path->remote);
492 }
493 
494 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)495 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
496                     const ngtcp2_path *old_path,
497                     ngtcp2_path_validation_result res, void *user_data) {
498   if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
499       !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
500     return 0;
501   }
502 
503   auto upstream = static_cast<Http3Upstream *>(user_data);
504   if (upstream->send_new_token(&path->remote) != 0) {
505     return NGTCP2_ERR_CALLBACK_FAILURE;
506   }
507 
508   return 0;
509 }
510 } // namespace
511 
send_new_token(const ngtcp2_addr * remote_addr)512 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
513   auto worker = handler_->get_worker();
514   auto conn_handler = worker->get_connection_handler();
515   auto &qkms = conn_handler->get_quic_keying_materials();
516   auto &qkm = qkms->keying_materials.front();
517 
518   std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> tokenbuf;
519 
520   auto token = generate_token(tokenbuf, remote_addr->addr, remote_addr->addrlen,
521                               qkm.secret, qkm.id);
522   if (!token) {
523     return -1;
524   }
525 
526   assert(token->size() == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1);
527 
528   auto rv = ngtcp2_conn_submit_new_token(conn_, token->data(), token->size());
529   if (rv != 0) {
530     ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
531                       << ngtcp2_strerror(rv);
532     return -1;
533   }
534 
535   return 0;
536 }
537 
538 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)539 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
540                 void *user_data) {
541   if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
542     return 0;
543   }
544 
545   auto upstream = static_cast<Http3Upstream *>(user_data);
546   if (upstream->setup_httpconn() != 0) {
547     return NGTCP2_ERR_CALLBACK_FAILURE;
548   }
549 
550   return 0;
551 }
552 } // namespace
553 
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,std::span<const uint8_t> token,ngtcp2_token_type token_type)554 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
555                         const Address &local_addr,
556                         const ngtcp2_pkt_hd &initial_hd,
557                         const ngtcp2_cid *odcid, std::span<const uint8_t> token,
558                         ngtcp2_token_type token_type) {
559   int rv;
560 
561   auto worker = handler_->get_worker();
562   auto conn_handler = worker->get_connection_handler();
563 
564   auto callbacks = ngtcp2_callbacks{
565     nullptr, // client_initial
566     ngtcp2_crypto_recv_client_initial_cb,
567     ngtcp2_crypto_recv_crypto_data_cb,
568     shrpx::handshake_completed,
569     nullptr, // recv_version_negotiation
570     ngtcp2_crypto_encrypt_cb,
571     ngtcp2_crypto_decrypt_cb,
572     ngtcp2_crypto_hp_mask_cb,
573     shrpx::recv_stream_data,
574     shrpx::acked_stream_data_offset,
575     nullptr, // stream_open
576     shrpx::stream_close,
577     nullptr, // recv_stateless_reset
578     nullptr, // recv_retry
579     nullptr, // extend_max_local_streams_bidi
580     nullptr, // extend_max_local_streams_uni
581     rand,
582     get_new_connection_id,
583     remove_connection_id,
584     ngtcp2_crypto_update_key_cb,
585     shrpx::path_validation,
586     nullptr, // select_preferred_addr
587     shrpx::stream_reset,
588     shrpx::extend_max_remote_streams_bidi,
589     nullptr, // extend_max_remote_streams_uni
590     shrpx::extend_max_stream_data,
591     nullptr, // dcid_status
592     nullptr, // handshake_confirmed
593     nullptr, // recv_new_token
594     ngtcp2_crypto_delete_crypto_aead_ctx_cb,
595     ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
596     nullptr, // recv_datagram
597     nullptr, // ack_datagram
598     nullptr, // lost_datagram
599     ngtcp2_crypto_get_path_challenge_data_cb,
600     shrpx::stream_stop_sending,
601     nullptr, // version_negotiation
602     nullptr, // recv_rx_key
603     shrpx::recv_tx_key,
604   };
605 
606   auto config = get_config();
607   auto &quicconf = config->quic;
608   auto &http3conf = config->http3;
609 
610   auto &qkms = conn_handler->get_quic_keying_materials();
611   auto &qkm = qkms->keying_materials.front();
612 
613   ngtcp2_cid scid;
614 
615   if (generate_quic_connection_id(scid, worker->get_worker_id(), qkm.id,
616                                   qkm.cid_encryption_ctx) != 0) {
617     return -1;
618   }
619 
620   ngtcp2_settings settings;
621   ngtcp2_settings_default(&settings);
622   if (quicconf.upstream.debug.log) {
623     settings.log_printf = log_printf;
624   }
625 
626   if (!quicconf.upstream.qlog.dir.empty()) {
627     auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
628     if (fd != -1) {
629       qlog_fd_ = fd;
630       settings.qlog_write = shrpx::qlog_write;
631     }
632   }
633 
634   settings.initial_ts = quic_timestamp();
635   settings.initial_rtt =
636     static_cast<ngtcp2_tstamp>(quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
637   settings.cc_algo = quicconf.upstream.congestion_controller;
638   settings.max_window = http3conf.upstream.max_connection_window_size;
639   settings.max_stream_window = http3conf.upstream.max_window_size;
640   settings.rand_ctx.native_handle = &worker->get_randgen();
641   settings.token = token.data();
642   settings.tokenlen = token.size();
643   settings.token_type = token_type;
644   settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
645     0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
646 
647   ngtcp2_transport_params params;
648   ngtcp2_transport_params_default(&params);
649   params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
650   // The minimum number of unidirectional streams required for HTTP/3.
651   params.initial_max_streams_uni = 3;
652   params.initial_max_data = http3conf.upstream.connection_window_size;
653   params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
654   params.initial_max_stream_data_uni = http3conf.upstream.window_size;
655   params.max_idle_timeout =
656     static_cast<ngtcp2_tstamp>(quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
657 
658 #ifdef NGHTTP2_OPENSSL_IS_BORINGSSL
659   if (quicconf.upstream.early_data) {
660     ngtcp2_transport_params early_data_params;
661 
662     ngtcp2_transport_params_default(&early_data_params);
663 
664     early_data_params.initial_max_stream_data_bidi_local =
665       params.initial_max_stream_data_bidi_local;
666     early_data_params.initial_max_stream_data_bidi_remote =
667       params.initial_max_stream_data_bidi_remote;
668     early_data_params.initial_max_stream_data_uni =
669       params.initial_max_stream_data_uni;
670     early_data_params.initial_max_data = params.initial_max_data;
671     early_data_params.initial_max_streams_bidi =
672       params.initial_max_streams_bidi;
673     early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
674 
675     // TODO include HTTP/3 SETTINGS
676 
677     std::array<uint8_t, 128> quic_early_data_ctx;
678 
679     auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
680       quic_early_data_ctx.data(), quic_early_data_ctx.size(),
681       &early_data_params);
682 
683     assert(quic_early_data_ctxlen > 0);
684     assert(static_cast<size_t>(quic_early_data_ctxlen) <=
685            quic_early_data_ctx.size());
686 
687     if (SSL_set_quic_early_data_context(handler_->get_ssl(),
688                                         quic_early_data_ctx.data(),
689                                         quic_early_data_ctxlen) != 1) {
690       ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
691       return -1;
692     }
693   }
694 #endif // NGHTTP2_OPENSSL_IS_BORINGSSL
695 
696   if (odcid) {
697     params.original_dcid = *odcid;
698     params.retry_scid = initial_hd.dcid;
699     params.retry_scid_present = 1;
700   } else {
701     params.original_dcid = initial_hd.dcid;
702   }
703 
704   params.original_dcid_present = 1;
705 
706   rv = generate_quic_stateless_reset_token(
707     params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
708   if (rv != 0) {
709     ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
710     return -1;
711   }
712   params.stateless_reset_token_present = 1;
713 
714   auto path = ngtcp2_path{
715     {
716       const_cast<sockaddr *>(&local_addr.su.sa),
717       static_cast<socklen_t>(local_addr.len),
718     },
719     {
720       const_cast<sockaddr *>(&remote_addr.su.sa),
721       static_cast<socklen_t>(remote_addr.len),
722     },
723     const_cast<UpstreamAddr *>(faddr),
724   };
725 
726   rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
727                               initial_hd.version, &callbacks, &settings,
728                               &params, nullptr, this);
729   if (rv != 0) {
730     ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
731     return -1;
732   }
733 
734   ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
735 
736   auto quic_connection_handler = worker->get_quic_connection_handler();
737 
738   if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
739                                          initial_hd.dcid) != 0) {
740     return -1;
741   }
742 
743   quic_connection_handler->add_connection_id(hashed_scid_, handler_);
744   quic_connection_handler->add_connection_id(scid, handler_);
745 
746   return 0;
747 }
748 
on_read()749 int Http3Upstream::on_read() { return 0; }
750 
on_write()751 int Http3Upstream::on_write() {
752   int rv;
753 
754   if (tx_.send_blocked) {
755     rv = send_blocked_packet();
756     if (rv != 0) {
757       return -1;
758     }
759 
760     if (tx_.send_blocked) {
761       return 0;
762     }
763   }
764 
765   handler_->get_connection()->wlimit.stopw();
766 
767   if (write_streams() != 0) {
768     return -1;
769   }
770 
771   if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
772     return -1;
773   }
774 
775   reset_timer();
776 
777   return 0;
778 }
779 
write_streams()780 int Http3Upstream::write_streams() {
781   std::array<nghttp3_vec, 16> vec;
782   auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
783   auto path_max_udp_payload_size =
784     ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
785   ngtcp2_pkt_info pi, prev_pi;
786   auto txbuf =
787     std::span{tx_.data.get(), std::max(ngtcp2_conn_get_send_quantum(conn_),
788                                        path_max_udp_payload_size)};
789   auto buf = txbuf;
790   ngtcp2_path_storage ps, prev_ps;
791   int rv;
792   size_t gso_size = 0;
793   auto ts = quic_timestamp();
794 
795   ngtcp2_path_storage_zero(&ps);
796   ngtcp2_path_storage_zero(&prev_ps);
797 
798   for (;;) {
799     int64_t stream_id = -1;
800     int fin = 0;
801     nghttp3_ssize sveccnt = 0;
802 
803     if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
804       sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
805                                            vec.data(), vec.size());
806       if (sveccnt < 0) {
807         ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
808                           << nghttp3_strerror(sveccnt);
809         ngtcp2_ccerr_set_application_error(
810           &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt), nullptr,
811           0);
812         return handle_error();
813       }
814     }
815 
816     ngtcp2_ssize ndatalen;
817     auto v = vec.data();
818     auto vcnt = static_cast<size_t>(sveccnt);
819 
820     uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
821     if (fin) {
822       flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
823     }
824 
825     auto buflen = buf.size() >= max_udp_payload_size
826                     ? max_udp_payload_size
827                     : path_max_udp_payload_size;
828     auto nwrite = ngtcp2_conn_writev_stream(
829       conn_, &ps.path, &pi, buf.data(), buflen, &ndatalen, flags, stream_id,
830       reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
831     if (nwrite < 0) {
832       switch (nwrite) {
833       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
834         assert(ndatalen == -1);
835         nghttp3_conn_block_stream(httpconn_, stream_id);
836         continue;
837       case NGTCP2_ERR_STREAM_SHUT_WR:
838         assert(ndatalen == -1);
839         nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
840         continue;
841       case NGTCP2_ERR_WRITE_MORE:
842         assert(ndatalen >= 0);
843         rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
844         if (rv != 0) {
845           ULOG(ERROR, this)
846             << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
847           ngtcp2_ccerr_set_application_error(
848             &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
849             0);
850           return handle_error();
851         }
852         continue;
853       }
854 
855       assert(ndatalen == -1);
856 
857       ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
858                         << ngtcp2_strerror(nwrite);
859 
860       ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
861 
862       return handle_error();
863     } else if (ndatalen >= 0) {
864       rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
865       if (rv != 0) {
866         ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
867                           << nghttp3_strerror(rv);
868         ngtcp2_ccerr_set_application_error(
869           &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
870         return handle_error();
871       }
872     }
873 
874     if (nwrite == 0) {
875       auto data = std::span{std::begin(txbuf), std::begin(buf)};
876       if (!data.empty()) {
877         auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
878 
879         auto [rest, rv] =
880           send_packet(faddr, prev_ps.path.remote.addr,
881                       prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
882                       prev_ps.path.local.addrlen, prev_pi, data, gso_size);
883         if (rv == SHRPX_ERR_SEND_BLOCKED) {
884           on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
885                           prev_pi, rest, gso_size);
886 
887           signal_write_upstream_addr(faddr);
888         }
889       }
890 
891       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
892 
893       return 0;
894     }
895 
896     auto last_pkt = std::begin(buf);
897 
898     buf = buf.subspan(nwrite);
899 
900     if (last_pkt == std::begin(txbuf)) {
901       ngtcp2_path_copy(&prev_ps.path, &ps.path);
902       prev_pi = pi;
903       gso_size = nwrite;
904     } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
905                prev_pi.ecn != pi.ecn ||
906                static_cast<size_t>(nwrite) > gso_size ||
907                (gso_size > path_max_udp_payload_size &&
908                 static_cast<size_t>(nwrite) != gso_size)) {
909       auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
910       auto data = std::span{std::begin(txbuf), last_pkt};
911 
912       auto [rest, rv] =
913         send_packet(faddr, prev_ps.path.remote.addr,
914                     prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
915                     prev_ps.path.local.addrlen, prev_pi, data, gso_size);
916       switch (rv) {
917       case SHRPX_ERR_SEND_BLOCKED:
918         on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
919                         rest, gso_size);
920 
921         data = std::span{last_pkt, std::begin(buf)};
922         on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
923                         ps.path.remote, ps.path.local, pi, data, data.size());
924 
925         signal_write_upstream_addr(faddr);
926 
927         break;
928       default: {
929         auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
930         auto data = std::span{last_pkt, std::begin(buf)};
931 
932         auto [rest, rv] = send_packet(
933           faddr, ps.path.remote.addr, ps.path.remote.addrlen,
934           ps.path.local.addr, ps.path.local.addrlen, pi, data, data.size());
935         if (rv == SHRPX_ERR_SEND_BLOCKED) {
936           assert(rest.size() == data.size());
937 
938           on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
939                           rest.size());
940 
941           signal_write_upstream_addr(faddr);
942         }
943       }
944       }
945 
946       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
947 
948       return 0;
949     }
950 
951     if (buf.size() < path_max_udp_payload_size ||
952         static_cast<size_t>(nwrite) < gso_size) {
953       auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
954       auto data = std::span{std::begin(txbuf), std::begin(buf)};
955 
956       auto [rest, rv] = send_packet(faddr, ps.path.remote.addr,
957                                     ps.path.remote.addrlen, ps.path.local.addr,
958                                     ps.path.local.addrlen, pi, data, gso_size);
959       if (rv == SHRPX_ERR_SEND_BLOCKED) {
960         on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
961                         gso_size);
962 
963         signal_write_upstream_addr(faddr);
964       }
965 
966       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
967 
968       return 0;
969     }
970   }
971 
972   return 0;
973 }
974 
on_timeout(Downstream * downstream)975 int Http3Upstream::on_timeout(Downstream *downstream) {
976   if (LOG_ENABLED(INFO)) {
977     ULOG(INFO, this) << "Stream timeout stream_id="
978                      << downstream->get_stream_id();
979   }
980 
981   shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
982 
983   handler_->signal_write();
984 
985   return 0;
986 }
987 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)988 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
989                                                unsigned int status_code) {
990   int rv;
991 
992   rv = error_reply(downstream, status_code);
993 
994   if (rv != 0) {
995     return -1;
996   }
997 
998   handler_->signal_write();
999 
1000   return 0;
1001 }
1002 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1003 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1004   Downstream *downstream) {
1005   assert(0);
1006   abort();
1007 }
1008 
1009 namespace {
1010 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1011 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1012   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1013   // client to retry.
1014   switch (downstream_error_code) {
1015   case NGHTTP2_NO_ERROR:
1016     return NGHTTP3_H3_NO_ERROR;
1017   case NGHTTP2_REFUSED_STREAM:
1018     return NGHTTP3_H3_REQUEST_REJECTED;
1019   default:
1020     return NGHTTP3_H3_INTERNAL_ERROR;
1021   }
1022 }
1023 } // namespace
1024 
downstream_read(DownstreamConnection * dconn)1025 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1026   auto downstream = dconn->get_downstream();
1027 
1028   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1029     // The downstream stream was reset (canceled). In this case,
1030     // RST_STREAM to the upstream and delete downstream connection
1031     // here. Deleting downstream will be taken place at
1032     // on_stream_close_callback.
1033     shutdown_stream(downstream,
1034                     infer_upstream_shutdown_stream_error_code(
1035                       downstream->get_response_rst_stream_error_code()));
1036     downstream->pop_downstream_connection();
1037     // dconn was deleted
1038     dconn = nullptr;
1039   } else if (downstream->get_response_state() ==
1040              DownstreamState::MSG_BAD_HEADER) {
1041     if (error_reply(downstream, 502) != 0) {
1042       return -1;
1043     }
1044     downstream->pop_downstream_connection();
1045     // dconn was deleted
1046     dconn = nullptr;
1047   } else {
1048     auto rv = downstream->on_read();
1049     if (rv == SHRPX_ERR_EOF) {
1050       if (downstream->get_request_header_sent()) {
1051         return downstream_eof(dconn);
1052       }
1053       return SHRPX_ERR_RETRY;
1054     }
1055     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1056       downstream->pop_downstream_connection();
1057       handler_->signal_write();
1058       return 0;
1059     }
1060     if (rv != 0) {
1061       if (rv != SHRPX_ERR_NETWORK) {
1062         if (LOG_ENABLED(INFO)) {
1063           DCLOG(INFO, dconn) << "HTTP parser failure";
1064         }
1065       }
1066       return downstream_error(dconn, Downstream::EVENT_ERROR);
1067     }
1068 
1069     if (downstream->can_detach_downstream_connection()) {
1070       // Keep-alive
1071       downstream->detach_downstream_connection();
1072     }
1073   }
1074 
1075   handler_->signal_write();
1076 
1077   // At this point, downstream may be deleted.
1078 
1079   return 0;
1080 }
1081 
downstream_write(DownstreamConnection * dconn)1082 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1083   int rv;
1084   rv = dconn->on_write();
1085   if (rv == SHRPX_ERR_NETWORK) {
1086     return downstream_error(dconn, Downstream::EVENT_ERROR);
1087   }
1088   if (rv != 0) {
1089     return rv;
1090   }
1091   return 0;
1092 }
1093 
downstream_eof(DownstreamConnection * dconn)1094 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1095   auto downstream = dconn->get_downstream();
1096 
1097   if (LOG_ENABLED(INFO)) {
1098     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1099   }
1100 
1101   // Delete downstream connection. If we don't delete it here, it will
1102   // be pooled in on_stream_close_callback.
1103   downstream->pop_downstream_connection();
1104   // dconn was deleted
1105   dconn = nullptr;
1106   // downstream will be deleted in on_stream_close_callback.
1107   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1108     // Server may indicate the end of the request by EOF
1109     if (LOG_ENABLED(INFO)) {
1110       ULOG(INFO, this) << "Downstream body was ended by EOF";
1111     }
1112     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1113 
1114     // For tunneled connection, MSG_COMPLETE signals
1115     // downstream_read_data_callback to send RST_STREAM after pending
1116     // response body is sent. This is needed to ensure that RST_STREAM
1117     // is sent after all pending data are sent.
1118     if (on_downstream_body_complete(downstream) != 0) {
1119       return -1;
1120     }
1121   } else if (downstream->get_response_state() !=
1122              DownstreamState::MSG_COMPLETE) {
1123     // If stream was not closed, then we set MSG_COMPLETE and let
1124     // on_stream_close_callback delete downstream.
1125     if (error_reply(downstream, 502) != 0) {
1126       return -1;
1127     }
1128   }
1129   handler_->signal_write();
1130   // At this point, downstream may be deleted.
1131   return 0;
1132 }
1133 
downstream_error(DownstreamConnection * dconn,int events)1134 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1135   auto downstream = dconn->get_downstream();
1136 
1137   if (LOG_ENABLED(INFO)) {
1138     if (events & Downstream::EVENT_ERROR) {
1139       DCLOG(INFO, dconn) << "Downstream network/general error";
1140     } else {
1141       DCLOG(INFO, dconn) << "Timeout";
1142     }
1143     if (downstream->get_upgraded()) {
1144       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1145     }
1146   }
1147 
1148   // Delete downstream connection. If we don't delete it here, it will
1149   // be pooled in on_stream_close_callback.
1150   downstream->pop_downstream_connection();
1151   // dconn was deleted
1152   dconn = nullptr;
1153 
1154   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1155     // For SSL tunneling, we issue RST_STREAM. For other types of
1156     // stream, we don't have to do anything since response was
1157     // complete.
1158     if (downstream->get_upgraded()) {
1159       shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1160     }
1161   } else {
1162     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1163       if (downstream->get_upgraded()) {
1164         if (on_downstream_body_complete(downstream) != 0) {
1165           return -1;
1166         }
1167       } else {
1168         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1169       }
1170     } else {
1171       unsigned int status;
1172       if (events & Downstream::EVENT_TIMEOUT) {
1173         if (downstream->get_request_header_sent()) {
1174           status = 504;
1175         } else {
1176           status = 408;
1177         }
1178       } else {
1179         status = 502;
1180       }
1181       if (error_reply(downstream, status) != 0) {
1182         return -1;
1183       }
1184     }
1185     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1186   }
1187   handler_->signal_write();
1188   // At this point, downstream may be deleted.
1189   return 0;
1190 }
1191 
get_client_handler() const1192 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1193 
1194 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1195 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1196                                             int64_t stream_id, nghttp3_vec *vec,
1197                                             size_t veccnt, uint32_t *pflags,
1198                                             void *conn_user_data,
1199                                             void *stream_user_data) {
1200   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1201   auto downstream = static_cast<Downstream *>(stream_user_data);
1202 
1203   assert(downstream);
1204 
1205   auto body = downstream->get_response_buf();
1206 
1207   assert(body);
1208 
1209   if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1210       body->rleft_mark() == 0) {
1211     downstream->disable_upstream_wtimer();
1212     return NGHTTP3_ERR_WOULDBLOCK;
1213   }
1214 
1215   downstream->reset_upstream_wtimer();
1216 
1217   veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1218 
1219   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1220       body->rleft_mark() == 0) {
1221     *pflags |= NGHTTP3_DATA_FLAG_EOF;
1222   }
1223 
1224   assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1225 
1226   downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1227 
1228   if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1229       upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1230     return NGHTTP3_ERR_CALLBACK_FAILURE;
1231   }
1232 
1233   return veccnt;
1234 }
1235 } // namespace
1236 
on_downstream_header_complete(Downstream * downstream)1237 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1238   int rv;
1239 
1240   const auto &req = downstream->request();
1241   auto &resp = downstream->response();
1242 
1243   auto &balloc = downstream->get_block_allocator();
1244 
1245   if (LOG_ENABLED(INFO)) {
1246     if (downstream->get_non_final_response()) {
1247       DLOG(INFO, downstream) << "HTTP non-final response header";
1248     } else {
1249       DLOG(INFO, downstream) << "HTTP response header completed";
1250     }
1251   }
1252 
1253   auto config = get_config();
1254   auto &httpconf = config->http;
1255 
1256   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1257     downstream->rewrite_location_response_header(req.scheme);
1258   }
1259 
1260 #ifdef HAVE_MRUBY
1261   if (!downstream->get_non_final_response()) {
1262     auto dconn = downstream->get_downstream_connection();
1263     const auto &group = dconn->get_downstream_addr_group();
1264     if (group) {
1265       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1266 
1267       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1268         if (error_reply(downstream, 500) != 0) {
1269           return -1;
1270         }
1271         // Returning -1 will signal deletion of dconn.
1272         return -1;
1273       }
1274 
1275       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1276         return -1;
1277       }
1278     }
1279 
1280     auto worker = handler_->get_worker();
1281     auto mruby_ctx = worker->get_mruby_context();
1282 
1283     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1284       if (error_reply(downstream, 500) != 0) {
1285         return -1;
1286       }
1287       // Returning -1 will signal deletion of dconn.
1288       return -1;
1289     }
1290 
1291     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1292       return -1;
1293     }
1294   }
1295 #endif // HAVE_MRUBY
1296 
1297   auto nva = std::vector<nghttp3_nv>();
1298   // 4 means :status and possible server, via, and set-cookie (for
1299   // affinity cookie) header field.
1300   nva.reserve(resp.fs.headers().size() + 4 +
1301               httpconf.add_response_headers.size());
1302 
1303   if (downstream->get_non_final_response()) {
1304     auto response_status = http2::stringify_status(balloc, resp.http_status);
1305 
1306     nva.push_back(http3::make_field(":status"_sr, response_status));
1307 
1308     http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1309                                       http2::HDOP_STRIP_ALL);
1310 
1311     if (LOG_ENABLED(INFO)) {
1312       log_response_headers(downstream, nva);
1313     }
1314 
1315     rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1316                                   nva.data(), nva.size());
1317 
1318     resp.fs.clear_headers();
1319 
1320     if (rv != 0) {
1321       ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1322       return -1;
1323     }
1324 
1325     return 0;
1326   }
1327 
1328   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1329   StringRef response_status;
1330 
1331   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1332     response_status = http2::stringify_status(balloc, 200);
1333     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1334   } else {
1335     response_status = http2::stringify_status(balloc, resp.http_status);
1336   }
1337 
1338   nva.push_back(http3::make_field(":status"_sr, response_status));
1339 
1340   http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1341 
1342   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1343     nva.push_back(http3::make_field("server"_sr, httpconf.server_name));
1344   } else {
1345     auto server = resp.fs.header(http2::HD_SERVER);
1346     if (server) {
1347       nva.push_back(http3::make_field("server"_sr, (*server).value));
1348     }
1349   }
1350 
1351   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1352     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1353     if (affinity_cookie) {
1354       auto dconn = downstream->get_downstream_connection();
1355       assert(dconn);
1356       auto &group = dconn->get_downstream_addr_group();
1357       auto &shared_addr = group->shared_addr;
1358       auto &cookieconf = shared_addr->affinity.cookie;
1359       auto secure =
1360         http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1361       auto cookie_str = http::create_affinity_cookie(
1362         balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1363       nva.push_back(http3::make_field("set-cookie"_sr, cookie_str));
1364     }
1365   }
1366 
1367   auto via = resp.fs.header(http2::HD_VIA);
1368   if (httpconf.no_via) {
1369     if (via) {
1370       nva.push_back(http3::make_field("via"_sr, (*via).value));
1371     }
1372   } else {
1373     // we don't create more than 16 bytes in
1374     // http::create_via_header_value.
1375     size_t len = 16;
1376     if (via) {
1377       len += via->value.size() + 2;
1378     }
1379 
1380     auto iov = make_byte_ref(balloc, len + 1);
1381     auto p = std::begin(iov);
1382     if (via) {
1383       p = std::copy(std::begin(via->value), std::end(via->value), p);
1384       p = util::copy_lit(p, ", ");
1385     }
1386     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1387     *p = '\0';
1388 
1389     nva.push_back(
1390       http3::make_field("via"_sr, StringRef{std::span{std::begin(iov), p}}));
1391   }
1392 
1393   for (auto &p : httpconf.add_response_headers) {
1394     nva.push_back(http3::make_field(p.name, p.value));
1395   }
1396 
1397   if (LOG_ENABLED(INFO)) {
1398     log_response_headers(downstream, nva);
1399   }
1400 
1401   auto priority = resp.fs.header(http2::HD_PRIORITY);
1402   if (priority) {
1403     nghttp3_pri pri;
1404 
1405     if (nghttp3_conn_get_stream_priority(httpconn_, &pri,
1406                                          downstream->get_stream_id()) == 0 &&
1407         nghttp3_pri_parse_priority(&pri, priority->value.byte(),
1408                                    priority->value.size()) == 0) {
1409       rv = nghttp3_conn_set_server_stream_priority(
1410         httpconn_, downstream->get_stream_id(), &pri);
1411       if (rv != 0) {
1412         ULOG(ERROR, this) << "nghttp3_conn_set_server_stream_priority: "
1413                           << nghttp3_strerror(rv);
1414       }
1415     }
1416   }
1417 
1418   nghttp3_data_reader data_read;
1419   data_read.read_data = downstream_read_data_callback;
1420 
1421   nghttp3_data_reader *data_readptr;
1422 
1423   if (downstream->expect_response_body() ||
1424       downstream->expect_response_trailer()) {
1425     data_readptr = &data_read;
1426   } else {
1427     data_readptr = nullptr;
1428   }
1429 
1430   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1431                                     nva.data(), nva.size(), data_readptr);
1432   if (rv != 0) {
1433     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1434     return -1;
1435   }
1436 
1437   if (data_readptr) {
1438     downstream->reset_upstream_wtimer();
1439   } else if (shutdown_stream_read(downstream->get_stream_id(),
1440                                   NGHTTP3_H3_NO_ERROR) != 0) {
1441     return -1;
1442   }
1443 
1444   return 0;
1445 }
1446 
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1447 int Http3Upstream::on_downstream_body(Downstream *downstream,
1448                                       const uint8_t *data, size_t len,
1449                                       bool flush) {
1450   auto body = downstream->get_response_buf();
1451   body->append(data, len);
1452 
1453   if (flush) {
1454     nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1455 
1456     downstream->ensure_upstream_wtimer();
1457   }
1458 
1459   return 0;
1460 }
1461 
on_downstream_body_complete(Downstream * downstream)1462 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1463   if (LOG_ENABLED(INFO)) {
1464     DLOG(INFO, downstream) << "HTTP response completed";
1465   }
1466 
1467   auto &resp = downstream->response();
1468 
1469   if (!downstream->validate_response_recv_body_length()) {
1470     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1471     resp.connection_close = true;
1472     return 0;
1473   }
1474 
1475   if (!downstream->get_upgraded()) {
1476     const auto &trailers = resp.fs.trailers();
1477     if (!trailers.empty()) {
1478       std::vector<nghttp3_nv> nva;
1479       nva.reserve(trailers.size());
1480       http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1481       if (!nva.empty()) {
1482         auto rv = nghttp3_conn_submit_trailers(
1483           httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1484         if (rv != 0) {
1485           ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1486                             << nghttp3_strerror(rv);
1487           return -1;
1488         }
1489       }
1490     }
1491   }
1492 
1493   nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1494   downstream->ensure_upstream_wtimer();
1495 
1496   return 0;
1497 }
1498 
on_handler_delete()1499 void Http3Upstream::on_handler_delete() {
1500   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1501     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1502         d->accesslog_ready()) {
1503       handler_->write_accesslog(d);
1504     }
1505   }
1506 
1507   auto worker = handler_->get_worker();
1508   auto quic_conn_handler = worker->get_quic_connection_handler();
1509 
1510   std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1511   ngtcp2_conn_get_scid(conn_, scids.data());
1512   scids.back() = hashed_scid_;
1513 
1514   for (auto &cid : scids) {
1515     quic_conn_handler->remove_connection_id(cid);
1516   }
1517 
1518   switch (last_error_.type) {
1519   case NGTCP2_CCERR_TYPE_IDLE_CLOSE:
1520   case NGTCP2_CCERR_TYPE_DROP_CONN:
1521   case NGTCP2_CCERR_TYPE_RETRY:
1522     return;
1523   default:
1524     break;
1525   }
1526 
1527   // If this is not idle close, send CONNECTION_CLOSE.
1528   if (!ngtcp2_conn_in_closing_period(conn_) &&
1529       !ngtcp2_conn_in_draining_period(conn_)) {
1530     ngtcp2_path_storage ps;
1531     ngtcp2_pkt_info pi;
1532     conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1533 
1534     ngtcp2_path_storage_zero(&ps);
1535 
1536     ngtcp2_ccerr ccerr;
1537     ngtcp2_ccerr_default(&ccerr);
1538 
1539     if (worker->get_graceful_shutdown() &&
1540         !ngtcp2_conn_get_handshake_completed(conn_)) {
1541       ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1542     }
1543 
1544     auto nwrite = ngtcp2_conn_write_connection_close(
1545       conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1546       quic_timestamp());
1547     if (nwrite < 0) {
1548       if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1549         ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1550                           << ngtcp2_strerror(nwrite);
1551       }
1552 
1553       return;
1554     }
1555 
1556     conn_close_.resize(nwrite);
1557 
1558     send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1559                 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1560                 ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
1561   }
1562 
1563   auto d =
1564     static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1565 
1566   if (LOG_ENABLED(INFO)) {
1567     ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1568                      << conn_close_.size() << " bytes sentinel packet";
1569   }
1570 
1571   auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1572                                         std::move(conn_close_), d);
1573 
1574   quic_conn_handler->add_close_wait(cw.release());
1575 }
1576 
on_downstream_reset(Downstream * downstream,bool no_retry)1577 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1578   int rv;
1579 
1580   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1581     // This is error condition when we failed push_request_headers()
1582     // in initiate_downstream().  Otherwise, we have
1583     // DispatchState::ACTIVE state, or we did not set
1584     // DownstreamConnection.
1585     downstream->pop_downstream_connection();
1586     handler_->signal_write();
1587 
1588     return 0;
1589   }
1590 
1591   if (!downstream->request_submission_ready()) {
1592     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1593       // We have got all response body already.  Send it off.
1594       downstream->pop_downstream_connection();
1595       return 0;
1596     }
1597     // pushed stream is handled here
1598     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1599     downstream->pop_downstream_connection();
1600 
1601     handler_->signal_write();
1602 
1603     return 0;
1604   }
1605 
1606   downstream->pop_downstream_connection();
1607 
1608   downstream->add_retry();
1609 
1610   std::unique_ptr<DownstreamConnection> dconn;
1611 
1612   rv = 0;
1613 
1614   if (no_retry || downstream->no_more_retry()) {
1615     goto fail;
1616   }
1617 
1618   // downstream connection is clean; we can retry with new
1619   // downstream connection.
1620 
1621   for (;;) {
1622     auto dconn = handler_->get_downstream_connection(rv, downstream);
1623     if (!dconn) {
1624       goto fail;
1625     }
1626 
1627     rv = downstream->attach_downstream_connection(std::move(dconn));
1628     if (rv == 0) {
1629       break;
1630     }
1631   }
1632 
1633   rv = downstream->push_request_headers();
1634   if (rv != 0) {
1635     goto fail;
1636   }
1637 
1638   return 0;
1639 
1640 fail:
1641   if (rv == SHRPX_ERR_TLS_REQUIRED) {
1642     assert(0);
1643     abort();
1644   }
1645 
1646   rv = on_downstream_abort_request(downstream, 502);
1647   if (rv != 0) {
1648     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1649   }
1650   downstream->pop_downstream_connection();
1651 
1652   handler_->signal_write();
1653 
1654   return 0;
1655 }
1656 
pause_read(IOCtrlReason reason)1657 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1658 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1659 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1660                                size_t consumed) {
1661   consume(downstream->get_stream_id(), consumed);
1662 
1663   auto &req = downstream->request();
1664 
1665   req.consume(consumed);
1666 
1667   handler_->signal_write();
1668 
1669   return 0;
1670 }
1671 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1672 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1673                               size_t bodylen) {
1674   int rv;
1675 
1676   nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1677 
1678   const auto &req = downstream->request();
1679 
1680   if (req.method != HTTP_HEAD && bodylen) {
1681     data_read.read_data = downstream_read_data_callback;
1682     data_read_ptr = &data_read;
1683 
1684     auto buf = downstream->get_response_buf();
1685 
1686     buf->append(body, bodylen);
1687   }
1688 
1689   const auto &resp = downstream->response();
1690   auto config = get_config();
1691   auto &httpconf = config->http;
1692 
1693   auto &balloc = downstream->get_block_allocator();
1694 
1695   const auto &headers = resp.fs.headers();
1696   auto nva = std::vector<nghttp3_nv>();
1697   // 2 for :status and server
1698   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1699 
1700   auto response_status = http2::stringify_status(balloc, resp.http_status);
1701 
1702   nva.push_back(http3::make_field(":status"_sr, response_status));
1703 
1704   for (auto &kv : headers) {
1705     if (kv.name.empty() || kv.name[0] == ':') {
1706       continue;
1707     }
1708     switch (kv.token) {
1709     case http2::HD_CONNECTION:
1710     case http2::HD_KEEP_ALIVE:
1711     case http2::HD_PROXY_CONNECTION:
1712     case http2::HD_TE:
1713     case http2::HD_TRANSFER_ENCODING:
1714     case http2::HD_UPGRADE:
1715       continue;
1716     }
1717     nva.push_back(
1718       http3::make_field(kv.name, kv.value, http3::never_index(kv.no_index)));
1719   }
1720 
1721   if (!resp.fs.header(http2::HD_SERVER)) {
1722     nva.push_back(http3::make_field("server"_sr, config->http.server_name));
1723   }
1724 
1725   for (auto &p : httpconf.add_response_headers) {
1726     nva.push_back(http3::make_field(p.name, p.value));
1727   }
1728 
1729   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1730                                     nva.data(), nva.size(), data_read_ptr);
1731   if (nghttp3_err_is_fatal(rv)) {
1732     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1733                       << nghttp3_strerror(rv);
1734     return -1;
1735   }
1736 
1737   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1738 
1739   if (data_read_ptr) {
1740     downstream->reset_upstream_wtimer();
1741   }
1742 
1743   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1744       0) {
1745     return -1;
1746   }
1747 
1748   return 0;
1749 }
1750 
initiate_push(Downstream * downstream,const StringRef & uri)1751 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1752   return 0;
1753 }
1754 
response_riovec(struct iovec * iov,int iovcnt) const1755 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1756   return 0;
1757 }
1758 
response_drain(size_t n)1759 void Http3Upstream::response_drain(size_t n) {}
1760 
response_empty() const1761 bool Http3Upstream::response_empty() const { return false; }
1762 
1763 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1764 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1765                                           int32_t promised_stream_id) {
1766   return nullptr;
1767 }
1768 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1769 int Http3Upstream::on_downstream_push_promise_complete(
1770   Downstream *downstream, Downstream *promised_downstream) {
1771   return 0;
1772 }
1773 
push_enabled() const1774 bool Http3Upstream::push_enabled() const { return false; }
1775 
cancel_premature_downstream(Downstream * promised_downstream)1776 void Http3Upstream::cancel_premature_downstream(
1777   Downstream *promised_downstream) {}
1778 
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data)1779 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1780                            const Address &remote_addr,
1781                            const Address &local_addr, const ngtcp2_pkt_info &pi,
1782                            std::span<const uint8_t> data) {
1783   int rv;
1784 
1785   auto path = ngtcp2_path{
1786     {
1787       const_cast<sockaddr *>(&local_addr.su.sa),
1788       static_cast<socklen_t>(local_addr.len),
1789     },
1790     {
1791       const_cast<sockaddr *>(&remote_addr.su.sa),
1792       static_cast<socklen_t>(remote_addr.len),
1793     },
1794     const_cast<UpstreamAddr *>(faddr),
1795   };
1796 
1797   rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data.data(), data.size(),
1798                             quic_timestamp());
1799   if (rv != 0) {
1800     switch (rv) {
1801     case NGTCP2_ERR_DRAINING:
1802       return -1;
1803     case NGTCP2_ERR_RETRY: {
1804       auto worker = handler_->get_worker();
1805       auto quic_conn_handler = worker->get_quic_connection_handler();
1806 
1807       if (worker->get_graceful_shutdown()) {
1808         ngtcp2_ccerr_set_transport_error(&last_error_,
1809                                          NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1810 
1811         return handle_error();
1812       }
1813 
1814       ngtcp2_version_cid vc;
1815 
1816       rv = ngtcp2_pkt_decode_version_cid(&vc, data.data(), data.size(),
1817                                          SHRPX_QUIC_SCIDLEN);
1818       if (rv != 0) {
1819         return -1;
1820       }
1821 
1822       // Overwrite error if any is set
1823       ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1824 
1825       quic_conn_handler->send_retry(
1826         handler_->get_upstream_addr(), vc.version, {vc.dcid, vc.dcidlen},
1827         {vc.scid, vc.scidlen}, remote_addr, local_addr, data.size() * 3);
1828 
1829       return -1;
1830     }
1831     case NGTCP2_ERR_CRYPTO:
1832       if (!last_error_.error_code) {
1833         ngtcp2_ccerr_set_tls_alert(
1834           &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1835       }
1836       break;
1837     case NGTCP2_ERR_DROP_CONN:
1838       // Overwrite error if any is set
1839       ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1840 
1841       return -1;
1842     default:
1843       if (!last_error_.error_code) {
1844         ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1845       }
1846     }
1847 
1848     ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1849 
1850     return handle_error();
1851   }
1852 
1853   return 0;
1854 }
1855 
1856 std::pair<std::span<const uint8_t>, int>
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1857 Http3Upstream::send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
1858                            size_t remote_salen, const sockaddr *local_sa,
1859                            size_t local_salen, const ngtcp2_pkt_info &pi,
1860                            std::span<const uint8_t> data, size_t gso_size) {
1861   if (tx_.no_gso) {
1862     for (; !data.empty();) {
1863       auto len = std::min(gso_size, data.size());
1864       auto rv =
1865         quic_send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
1866                          pi, {std::begin(data), len}, gso_size);
1867       if (rv != 0) {
1868         switch (rv) {
1869         case -EAGAIN:
1870 #if EAGAIN != EWOULDBLOCK
1871         case -EWOULDBLOCK:
1872 #endif // EAGAIN != EWOULDBLOCK
1873           return {data, SHRPX_ERR_SEND_BLOCKED};
1874         default:
1875           return {data, -1};
1876         }
1877       }
1878 
1879       data = data.subspan(len);
1880     }
1881 
1882     return {{}, 0};
1883   }
1884 
1885   auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1886                              local_salen, pi, data, gso_size);
1887   switch (rv) {
1888   case 0:
1889     return {{}, 0};
1890     // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1891     // large.
1892   case -EINVAL:
1893   case -EMSGSIZE:
1894     // Let the packet lost.
1895     break;
1896   case -EAGAIN:
1897 #if EAGAIN != EWOULDBLOCK
1898   case -EWOULDBLOCK:
1899 #endif // EAGAIN != EWOULDBLOCK
1900     return {data, SHRPX_ERR_SEND_BLOCKED};
1901   case -EIO:
1902     if (tx_.no_gso) {
1903       break;
1904     }
1905 
1906     tx_.no_gso = true;
1907 
1908     return send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
1909                        pi, data, gso_size);
1910   default:
1911     break;
1912   }
1913 
1914   return {{}, -1};
1915 }
1916 
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1917 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1918                                     const ngtcp2_addr &remote_addr,
1919                                     const ngtcp2_addr &local_addr,
1920                                     const ngtcp2_pkt_info &pi,
1921                                     std::span<const uint8_t> data,
1922                                     size_t gso_size) {
1923   assert(tx_.num_blocked || !tx_.send_blocked);
1924   assert(tx_.num_blocked < 2);
1925   assert(gso_size);
1926 
1927   tx_.send_blocked = true;
1928 
1929   auto &p = tx_.blocked[tx_.num_blocked++];
1930 
1931   memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1932   memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1933 
1934   p.local_addr.len = local_addr.addrlen;
1935   p.remote_addr.len = remote_addr.addrlen;
1936   p.faddr = faddr;
1937   p.pi = pi;
1938   p.data = data;
1939   p.gso_size = gso_size;
1940 }
1941 
send_blocked_packet()1942 int Http3Upstream::send_blocked_packet() {
1943   assert(tx_.send_blocked);
1944 
1945   for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1946     auto &p = tx_.blocked[tx_.num_blocked_sent];
1947 
1948     auto [rest, rv] = send_packet(p.faddr, &p.remote_addr.su.sa,
1949                                   p.remote_addr.len, &p.local_addr.su.sa,
1950                                   p.local_addr.len, p.pi, p.data, p.gso_size);
1951     if (rv == SHRPX_ERR_SEND_BLOCKED) {
1952       p.data = rest;
1953 
1954       signal_write_upstream_addr(p.faddr);
1955 
1956       return 0;
1957     }
1958   }
1959 
1960   tx_.send_blocked = false;
1961   tx_.num_blocked = 0;
1962   tx_.num_blocked_sent = 0;
1963 
1964   return 0;
1965 }
1966 
signal_write_upstream_addr(const UpstreamAddr * faddr)1967 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1968   auto conn = handler_->get_connection();
1969 
1970   if (faddr->fd != conn->wev.fd) {
1971     if (ev_is_active(&conn->wev)) {
1972       ev_io_stop(handler_->get_loop(), &conn->wev);
1973     }
1974 
1975     ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1976   }
1977 
1978   conn->wlimit.startw();
1979 }
1980 
handle_error()1981 int Http3Upstream::handle_error() {
1982   if (ngtcp2_conn_in_closing_period(conn_) ||
1983       ngtcp2_conn_in_draining_period(conn_)) {
1984     return -1;
1985   }
1986 
1987   ngtcp2_path_storage ps;
1988   ngtcp2_pkt_info pi;
1989 
1990   ngtcp2_path_storage_zero(&ps);
1991 
1992   auto ts = quic_timestamp();
1993 
1994   conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1995 
1996   auto nwrite =
1997     ngtcp2_conn_write_connection_close(conn_, &ps.path, &pi, conn_close_.data(),
1998                                        conn_close_.size(), &last_error_, ts);
1999   if (nwrite < 0) {
2000     ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
2001                       << ngtcp2_strerror(nwrite);
2002     return -1;
2003   }
2004 
2005   conn_close_.resize(nwrite);
2006 
2007   if (nwrite == 0) {
2008     return -1;
2009   }
2010 
2011   send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
2012               ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
2013               ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
2014 
2015   return -1;
2016 }
2017 
handle_expiry()2018 int Http3Upstream::handle_expiry() {
2019   int rv;
2020 
2021   auto ts = quic_timestamp();
2022 
2023   rv = ngtcp2_conn_handle_expiry(conn_, ts);
2024   if (rv != 0) {
2025     if (rv == NGTCP2_ERR_IDLE_CLOSE) {
2026       ULOG(INFO, this) << "Idle connection timeout";
2027     } else {
2028       ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
2029     }
2030     ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
2031     return handle_error();
2032   }
2033 
2034   return 0;
2035 }
2036 
reset_timer()2037 void Http3Upstream::reset_timer() {
2038   auto ts = quic_timestamp();
2039   auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
2040   auto loop = handler_->get_loop();
2041 
2042   if (expiry_ts <= ts) {
2043     ev_feed_event(loop, &timer_, EV_TIMER);
2044     return;
2045   }
2046 
2047   timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
2048 
2049   ev_timer_again(loop, &timer_);
2050 }
2051 
2052 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2053 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2054                           size_t nconsumed, void *user_data,
2055                           void *stream_user_data) {
2056   auto upstream = static_cast<Http3Upstream *>(user_data);
2057 
2058   upstream->consume(stream_id, nconsumed);
2059 
2060   return 0;
2061 }
2062 } // namespace
2063 
2064 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2065 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2066                            uint64_t datalen, void *user_data,
2067                            void *stream_user_data) {
2068   auto upstream = static_cast<Http3Upstream *>(user_data);
2069   auto downstream = static_cast<Downstream *>(stream_user_data);
2070 
2071   assert(downstream);
2072 
2073   if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2074     return NGHTTP3_ERR_CALLBACK_FAILURE;
2075   }
2076 
2077   return 0;
2078 }
2079 } // namespace
2080 
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2081 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2082                                           uint64_t datalen) {
2083   if (LOG_ENABLED(INFO)) {
2084     ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2085                      << datalen << " bytes acknowledged";
2086   }
2087 
2088   auto body = downstream->get_response_buf();
2089   auto drained = body->drain_mark(datalen);
2090   (void)drained;
2091 
2092   assert(datalen == drained);
2093 
2094   if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2095     return -1;
2096   }
2097 
2098   return 0;
2099 }
2100 
2101 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2102 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2103                                void *user_data, void *stream_user_data) {
2104   if (!ngtcp2_is_bidi_stream(stream_id)) {
2105     return 0;
2106   }
2107 
2108   auto upstream = static_cast<Http3Upstream *>(user_data);
2109   upstream->http_begin_request_headers(stream_id);
2110 
2111   return 0;
2112 }
2113 } // namespace
2114 
2115 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2116 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2117                              int32_t token, nghttp3_rcbuf *name,
2118                              nghttp3_rcbuf *value, uint8_t flags,
2119                              void *user_data, void *stream_user_data) {
2120   auto upstream = static_cast<Http3Upstream *>(user_data);
2121   auto downstream = static_cast<Downstream *>(stream_user_data);
2122 
2123   if (!downstream || downstream->get_stop_reading()) {
2124     return 0;
2125   }
2126 
2127   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2128                                          /* trailer = */ false) != 0) {
2129     return NGHTTP3_ERR_CALLBACK_FAILURE;
2130   }
2131 
2132   return 0;
2133 }
2134 } // namespace
2135 
2136 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2137 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2138                               int32_t token, nghttp3_rcbuf *name,
2139                               nghttp3_rcbuf *value, uint8_t flags,
2140                               void *user_data, void *stream_user_data) {
2141   auto upstream = static_cast<Http3Upstream *>(user_data);
2142   auto downstream = static_cast<Downstream *>(stream_user_data);
2143 
2144   if (!downstream || downstream->get_stop_reading()) {
2145     return 0;
2146   }
2147 
2148   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2149                                          /* trailer = */ true) != 0) {
2150     return NGHTTP3_ERR_CALLBACK_FAILURE;
2151   }
2152 
2153   return 0;
2154 }
2155 } // namespace
2156 
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2157 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2158                                             int32_t h3token,
2159                                             nghttp3_rcbuf *name,
2160                                             nghttp3_rcbuf *value, uint8_t flags,
2161                                             bool trailer) {
2162   auto namebuf = nghttp3_rcbuf_get_buf(name);
2163   auto valuebuf = nghttp3_rcbuf_get_buf(value);
2164   auto &req = downstream->request();
2165   auto config = get_config();
2166   auto &httpconf = config->http;
2167 
2168   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2169         httpconf.request_header_field_buffer ||
2170       req.fs.num_fields() >= httpconf.max_request_header_fields) {
2171     downstream->set_stop_reading(true);
2172 
2173     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2174       return 0;
2175     }
2176 
2177     if (LOG_ENABLED(INFO)) {
2178       ULOG(INFO, this) << "Too large or many header field size="
2179                        << req.fs.buffer_size() + namebuf.len + valuebuf.len
2180                        << ", num=" << req.fs.num_fields() + 1;
2181     }
2182 
2183     // just ignore if this is a trailer part.
2184     if (trailer) {
2185       if (shutdown_stream_read(downstream->get_stream_id(),
2186                                NGHTTP3_H3_NO_ERROR) != 0) {
2187         return -1;
2188       }
2189 
2190       return 0;
2191     }
2192 
2193     if (error_reply(downstream, 431) != 0) {
2194       return -1;
2195     }
2196 
2197     return 0;
2198   }
2199 
2200   auto nameref = StringRef{namebuf.base, namebuf.len};
2201   auto valueref = StringRef{valuebuf.base, valuebuf.len};
2202   auto token = http2::lookup_token(nameref);
2203   auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2204 
2205   downstream->add_rcbuf(name);
2206   downstream->add_rcbuf(value);
2207 
2208   if (trailer) {
2209     req.fs.add_trailer_token(nameref, valueref, no_index, token);
2210     return 0;
2211   }
2212 
2213   req.fs.add_header_token(nameref, valueref, no_index, token);
2214   return 0;
2215 }
2216 
2217 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2218 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2219                              void *user_data, void *stream_user_data) {
2220   auto upstream = static_cast<Http3Upstream *>(user_data);
2221   auto downstream = static_cast<Downstream *>(stream_user_data);
2222 
2223   if (!downstream || downstream->get_stop_reading()) {
2224     return 0;
2225   }
2226 
2227   if (upstream->http_end_request_headers(downstream, fin) != 0) {
2228     return NGHTTP3_ERR_CALLBACK_FAILURE;
2229   }
2230 
2231   downstream->reset_upstream_rtimer();
2232   downstream->stop_header_timer();
2233 
2234   return 0;
2235 }
2236 } // namespace
2237 
http_end_request_headers(Downstream * downstream,int fin)2238 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2239   auto lgconf = log_config();
2240   lgconf->update_tstamp(std::chrono::system_clock::now());
2241   auto &req = downstream->request();
2242   req.tstamp = lgconf->tstamp;
2243 
2244   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2245     return 0;
2246   }
2247 
2248   auto &nva = req.fs.headers();
2249 
2250   if (LOG_ENABLED(INFO)) {
2251     std::stringstream ss;
2252     for (auto &nv : nva) {
2253       if (nv.name == "authorization"_sr) {
2254         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2255         continue;
2256       }
2257       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2258     }
2259     ULOG(INFO, this) << "HTTP request headers. stream_id="
2260                      << downstream->get_stream_id() << "\n"
2261                      << ss.str();
2262   }
2263 
2264   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2265   if (content_length) {
2266     // libnghttp3 guarantees this can be parsed
2267     req.fs.content_length =
2268       util::parse_uint(content_length->value).value_or(-1);
2269   }
2270 
2271   // presence of mandatory header fields are guaranteed by libnghttp3.
2272   auto authority = req.fs.header(http2::HD__AUTHORITY);
2273   auto path = req.fs.header(http2::HD__PATH);
2274   auto method = req.fs.header(http2::HD__METHOD);
2275   auto scheme = req.fs.header(http2::HD__SCHEME);
2276 
2277   auto method_token = http2::lookup_method_token(method->value);
2278   if (method_token == -1) {
2279     if (error_reply(downstream, 501) != 0) {
2280       return -1;
2281     }
2282     return 0;
2283   }
2284 
2285   auto faddr = handler_->get_upstream_addr();
2286 
2287   auto config = get_config();
2288 
2289   // For HTTP/2 proxy, we require :authority.
2290   if (method_token != HTTP_CONNECT && config->http2_proxy &&
2291       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2292     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2293     return 0;
2294   }
2295 
2296   req.method = method_token;
2297   if (scheme) {
2298     req.scheme = scheme->value;
2299   }
2300 
2301   // nghttp2 library guarantees either :authority or host exist
2302   if (!authority) {
2303     req.no_authority = true;
2304     authority = req.fs.header(http2::HD_HOST);
2305   }
2306 
2307   if (authority) {
2308     req.authority = authority->value;
2309   }
2310 
2311   if (path) {
2312     if (method_token == HTTP_OPTIONS && path->value == "*"_sr) {
2313       // Server-wide OPTIONS request.  Path is empty.
2314     } else if (config->http2_proxy &&
2315                faddr->alt_mode == UpstreamAltMode::NONE) {
2316       req.path = path->value;
2317     } else {
2318       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2319                                            path->value);
2320     }
2321   }
2322 
2323   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2324   if (connect_proto) {
2325     if (connect_proto->value != "websocket"_sr) {
2326       if (error_reply(downstream, 400) != 0) {
2327         return -1;
2328       }
2329       return 0;
2330     }
2331     req.connect_proto = ConnectProto::WEBSOCKET;
2332   }
2333 
2334   if (!fin) {
2335     req.http2_expect_body = true;
2336   } else if (req.fs.content_length == -1) {
2337     req.fs.content_length = 0;
2338   }
2339 
2340   downstream->inspect_http2_request();
2341 
2342   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2343 
2344   if (config->http.require_http_scheme &&
2345       !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2346     if (error_reply(downstream, 400) != 0) {
2347       return -1;
2348     }
2349   }
2350 
2351 #ifdef HAVE_MRUBY
2352   auto worker = handler_->get_worker();
2353   auto mruby_ctx = worker->get_mruby_context();
2354 
2355   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2356     if (error_reply(downstream, 500) != 0) {
2357       return -1;
2358     }
2359     return 0;
2360   }
2361 #endif // HAVE_MRUBY
2362 
2363   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2364     return 0;
2365   }
2366 
2367   start_downstream(downstream);
2368 
2369   return 0;
2370 }
2371 
start_downstream(Downstream * downstream)2372 void Http3Upstream::start_downstream(Downstream *downstream) {
2373   if (downstream_queue_.can_activate(downstream->request().authority)) {
2374     initiate_downstream(downstream);
2375     return;
2376   }
2377 
2378   downstream_queue_.mark_blocked(downstream);
2379 }
2380 
initiate_downstream(Downstream * downstream)2381 void Http3Upstream::initiate_downstream(Downstream *downstream) {
2382   int rv;
2383 
2384 #ifdef HAVE_MRUBY
2385   DownstreamConnection *dconn_ptr;
2386 #endif // HAVE_MRUBY
2387 
2388   for (;;) {
2389     auto dconn = handler_->get_downstream_connection(rv, downstream);
2390     if (!dconn) {
2391       if (rv == SHRPX_ERR_TLS_REQUIRED) {
2392         assert(0);
2393         abort();
2394       }
2395 
2396       rv = error_reply(downstream, 502);
2397       if (rv != 0) {
2398         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2399       }
2400 
2401       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
2402       downstream_queue_.mark_failure(downstream);
2403 
2404       return;
2405     }
2406 
2407 #ifdef HAVE_MRUBY
2408     dconn_ptr = dconn.get();
2409 #endif // HAVE_MRUBY
2410     rv = downstream->attach_downstream_connection(std::move(dconn));
2411     if (rv == 0) {
2412       break;
2413     }
2414   }
2415 
2416 #ifdef HAVE_MRUBY
2417   const auto &group = dconn_ptr->get_downstream_addr_group();
2418   if (group) {
2419     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
2420     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2421       if (error_reply(downstream, 500) != 0) {
2422         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2423       }
2424 
2425       downstream_queue_.mark_failure(downstream);
2426 
2427       return;
2428     }
2429 
2430     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2431       return;
2432     }
2433   }
2434 #endif // HAVE_MRUBY
2435 
2436   rv = downstream->push_request_headers();
2437   if (rv != 0) {
2438     if (error_reply(downstream, 502) != 0) {
2439       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2440     }
2441 
2442     downstream_queue_.mark_failure(downstream);
2443 
2444     return;
2445   }
2446 
2447   downstream_queue_.mark_active(downstream);
2448 
2449   auto &req = downstream->request();
2450   if (!req.http2_expect_body) {
2451     rv = downstream->end_upload_data();
2452     if (rv != 0) {
2453       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2454     }
2455   }
2456 }
2457 
2458 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2459 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2460                    size_t datalen, void *user_data, void *stream_user_data) {
2461   auto upstream = static_cast<Http3Upstream *>(user_data);
2462   auto downstream = static_cast<Downstream *>(stream_user_data);
2463 
2464   if (upstream->http_recv_data(downstream, {data, datalen}) != 0) {
2465     return NGHTTP3_ERR_CALLBACK_FAILURE;
2466   }
2467 
2468   return 0;
2469 }
2470 } // namespace
2471 
http_recv_data(Downstream * downstream,std::span<const uint8_t> data)2472 int Http3Upstream::http_recv_data(Downstream *downstream,
2473                                   std::span<const uint8_t> data) {
2474   downstream->reset_upstream_rtimer();
2475 
2476   if (downstream->push_upload_data_chunk(data.data(), data.size()) != 0) {
2477     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2478       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2479     }
2480 
2481     consume(downstream->get_stream_id(), data.size());
2482 
2483     return 0;
2484   }
2485 
2486   return 0;
2487 }
2488 
2489 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2490 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2491                     void *stream_user_data) {
2492   auto upstream = static_cast<Http3Upstream *>(user_data);
2493   auto downstream = static_cast<Downstream *>(stream_user_data);
2494 
2495   if (!downstream || downstream->get_stop_reading()) {
2496     return 0;
2497   }
2498 
2499   if (upstream->http_end_stream(downstream) != 0) {
2500     return NGHTTP3_ERR_CALLBACK_FAILURE;
2501   }
2502 
2503   return 0;
2504 }
2505 } // namespace
2506 
http_end_stream(Downstream * downstream)2507 int Http3Upstream::http_end_stream(Downstream *downstream) {
2508   downstream->disable_upstream_rtimer();
2509 
2510   if (downstream->end_upload_data() != 0) {
2511     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2512       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2513     }
2514   }
2515 
2516   downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2517 
2518   return 0;
2519 }
2520 
2521 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2522 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2523                       uint64_t app_error_code, void *conn_user_data,
2524                       void *stream_user_data) {
2525   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2526   auto downstream = static_cast<Downstream *>(stream_user_data);
2527 
2528   if (!downstream) {
2529     return 0;
2530   }
2531 
2532   if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2533     return NGHTTP3_ERR_CALLBACK_FAILURE;
2534   }
2535 
2536   return 0;
2537 }
2538 } // namespace
2539 
http_stream_close(Downstream * downstream,uint64_t app_error_code)2540 int Http3Upstream::http_stream_close(Downstream *downstream,
2541                                      uint64_t app_error_code) {
2542   auto stream_id = downstream->get_stream_id();
2543 
2544   if (LOG_ENABLED(INFO)) {
2545     ULOG(INFO, this) << "Stream stream_id=" << stream_id
2546                      << " is being closed with app_error_code="
2547                      << app_error_code;
2548 
2549     auto body = downstream->get_response_buf();
2550 
2551     ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2552                      << " not_sent=" << body->rleft_mark();
2553   }
2554 
2555   auto &req = downstream->request();
2556 
2557   consume(stream_id, req.unconsumed_body_length);
2558 
2559   req.unconsumed_body_length = 0;
2560 
2561   ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2562 
2563   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2564     remove_downstream(downstream);
2565     // downstream was deleted
2566 
2567     return 0;
2568   }
2569 
2570   if (downstream->can_detach_downstream_connection()) {
2571     // Keep-alive
2572     downstream->detach_downstream_connection();
2573   }
2574 
2575   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2576 
2577   // At this point, downstream read may be paused.
2578 
2579   // If shrpx_downstream::push_request_headers() failed, the
2580   // error is handled here.
2581   remove_downstream(downstream);
2582   // downstream was deleted
2583 
2584   return 0;
2585 }
2586 
2587 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2588 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2589                       uint64_t app_error_code, void *user_data,
2590                       void *stream_user_data) {
2591   auto upstream = static_cast<Http3Upstream *>(user_data);
2592 
2593   if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2594     return NGHTTP3_ERR_CALLBACK_FAILURE;
2595   }
2596 
2597   return 0;
2598 }
2599 } // namespace
2600 
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2601 int Http3Upstream::http_stop_sending(int64_t stream_id,
2602                                      uint64_t app_error_code) {
2603   auto rv =
2604     ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2605   if (ngtcp2_err_is_fatal(rv)) {
2606     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2607                       << ngtcp2_strerror(rv);
2608     return -1;
2609   }
2610 
2611   return 0;
2612 }
2613 
2614 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2615 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2616                       uint64_t app_error_code, void *user_data,
2617                       void *stream_user_data) {
2618   auto upstream = static_cast<Http3Upstream *>(user_data);
2619 
2620   if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2621     return NGHTTP3_ERR_CALLBACK_FAILURE;
2622   }
2623 
2624   return 0;
2625 }
2626 } // namespace
2627 
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2628 int Http3Upstream::http_reset_stream(int64_t stream_id,
2629                                      uint64_t app_error_code) {
2630   auto rv =
2631     ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2632   if (ngtcp2_err_is_fatal(rv)) {
2633     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2634                       << ngtcp2_strerror(rv);
2635     return -1;
2636   }
2637 
2638   return 0;
2639 }
2640 
setup_httpconn()2641 int Http3Upstream::setup_httpconn() {
2642   int rv;
2643 
2644   if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2645     return -1;
2646   }
2647 
2648   nghttp3_callbacks callbacks{
2649     shrpx::http_acked_stream_data,
2650     shrpx::http_stream_close,
2651     shrpx::http_recv_data,
2652     http_deferred_consume,
2653     shrpx::http_begin_request_headers,
2654     shrpx::http_recv_request_header,
2655     shrpx::http_end_request_headers,
2656     nullptr, // begin_trailers
2657     shrpx::http_recv_request_trailer,
2658     nullptr, // end_trailers
2659     shrpx::http_stop_sending,
2660     shrpx::http_end_stream,
2661     shrpx::http_reset_stream,
2662   };
2663 
2664   auto config = get_config();
2665 
2666   nghttp3_settings settings;
2667   nghttp3_settings_default(&settings);
2668   settings.qpack_max_dtable_capacity = 4_k;
2669 
2670   if (!config->http2_proxy) {
2671     settings.enable_connect_protocol = 1;
2672   }
2673 
2674   auto mem = nghttp3_mem_default();
2675 
2676   rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2677   if (rv != 0) {
2678     ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2679     return -1;
2680   }
2681 
2682   auto params = ngtcp2_conn_get_local_transport_params(conn_);
2683 
2684   nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2685                                            params->initial_max_streams_bidi);
2686 
2687   int64_t ctrl_stream_id;
2688 
2689   rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2690   if (rv != 0) {
2691     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2692     return -1;
2693   }
2694 
2695   rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2696   if (rv != 0) {
2697     ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2698                       << nghttp3_strerror(rv);
2699     return -1;
2700   }
2701 
2702   int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2703 
2704   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2705   if (rv != 0) {
2706     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2707     return -1;
2708   }
2709 
2710   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2711   if (rv != 0) {
2712     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2713     return -1;
2714   }
2715 
2716   rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2717                                        qpack_dec_stream_id);
2718   if (rv != 0) {
2719     ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2720                       << nghttp3_strerror(rv);
2721     return -1;
2722   }
2723 
2724   return 0;
2725 }
2726 
error_reply(Downstream * downstream,unsigned int status_code)2727 int Http3Upstream::error_reply(Downstream *downstream,
2728                                unsigned int status_code) {
2729   int rv;
2730   auto &resp = downstream->response();
2731 
2732   auto &balloc = downstream->get_block_allocator();
2733 
2734   auto html = http::create_error_html(balloc, status_code);
2735   resp.http_status = status_code;
2736 
2737   nghttp3_data_reader data_read, *data_read_ptr = nullptr;
2738 
2739   const auto &req = downstream->request();
2740 
2741   if (req.method != HTTP_HEAD) {
2742     data_read.read_data = downstream_read_data_callback;
2743     data_read_ptr = &data_read;
2744 
2745     auto body = downstream->get_response_buf();
2746 
2747     body->append(html);
2748   }
2749 
2750   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2751 
2752   auto lgconf = log_config();
2753   lgconf->update_tstamp(std::chrono::system_clock::now());
2754 
2755   auto response_status = http2::stringify_status(balloc, status_code);
2756   auto content_length = util::make_string_ref_uint(balloc, html.size());
2757   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2758 
2759   auto nva = std::to_array(
2760     {http3::make_field(":status"_sr, response_status),
2761      http3::make_field("content-type"_sr, "text/html; charset=UTF-8"_sr),
2762      http3::make_field("server"_sr, get_config()->http.server_name),
2763      http3::make_field("content-length"_sr, content_length),
2764      http3::make_field("date"_sr, date)});
2765 
2766   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2767                                     nva.data(), nva.size(), data_read_ptr);
2768   if (nghttp3_err_is_fatal(rv)) {
2769     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2770                       << nghttp3_strerror(rv);
2771     return -1;
2772   }
2773 
2774   downstream->reset_upstream_wtimer();
2775 
2776   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2777       0) {
2778     return -1;
2779   }
2780 
2781   return 0;
2782 }
2783 
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2784 int Http3Upstream::shutdown_stream(Downstream *downstream,
2785                                    uint64_t app_error_code) {
2786   auto stream_id = downstream->get_stream_id();
2787 
2788   if (LOG_ENABLED(INFO)) {
2789     ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2790                      << " with app_error_code=" << app_error_code;
2791   }
2792 
2793   auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2794   if (rv != 0) {
2795     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2796                       << ngtcp2_strerror(rv);
2797     return -1;
2798   }
2799 
2800   return 0;
2801 }
2802 
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2803 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2804                                         uint64_t app_error_code) {
2805   auto rv =
2806     ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, NGHTTP3_H3_NO_ERROR);
2807   if (ngtcp2_err_is_fatal(rv)) {
2808     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2809                       << ngtcp2_strerror(rv);
2810     return -1;
2811   }
2812 
2813   return 0;
2814 }
2815 
consume(int64_t stream_id,size_t nconsumed)2816 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2817   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2818   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2819 }
2820 
remove_downstream(Downstream * downstream)2821 void Http3Upstream::remove_downstream(Downstream *downstream) {
2822   if (downstream->accesslog_ready()) {
2823     handler_->write_accesslog(downstream);
2824   }
2825 
2826   nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2827                                     nullptr);
2828 
2829   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2830 
2831   if (next_downstream) {
2832     initiate_downstream(next_downstream);
2833   }
2834 
2835   if (downstream_queue_.get_downstreams() == nullptr) {
2836     // There is no downstream at the moment.  Start idle timer now.
2837     handler_->repeat_read_timer();
2838   }
2839 }
2840 
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2841 void Http3Upstream::log_response_headers(
2842   Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2843   std::stringstream ss;
2844   for (auto &nv : nva) {
2845     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2846        << StringRef{nv.value, nv.valuelen} << "\n";
2847   }
2848   ULOG(INFO, this) << "HTTP response headers. stream_id="
2849                    << downstream->get_stream_id() << "\n"
2850                    << ss.str();
2851 }
2852 
check_shutdown()2853 int Http3Upstream::check_shutdown() {
2854   auto worker = handler_->get_worker();
2855 
2856   if (!worker->get_graceful_shutdown()) {
2857     return 0;
2858   }
2859 
2860   ev_prepare_stop(handler_->get_loop(), &prep_);
2861 
2862   return start_graceful_shutdown();
2863 }
2864 
start_graceful_shutdown()2865 int Http3Upstream::start_graceful_shutdown() {
2866   int rv;
2867 
2868   if (ev_is_active(&shutdown_timer_)) {
2869     return 0;
2870   }
2871 
2872   if (!httpconn_) {
2873     return -1;
2874   }
2875 
2876   rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2877   if (rv != 0) {
2878     ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2879                       << nghttp3_strerror(rv);
2880     return -1;
2881   }
2882 
2883   handler_->signal_write();
2884 
2885   auto t = ngtcp2_conn_get_pto(conn_);
2886 
2887   ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2888                0.);
2889   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2890 
2891   return 0;
2892 }
2893 
submit_goaway()2894 int Http3Upstream::submit_goaway() {
2895   int rv;
2896 
2897   rv = nghttp3_conn_shutdown(httpconn_);
2898   if (rv != 0) {
2899     ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2900     return -1;
2901   }
2902 
2903   handler_->signal_write();
2904 
2905   return 0;
2906 }
2907 
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2908 int Http3Upstream::open_qlog_file(const StringRef &dir,
2909                                   const ngtcp2_cid &scid) const {
2910   std::array<char, sizeof("20141115T125824.741+0900")> buf;
2911 
2912   auto path = std::string{dir};
2913   path += '/';
2914   path +=
2915     util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2916   path += '-';
2917   path += util::format_hex(std::span{scid.data, scid.datalen});
2918   path += ".sqlog";
2919 
2920   int fd;
2921 
2922 #ifdef O_CLOEXEC
2923   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2924                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2925          errno == EINTR)
2926     ;
2927 #else  // !O_CLOEXEC
2928   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2929                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2930          errno == EINTR)
2931     ;
2932 
2933   if (fd != -1) {
2934     util::make_socket_closeonexec(fd);
2935   }
2936 #endif // !O_CLOEXEC
2937 
2938   if (fd == -1) {
2939     auto error = errno;
2940     ULOG(ERROR, this) << "Failed to open qlog file " << path
2941                       << ": errno=" << error;
2942     return -1;
2943   }
2944 
2945   return fd;
2946 }
2947 
get_conn() const2948 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2949 
2950 } // namespace shrpx
2951