• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2021 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_http3_upstream.h"
26 
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31 
32 #include <cstdio>
33 
34 #include <ngtcp2/ngtcp2_crypto.h>
35 
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 #  include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49 
50 namespace shrpx {
51 
52 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)53 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
54   auto upstream = static_cast<Http3Upstream *>(w->data);
55 
56   if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
57     goto fail;
58   }
59 
60   return;
61 
62 fail:
63   auto handler = upstream->get_client_handler();
64 
65   delete handler;
66 }
67 } // namespace
68 
69 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)70 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
71   auto upstream = static_cast<Http3Upstream *>(w->data);
72   auto handler = upstream->get_client_handler();
73 
74   if (upstream->submit_goaway() != 0) {
75     delete handler;
76   }
77 }
78 } // namespace
79 
80 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)81 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
82   auto upstream = static_cast<Http3Upstream *>(w->data);
83   auto handler = upstream->get_client_handler();
84 
85   if (upstream->check_shutdown() != 0) {
86     delete handler;
87   }
88 }
89 } // namespace
90 
91 namespace {
downstream_queue_size(Worker * worker)92 size_t downstream_queue_size(Worker *worker) {
93   auto &downstreamconf = *worker->get_downstream_config();
94 
95   if (get_config()->http2_proxy) {
96     return downstreamconf.connections_per_host;
97   }
98 
99   return downstreamconf.connections_per_frontend;
100 }
101 } // namespace
102 
103 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)104 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
105   auto conn = static_cast<Connection *>(conn_ref->user_data);
106   auto handler = static_cast<ClientHandler *>(conn->data);
107   auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
108   return upstream->get_conn();
109 }
110 } // namespace
111 
Http3Upstream(ClientHandler * handler)112 Http3Upstream::Http3Upstream(ClientHandler *handler)
113     : handler_{handler},
114       qlog_fd_{-1},
115       hashed_scid_{},
116       conn_{nullptr},
117       httpconn_{nullptr},
118       downstream_queue_{downstream_queue_size(handler->get_worker()),
119                         !get_config()->http2_proxy},
120       retry_close_{false},
121       tx_{
122           .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123       } {
124   auto conn = handler_->get_connection();
125   conn->conn_ref.get_conn = shrpx::get_conn;
126 
127   ev_timer_init(&timer_, timeoutcb, 0., 0.);
128   timer_.data = this;
129 
130   ngtcp2_ccerr_default(&last_error_);
131 
132   ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
133   shutdown_timer_.data = this;
134 
135   ev_prepare_init(&prep_, prepare_cb);
136   prep_.data = this;
137   ev_prepare_start(handler_->get_loop(), &prep_);
138 }
139 
~Http3Upstream()140 Http3Upstream::~Http3Upstream() {
141   auto loop = handler_->get_loop();
142 
143   ev_prepare_stop(loop, &prep_);
144   ev_timer_stop(loop, &shutdown_timer_);
145   ev_timer_stop(loop, &timer_);
146 
147   nghttp3_conn_del(httpconn_);
148 
149   ngtcp2_conn_del(conn_);
150 
151   if (qlog_fd_ != -1) {
152     close(qlog_fd_);
153   }
154 }
155 
156 namespace {
// ngtcp2 debug-log sink (installed as settings.log_printf).  Formats
// the message into a fixed 4096-byte buffer, appends a newline and
// writes it to stderr, retrying the write when interrupted by a
// signal.  Messages that do not fit are truncated.  |user_data| is
// unused.
void log_printf(void *user_data, const char *fmt, ...) {
  std::array<char, 4096> buf;
  va_list ap;

  va_start(ap, fmt);
  auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
  va_end(ap);

  if (nwrite < 0) {
    // Formatting failed, so the buffer contents are unspecified.
    // Emit nothing rather than dumping the whole buffer to stderr.
    return;
  }

  auto len = static_cast<size_t>(nwrite);
  if (len >= buf.size()) {
    // Output was truncated; keep room for the trailing newline.
    len = buf.size() - 1;
  }

  buf[len++] = '\n';

  while (write(fileno(stderr), buf.data(), len) == -1 && errno == EINTR)
    ;
}
174 } // namespace
175 
176 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)177 void qlog_write(void *user_data, uint32_t flags, const void *data,
178                 size_t datalen) {
179   auto upstream = static_cast<Http3Upstream *>(user_data);
180 
181   upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
182 }
183 } // namespace
184 
qlog_write(const void * data,size_t datalen,bool fin)185 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
186   assert(qlog_fd_ != -1);
187 
188   while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
189     ;
190 
191   if (fin) {
192     close(qlog_fd_);
193     qlog_fd_ = -1;
194   }
195 }
196 
197 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)198 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
199   util::random_bytes(dest, dest + destlen,
200                      *static_cast<std::mt19937 *>(rand_ctx->native_handle));
201 }
202 } // namespace
203 
204 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)205 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
206                           size_t cidlen, void *user_data) {
207   auto upstream = static_cast<Http3Upstream *>(user_data);
208   auto handler = upstream->get_client_handler();
209   auto worker = handler->get_worker();
210   auto conn_handler = worker->get_connection_handler();
211   auto &qkms = conn_handler->get_quic_keying_materials();
212   auto &qkm = qkms->keying_materials.front();
213 
214   if (generate_quic_connection_id(*cid, cidlen, worker->get_cid_prefix(),
215                                   qkm.id, qkm.cid_encryption_key.data()) != 0) {
216     return NGTCP2_ERR_CALLBACK_FAILURE;
217   }
218 
219   if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
220                                           qkm.secret.size()) != 0) {
221     return NGTCP2_ERR_CALLBACK_FAILURE;
222   }
223 
224   auto quic_connection_handler = worker->get_quic_connection_handler();
225 
226   quic_connection_handler->add_connection_id(*cid, handler);
227 
228   return 0;
229 }
230 } // namespace
231 
232 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)233 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
234                          void *user_data) {
235   auto upstream = static_cast<Http3Upstream *>(user_data);
236   auto handler = upstream->get_client_handler();
237   auto worker = handler->get_worker();
238   auto quic_conn_handler = worker->get_quic_connection_handler();
239 
240   quic_conn_handler->remove_connection_id(*cid);
241 
242   return 0;
243 }
244 } // namespace
245 
http_begin_request_headers(int64_t stream_id)246 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
247   auto downstream =
248       std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
249   nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
250 
251   downstream->reset_upstream_rtimer();
252 
253   handler_->repeat_read_timer();
254 
255   auto &req = downstream->request();
256   req.http_major = 3;
257   req.http_minor = 0;
258 
259   add_pending_downstream(std::move(downstream));
260 }
261 
add_pending_downstream(std::unique_ptr<Downstream> downstream)262 void Http3Upstream::add_pending_downstream(
263     std::unique_ptr<Downstream> downstream) {
264   downstream_queue_.add_pending(std::move(downstream));
265 }
266 
267 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)268 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
269                      uint64_t offset, const uint8_t *data, size_t datalen,
270                      void *user_data, void *stream_user_data) {
271   auto upstream = static_cast<Http3Upstream *>(user_data);
272 
273   if (upstream->recv_stream_data(flags, stream_id, data, datalen) != 0) {
274     return NGTCP2_ERR_CALLBACK_FAILURE;
275   }
276 
277   return 0;
278 }
279 } // namespace
280 
recv_stream_data(uint32_t flags,int64_t stream_id,const uint8_t * data,size_t datalen)281 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
282                                     const uint8_t *data, size_t datalen) {
283   assert(httpconn_);
284 
285   auto nconsumed = nghttp3_conn_read_stream(
286       httpconn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN);
287   if (nconsumed < 0) {
288     ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
289                       << nghttp3_strerror(nconsumed);
290     ngtcp2_ccerr_set_application_error(
291         &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
292         0);
293     return -1;
294   }
295 
296   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
297   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
298 
299   return 0;
300 }
301 
302 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)303 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
304                  uint64_t app_error_code, void *user_data,
305                  void *stream_user_data) {
306   auto upstream = static_cast<Http3Upstream *>(user_data);
307 
308   if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
309     app_error_code = NGHTTP3_H3_NO_ERROR;
310   }
311 
312   if (upstream->stream_close(stream_id, app_error_code) != 0) {
313     return NGTCP2_ERR_CALLBACK_FAILURE;
314   }
315 
316   return 0;
317 }
318 } // namespace
319 
stream_close(int64_t stream_id,uint64_t app_error_code)320 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
321   if (!httpconn_) {
322     return 0;
323   }
324 
325   auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
326   switch (rv) {
327   case 0:
328     break;
329   case NGHTTP3_ERR_STREAM_NOT_FOUND:
330     if (ngtcp2_is_bidi_stream(stream_id)) {
331       ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
332     }
333     break;
334   default:
335     ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
336     ngtcp2_ccerr_set_application_error(
337         &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
338     return -1;
339   }
340 
341   return 0;
342 }
343 
344 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)345 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
346                              uint64_t offset, uint64_t datalen, void *user_data,
347                              void *stream_user_data) {
348   auto upstream = static_cast<Http3Upstream *>(user_data);
349 
350   if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
351     return NGTCP2_ERR_CALLBACK_FAILURE;
352   }
353 
354   return 0;
355 }
356 } // namespace
357 
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)358 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
359                                             uint64_t datalen) {
360   if (!httpconn_) {
361     return 0;
362   }
363 
364   auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
365   if (rv != 0) {
366     ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
367                       << nghttp3_strerror(rv);
368     return -1;
369   }
370 
371   return 0;
372 }
373 
374 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)375 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
376                            uint64_t max_data, void *user_data,
377                            void *stream_user_data) {
378   auto upstream = static_cast<Http3Upstream *>(user_data);
379 
380   if (upstream->extend_max_stream_data(stream_id) != 0) {
381     return NGTCP2_ERR_CALLBACK_FAILURE;
382   }
383 
384   return 0;
385 }
386 } // namespace
387 
extend_max_stream_data(int64_t stream_id)388 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
389   if (!httpconn_) {
390     return 0;
391   }
392 
393   auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
394   if (rv != 0) {
395     ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
396                       << nghttp3_strerror(rv);
397     return -1;
398   }
399 
400   return 0;
401 }
402 
403 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)404 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
405                                    void *user_data) {
406   auto upstream = static_cast<Http3Upstream *>(user_data);
407 
408   upstream->extend_max_remote_streams_bidi(max_streams);
409 
410   return 0;
411 }
412 } // namespace
413 
extend_max_remote_streams_bidi(uint64_t max_streams)414 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
415   nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
416 }
417 
418 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)419 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
420                  uint64_t app_error_code, void *user_data,
421                  void *stream_user_data) {
422   auto upstream = static_cast<Http3Upstream *>(user_data);
423 
424   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
425     return NGTCP2_ERR_CALLBACK_FAILURE;
426   }
427 
428   return 0;
429 }
430 } // namespace
431 
http_shutdown_stream_read(int64_t stream_id)432 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
433   if (!httpconn_) {
434     return 0;
435   }
436 
437   auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
438   if (rv != 0) {
439     ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
440                       << nghttp3_strerror(rv);
441     return -1;
442   }
443 
444   return 0;
445 }
446 
447 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)448 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
449                         uint64_t app_error_code, void *user_data,
450                         void *stream_user_data) {
451   auto upstream = static_cast<Http3Upstream *>(user_data);
452 
453   if (upstream->http_shutdown_stream_read(stream_id) != 0) {
454     return NGTCP2_ERR_CALLBACK_FAILURE;
455   }
456 
457   return 0;
458 }
459 } // namespace
460 
461 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)462 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
463   auto upstream = static_cast<Http3Upstream *>(user_data);
464 
465   if (upstream->handshake_completed() != 0) {
466     return NGTCP2_ERR_CALLBACK_FAILURE;
467   }
468 
469   return 0;
470 }
471 } // namespace
472 
handshake_completed()473 int Http3Upstream::handshake_completed() {
474   handler_->set_alpn_from_conn();
475 
476   auto alpn = handler_->get_alpn();
477   if (alpn.empty()) {
478     ULOG(ERROR, this) << "NO ALPN was negotiated";
479     return -1;
480   }
481 
482   auto path = ngtcp2_conn_get_path(conn_);
483 
484   return send_new_token(&path->remote);
485 }
486 
487 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)488 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
489                     const ngtcp2_path *old_path,
490                     ngtcp2_path_validation_result res, void *user_data) {
491   if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
492       !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
493     return 0;
494   }
495 
496   auto upstream = static_cast<Http3Upstream *>(user_data);
497   if (upstream->send_new_token(&path->remote) != 0) {
498     return NGTCP2_ERR_CALLBACK_FAILURE;
499   }
500 
501   return 0;
502 }
503 } // namespace
504 
send_new_token(const ngtcp2_addr * remote_addr)505 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
506   std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> token;
507   size_t tokenlen;
508 
509   auto worker = handler_->get_worker();
510   auto conn_handler = worker->get_connection_handler();
511   auto &qkms = conn_handler->get_quic_keying_materials();
512   auto &qkm = qkms->keying_materials.front();
513 
514   if (generate_token(token.data(), tokenlen, remote_addr->addr,
515                      remote_addr->addrlen, qkm.secret.data(),
516                      qkm.secret.size()) != 0) {
517     return -1;
518   }
519 
520   assert(tokenlen == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN);
521 
522   token[tokenlen++] = qkm.id;
523 
524   auto rv = ngtcp2_conn_submit_new_token(conn_, token.data(), tokenlen);
525   if (rv != 0) {
526     ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
527                       << ngtcp2_strerror(rv);
528     return -1;
529   }
530 
531   return 0;
532 }
533 
534 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)535 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
536                 void *user_data) {
537   if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
538     return 0;
539   }
540 
541   auto upstream = static_cast<Http3Upstream *>(user_data);
542   if (upstream->setup_httpconn() != 0) {
543     return NGTCP2_ERR_CALLBACK_FAILURE;
544   }
545 
546   return 0;
547 }
548 } // namespace
549 
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,const uint8_t * token,size_t tokenlen)550 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
551                         const Address &local_addr,
552                         const ngtcp2_pkt_hd &initial_hd,
553                         const ngtcp2_cid *odcid, const uint8_t *token,
554                         size_t tokenlen) {
555   int rv;
556 
557   auto worker = handler_->get_worker();
558   auto conn_handler = worker->get_connection_handler();
559 
560   auto callbacks = ngtcp2_callbacks{
561       nullptr, // client_initial
562       ngtcp2_crypto_recv_client_initial_cb,
563       ngtcp2_crypto_recv_crypto_data_cb,
564       shrpx::handshake_completed,
565       nullptr, // recv_version_negotiation
566       ngtcp2_crypto_encrypt_cb,
567       ngtcp2_crypto_decrypt_cb,
568       ngtcp2_crypto_hp_mask_cb,
569       shrpx::recv_stream_data,
570       shrpx::acked_stream_data_offset,
571       nullptr, // stream_open
572       shrpx::stream_close,
573       nullptr, // recv_stateless_reset
574       nullptr, // recv_retry
575       nullptr, // extend_max_local_streams_bidi
576       nullptr, // extend_max_local_streams_uni
577       rand,
578       get_new_connection_id,
579       remove_connection_id,
580       ngtcp2_crypto_update_key_cb,
581       shrpx::path_validation,
582       nullptr, // select_preferred_addr
583       shrpx::stream_reset,
584       shrpx::extend_max_remote_streams_bidi,
585       nullptr, // extend_max_remote_streams_uni
586       shrpx::extend_max_stream_data,
587       nullptr, // dcid_status
588       nullptr, // handshake_confirmed
589       nullptr, // recv_new_token
590       ngtcp2_crypto_delete_crypto_aead_ctx_cb,
591       ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
592       nullptr, // recv_datagram
593       nullptr, // ack_datagram
594       nullptr, // lost_datagram
595       ngtcp2_crypto_get_path_challenge_data_cb,
596       shrpx::stream_stop_sending,
597       nullptr, // version_negotiation
598       nullptr, // recv_rx_key
599       shrpx::recv_tx_key,
600   };
601 
602   auto config = get_config();
603   auto &quicconf = config->quic;
604   auto &http3conf = config->http3;
605 
606   auto &qkms = conn_handler->get_quic_keying_materials();
607   auto &qkm = qkms->keying_materials.front();
608 
609   ngtcp2_cid scid;
610 
611   if (generate_quic_connection_id(scid, SHRPX_QUIC_SCIDLEN,
612                                   worker->get_cid_prefix(), qkm.id,
613                                   qkm.cid_encryption_key.data()) != 0) {
614     return -1;
615   }
616 
617   ngtcp2_settings settings;
618   ngtcp2_settings_default(&settings);
619   if (quicconf.upstream.debug.log) {
620     settings.log_printf = log_printf;
621   }
622 
623   if (!quicconf.upstream.qlog.dir.empty()) {
624     auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
625     if (fd != -1) {
626       qlog_fd_ = fd;
627       settings.qlog_write = shrpx::qlog_write;
628     }
629   }
630 
631   settings.initial_ts = quic_timestamp();
632   settings.initial_rtt = static_cast<ngtcp2_tstamp>(
633       quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
634   settings.cc_algo = quicconf.upstream.congestion_controller;
635   settings.max_window = http3conf.upstream.max_connection_window_size;
636   settings.max_stream_window = http3conf.upstream.max_window_size;
637   settings.max_tx_udp_payload_size = SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE;
638   settings.rand_ctx.native_handle = &worker->get_randgen();
639   settings.token = token;
640   settings.tokenlen = tokenlen;
641   settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
642       0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
643 
644   ngtcp2_transport_params params;
645   ngtcp2_transport_params_default(&params);
646   params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
647   // The minimum number of unidirectional streams required for HTTP/3.
648   params.initial_max_streams_uni = 3;
649   params.initial_max_data = http3conf.upstream.connection_window_size;
650   params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
651   params.initial_max_stream_data_uni = http3conf.upstream.window_size;
652   params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
653       quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
654 
655 #ifdef OPENSSL_IS_BORINGSSL
656   if (quicconf.upstream.early_data) {
657     ngtcp2_transport_params early_data_params;
658 
659     ngtcp2_transport_params_default(&early_data_params);
660 
661     early_data_params.initial_max_stream_data_bidi_local =
662         params.initial_max_stream_data_bidi_local;
663     early_data_params.initial_max_stream_data_bidi_remote =
664         params.initial_max_stream_data_bidi_remote;
665     early_data_params.initial_max_stream_data_uni =
666         params.initial_max_stream_data_uni;
667     early_data_params.initial_max_data = params.initial_max_data;
668     early_data_params.initial_max_streams_bidi =
669         params.initial_max_streams_bidi;
670     early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
671 
672     // TODO include HTTP/3 SETTINGS
673 
674     std::array<uint8_t, 128> quic_early_data_ctx;
675 
676     auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
677         quic_early_data_ctx.data(), quic_early_data_ctx.size(),
678         &early_data_params);
679 
680     assert(quic_early_data_ctxlen > 0);
681     assert(static_cast<size_t>(quic_early_data_ctxlen) <=
682            quic_early_data_ctx.size());
683 
684     if (SSL_set_quic_early_data_context(handler_->get_ssl(),
685                                         quic_early_data_ctx.data(),
686                                         quic_early_data_ctxlen) != 1) {
687       ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
688       return -1;
689     }
690   }
691 #endif // OPENSSL_IS_BORINGSSL
692 
693   if (odcid) {
694     params.original_dcid = *odcid;
695     params.retry_scid = initial_hd.dcid;
696     params.retry_scid_present = 1;
697   } else {
698     params.original_dcid = initial_hd.dcid;
699   }
700 
701   params.original_dcid_present = 1;
702 
703   rv = generate_quic_stateless_reset_token(
704       params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
705   if (rv != 0) {
706     ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
707     return -1;
708   }
709   params.stateless_reset_token_present = 1;
710 
711   auto path = ngtcp2_path{
712       {
713           const_cast<sockaddr *>(&local_addr.su.sa),
714           static_cast<socklen_t>(local_addr.len),
715       },
716       {
717           const_cast<sockaddr *>(&remote_addr.su.sa),
718           static_cast<socklen_t>(remote_addr.len),
719       },
720       const_cast<UpstreamAddr *>(faddr),
721   };
722 
723   rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
724                               initial_hd.version, &callbacks, &settings,
725                               &params, nullptr, this);
726   if (rv != 0) {
727     ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
728     return -1;
729   }
730 
731   ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
732 
733   auto quic_connection_handler = worker->get_quic_connection_handler();
734 
735   if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
736                                          initial_hd.dcid) != 0) {
737     return -1;
738   }
739 
740   quic_connection_handler->add_connection_id(hashed_scid_, handler_);
741   quic_connection_handler->add_connection_id(scid, handler_);
742 
743   return 0;
744 }
745 
on_read()746 int Http3Upstream::on_read() { return 0; }
747 
on_write()748 int Http3Upstream::on_write() {
749   int rv;
750 
751   if (tx_.send_blocked) {
752     rv = send_blocked_packet();
753     if (rv != 0) {
754       return -1;
755     }
756 
757     if (tx_.send_blocked) {
758       return 0;
759     }
760   }
761 
762   handler_->get_connection()->wlimit.stopw();
763 
764   if (write_streams() != 0) {
765     return -1;
766   }
767 
768   if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
769     return -1;
770   }
771 
772   reset_timer();
773 
774   return 0;
775 }
776 
write_streams()777 int Http3Upstream::write_streams() {
778   std::array<nghttp3_vec, 16> vec;
779   auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
780 #ifdef UDP_SEGMENT
781   auto path_max_udp_payload_size =
782       ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
783 #endif // UDP_SEGMENT
784   auto max_pktcnt = ngtcp2_conn_get_send_quantum(conn_) / max_udp_payload_size;
785   ngtcp2_pkt_info pi, prev_pi;
786   uint8_t *bufpos = tx_.data.get();
787   ngtcp2_path_storage ps, prev_ps;
788   size_t pktcnt = 0;
789   int rv;
790   size_t gso_size = 0;
791   auto ts = quic_timestamp();
792 
793   ngtcp2_path_storage_zero(&ps);
794   ngtcp2_path_storage_zero(&prev_ps);
795 
796   for (;;) {
797     int64_t stream_id = -1;
798     int fin = 0;
799     nghttp3_ssize sveccnt = 0;
800 
801     if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
802       sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
803                                            vec.data(), vec.size());
804       if (sveccnt < 0) {
805         ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
806                           << nghttp3_strerror(sveccnt);
807         ngtcp2_ccerr_set_application_error(
808             &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
809             nullptr, 0);
810         return handle_error();
811       }
812     }
813 
814     ngtcp2_ssize ndatalen;
815     auto v = vec.data();
816     auto vcnt = static_cast<size_t>(sveccnt);
817 
818     uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
819     if (fin) {
820       flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
821     }
822 
823     auto nwrite = ngtcp2_conn_writev_stream(
824         conn_, &ps.path, &pi, bufpos, max_udp_payload_size, &ndatalen, flags,
825         stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
826     if (nwrite < 0) {
827       switch (nwrite) {
828       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
829         assert(ndatalen == -1);
830         nghttp3_conn_block_stream(httpconn_, stream_id);
831         continue;
832       case NGTCP2_ERR_STREAM_SHUT_WR:
833         assert(ndatalen == -1);
834         nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
835         continue;
836       case NGTCP2_ERR_WRITE_MORE:
837         assert(ndatalen >= 0);
838         rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
839         if (rv != 0) {
840           ULOG(ERROR, this)
841               << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
842           ngtcp2_ccerr_set_application_error(
843               &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
844               0);
845           return handle_error();
846         }
847         continue;
848       }
849 
850       assert(ndatalen == -1);
851 
852       ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
853                         << ngtcp2_strerror(nwrite);
854 
855       ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
856 
857       return handle_error();
858     } else if (ndatalen >= 0) {
859       rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
860       if (rv != 0) {
861         ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
862                           << nghttp3_strerror(rv);
863         ngtcp2_ccerr_set_application_error(
864             &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
865             0);
866         return handle_error();
867       }
868     }
869 
870     if (nwrite == 0) {
871       if (bufpos - tx_.data.get()) {
872         auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
873         auto data = tx_.data.get();
874         auto datalen = bufpos - data;
875 
876         rv = send_packet(faddr, prev_ps.path.remote.addr,
877                          prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
878                          prev_ps.path.local.addrlen, prev_pi, data, datalen,
879                          gso_size);
880         if (rv == SHRPX_ERR_SEND_BLOCKED) {
881           on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
882                           prev_pi, data, datalen, gso_size);
883 
884           signal_write_upstream_addr(faddr);
885         }
886       }
887 
888       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
889 
890       return 0;
891     }
892 
893     bufpos += nwrite;
894 
895 #ifdef UDP_SEGMENT
896     if (pktcnt == 0) {
897       ngtcp2_path_copy(&prev_ps.path, &ps.path);
898       prev_pi = pi;
899       gso_size = nwrite;
900     } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
901                prev_pi.ecn != pi.ecn ||
902                static_cast<size_t>(nwrite) > gso_size ||
903                (gso_size > path_max_udp_payload_size &&
904                 static_cast<size_t>(nwrite) != gso_size)) {
905       auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
906       auto data = tx_.data.get();
907       auto datalen = bufpos - data - nwrite;
908 
909       rv = send_packet(faddr, prev_ps.path.remote.addr,
910                        prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
911                        prev_ps.path.local.addrlen, prev_pi, data, datalen,
912                        gso_size);
913       switch (rv) {
914       case SHRPX_ERR_SEND_BLOCKED:
915         on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
916                         data, datalen, gso_size);
917 
918         on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
919                         ps.path.remote, ps.path.local, pi, bufpos - nwrite,
920                         nwrite, 0);
921 
922         signal_write_upstream_addr(faddr);
923 
924         break;
925       default: {
926         auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
927         auto data = bufpos - nwrite;
928 
929         rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
930                          ps.path.local.addr, ps.path.local.addrlen, pi, data,
931                          nwrite, 0);
932         if (rv == SHRPX_ERR_SEND_BLOCKED) {
933           on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
934                           nwrite, 0);
935 
936           signal_write_upstream_addr(faddr);
937         }
938       }
939       }
940 
941       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
942 
943       return 0;
944     }
945 
946     if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
947       auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
948       auto data = tx_.data.get();
949       auto datalen = bufpos - data;
950 
951       rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
952                        ps.path.local.addr, ps.path.local.addrlen, pi, data,
953                        datalen, gso_size);
954       if (rv == SHRPX_ERR_SEND_BLOCKED) {
955         on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
956                         gso_size);
957 
958         signal_write_upstream_addr(faddr);
959       }
960 
961       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
962 
963       return 0;
964     }
965 #else  // !UDP_SEGMENT
966     auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
967     auto data = tx_.data.get();
968     auto datalen = bufpos - data;
969 
970     rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
971                      ps.path.local.addr, ps.path.local.addrlen, pi, data,
972                      datalen, 0);
973     if (rv == SHRPX_ERR_SEND_BLOCKED) {
974       on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
975                       0);
976 
977       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
978 
979       signal_write_upstream_addr(faddr);
980 
981       return 0;
982     }
983 
984     if (++pktcnt == max_pktcnt) {
985       ngtcp2_conn_update_pkt_tx_time(conn_, ts);
986 
987       return 0;
988     }
989 
990     bufpos = tx_.data.get();
991 #endif // !UDP_SEGMENT
992   }
993 
994   return 0;
995 }
996 
on_timeout(Downstream * downstream)997 int Http3Upstream::on_timeout(Downstream *downstream) { return 0; }
998 
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)999 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
1000                                                unsigned int status_code) {
1001   int rv;
1002 
1003   rv = error_reply(downstream, status_code);
1004 
1005   if (rv != 0) {
1006     return -1;
1007   }
1008 
1009   handler_->signal_write();
1010 
1011   return 0;
1012 }
1013 
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1014 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1015     Downstream *downstream) {
1016   assert(0);
1017   abort();
1018 }
1019 
1020 namespace {
1021 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1022 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1023   // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1024   // client to retry.
1025   switch (downstream_error_code) {
1026   case NGHTTP2_NO_ERROR:
1027     return NGHTTP3_H3_NO_ERROR;
1028   case NGHTTP2_REFUSED_STREAM:
1029     return NGHTTP3_H3_REQUEST_REJECTED;
1030   default:
1031     return NGHTTP3_H3_INTERNAL_ERROR;
1032   }
1033 }
1034 } // namespace
1035 
downstream_read(DownstreamConnection * dconn)1036 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1037   auto downstream = dconn->get_downstream();
1038 
1039   if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1040     // The downstream stream was reset (canceled). In this case,
1041     // RST_STREAM to the upstream and delete downstream connection
1042     // here. Deleting downstream will be taken place at
1043     // on_stream_close_callback.
1044     shutdown_stream(downstream,
1045                     infer_upstream_shutdown_stream_error_code(
1046                         downstream->get_response_rst_stream_error_code()));
1047     downstream->pop_downstream_connection();
1048     // dconn was deleted
1049     dconn = nullptr;
1050   } else if (downstream->get_response_state() ==
1051              DownstreamState::MSG_BAD_HEADER) {
1052     if (error_reply(downstream, 502) != 0) {
1053       return -1;
1054     }
1055     downstream->pop_downstream_connection();
1056     // dconn was deleted
1057     dconn = nullptr;
1058   } else {
1059     auto rv = downstream->on_read();
1060     if (rv == SHRPX_ERR_EOF) {
1061       if (downstream->get_request_header_sent()) {
1062         return downstream_eof(dconn);
1063       }
1064       return SHRPX_ERR_RETRY;
1065     }
1066     if (rv == SHRPX_ERR_DCONN_CANCELED) {
1067       downstream->pop_downstream_connection();
1068       handler_->signal_write();
1069       return 0;
1070     }
1071     if (rv != 0) {
1072       if (rv != SHRPX_ERR_NETWORK) {
1073         if (LOG_ENABLED(INFO)) {
1074           DCLOG(INFO, dconn) << "HTTP parser failure";
1075         }
1076       }
1077       return downstream_error(dconn, Downstream::EVENT_ERROR);
1078     }
1079 
1080     if (downstream->can_detach_downstream_connection()) {
1081       // Keep-alive
1082       downstream->detach_downstream_connection();
1083     }
1084   }
1085 
1086   handler_->signal_write();
1087 
1088   // At this point, downstream may be deleted.
1089 
1090   return 0;
1091 }
1092 
downstream_write(DownstreamConnection * dconn)1093 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1094   int rv;
1095   rv = dconn->on_write();
1096   if (rv == SHRPX_ERR_NETWORK) {
1097     return downstream_error(dconn, Downstream::EVENT_ERROR);
1098   }
1099   if (rv != 0) {
1100     return rv;
1101   }
1102   return 0;
1103 }
1104 
downstream_eof(DownstreamConnection * dconn)1105 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1106   auto downstream = dconn->get_downstream();
1107 
1108   if (LOG_ENABLED(INFO)) {
1109     DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1110   }
1111 
1112   // Delete downstream connection. If we don't delete it here, it will
1113   // be pooled in on_stream_close_callback.
1114   downstream->pop_downstream_connection();
1115   // dconn was deleted
1116   dconn = nullptr;
1117   // downstream will be deleted in on_stream_close_callback.
1118   if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1119     // Server may indicate the end of the request by EOF
1120     if (LOG_ENABLED(INFO)) {
1121       ULOG(INFO, this) << "Downstream body was ended by EOF";
1122     }
1123     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1124 
1125     // For tunneled connection, MSG_COMPLETE signals
1126     // downstream_read_data_callback to send RST_STREAM after pending
1127     // response body is sent. This is needed to ensure that RST_STREAM
1128     // is sent after all pending data are sent.
1129     if (on_downstream_body_complete(downstream) != 0) {
1130       return -1;
1131     }
1132   } else if (downstream->get_response_state() !=
1133              DownstreamState::MSG_COMPLETE) {
1134     // If stream was not closed, then we set MSG_COMPLETE and let
1135     // on_stream_close_callback delete downstream.
1136     if (error_reply(downstream, 502) != 0) {
1137       return -1;
1138     }
1139   }
1140   handler_->signal_write();
1141   // At this point, downstream may be deleted.
1142   return 0;
1143 }
1144 
downstream_error(DownstreamConnection * dconn,int events)1145 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1146   auto downstream = dconn->get_downstream();
1147 
1148   if (LOG_ENABLED(INFO)) {
1149     if (events & Downstream::EVENT_ERROR) {
1150       DCLOG(INFO, dconn) << "Downstream network/general error";
1151     } else {
1152       DCLOG(INFO, dconn) << "Timeout";
1153     }
1154     if (downstream->get_upgraded()) {
1155       DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1156     }
1157   }
1158 
1159   // Delete downstream connection. If we don't delete it here, it will
1160   // be pooled in on_stream_close_callback.
1161   downstream->pop_downstream_connection();
1162   // dconn was deleted
1163   dconn = nullptr;
1164 
1165   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1166     // For SSL tunneling, we issue RST_STREAM. For other types of
1167     // stream, we don't have to do anything since response was
1168     // complete.
1169     if (downstream->get_upgraded()) {
1170       shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1171     }
1172   } else {
1173     if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1174       if (downstream->get_upgraded()) {
1175         if (on_downstream_body_complete(downstream) != 0) {
1176           return -1;
1177         }
1178       } else {
1179         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1180       }
1181     } else {
1182       unsigned int status;
1183       if (events & Downstream::EVENT_TIMEOUT) {
1184         if (downstream->get_request_header_sent()) {
1185           status = 504;
1186         } else {
1187           status = 408;
1188         }
1189       } else {
1190         status = 502;
1191       }
1192       if (error_reply(downstream, status) != 0) {
1193         return -1;
1194       }
1195     }
1196     downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1197   }
1198   handler_->signal_write();
1199   // At this point, downstream may be deleted.
1200   return 0;
1201 }
1202 
get_client_handler() const1203 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1204 
1205 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1206 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1207                                             int64_t stream_id, nghttp3_vec *vec,
1208                                             size_t veccnt, uint32_t *pflags,
1209                                             void *conn_user_data,
1210                                             void *stream_user_data) {
1211   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1212   auto downstream = static_cast<Downstream *>(stream_user_data);
1213 
1214   assert(downstream);
1215 
1216   auto body = downstream->get_response_buf();
1217 
1218   assert(body);
1219 
1220   if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1221       body->rleft_mark() == 0) {
1222     downstream->disable_upstream_wtimer();
1223     return NGHTTP3_ERR_WOULDBLOCK;
1224   }
1225 
1226   downstream->reset_upstream_wtimer();
1227 
1228   veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1229 
1230   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1231       body->rleft_mark() == 0) {
1232     *pflags |= NGHTTP3_DATA_FLAG_EOF;
1233   }
1234 
1235   assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1236 
1237   downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1238 
1239   if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1240       upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1241     return NGHTTP3_ERR_CALLBACK_FAILURE;
1242   }
1243 
1244   return veccnt;
1245 }
1246 } // namespace
1247 
on_downstream_header_complete(Downstream * downstream)1248 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1249   int rv;
1250 
1251   const auto &req = downstream->request();
1252   auto &resp = downstream->response();
1253 
1254   auto &balloc = downstream->get_block_allocator();
1255 
1256   if (LOG_ENABLED(INFO)) {
1257     if (downstream->get_non_final_response()) {
1258       DLOG(INFO, downstream) << "HTTP non-final response header";
1259     } else {
1260       DLOG(INFO, downstream) << "HTTP response header completed";
1261     }
1262   }
1263 
1264   auto config = get_config();
1265   auto &httpconf = config->http;
1266 
1267   if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1268     downstream->rewrite_location_response_header(req.scheme);
1269   }
1270 
1271 #ifdef HAVE_MRUBY
1272   if (!downstream->get_non_final_response()) {
1273     auto dconn = downstream->get_downstream_connection();
1274     const auto &group = dconn->get_downstream_addr_group();
1275     if (group) {
1276       const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1277 
1278       if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1279         if (error_reply(downstream, 500) != 0) {
1280           return -1;
1281         }
1282         // Returning -1 will signal deletion of dconn.
1283         return -1;
1284       }
1285 
1286       if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1287         return -1;
1288       }
1289     }
1290 
1291     auto worker = handler_->get_worker();
1292     auto mruby_ctx = worker->get_mruby_context();
1293 
1294     if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1295       if (error_reply(downstream, 500) != 0) {
1296         return -1;
1297       }
1298       // Returning -1 will signal deletion of dconn.
1299       return -1;
1300     }
1301 
1302     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1303       return -1;
1304     }
1305   }
1306 #endif // HAVE_MRUBY
1307 
1308   auto nva = std::vector<nghttp3_nv>();
1309   // 4 means :status and possible server, via, and set-cookie (for
1310   // affinity cookie) header field.
1311   nva.reserve(resp.fs.headers().size() + 4 +
1312               httpconf.add_response_headers.size());
1313 
1314   if (downstream->get_non_final_response()) {
1315     auto response_status = http2::stringify_status(balloc, resp.http_status);
1316 
1317     nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1318 
1319     http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1320                                       http2::HDOP_STRIP_ALL);
1321 
1322     if (LOG_ENABLED(INFO)) {
1323       log_response_headers(downstream, nva);
1324     }
1325 
1326     rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1327                                   nva.data(), nva.size());
1328 
1329     resp.fs.clear_headers();
1330 
1331     if (rv != 0) {
1332       ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1333       return -1;
1334     }
1335 
1336     return 0;
1337   }
1338 
1339   auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1340   StringRef response_status;
1341 
1342   if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1343     response_status = http2::stringify_status(balloc, 200);
1344     striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1345   } else {
1346     response_status = http2::stringify_status(balloc, resp.http_status);
1347   }
1348 
1349   nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1350 
1351   http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1352 
1353   if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1354     nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
1355   } else {
1356     auto server = resp.fs.header(http2::HD_SERVER);
1357     if (server) {
1358       nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value));
1359     }
1360   }
1361 
1362   if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1363     auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1364     if (affinity_cookie) {
1365       auto dconn = downstream->get_downstream_connection();
1366       assert(dconn);
1367       auto &group = dconn->get_downstream_addr_group();
1368       auto &shared_addr = group->shared_addr;
1369       auto &cookieconf = shared_addr->affinity.cookie;
1370       auto secure =
1371           http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1372       auto cookie_str = http::create_affinity_cookie(
1373           balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1374       nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str));
1375     }
1376   }
1377 
1378   auto via = resp.fs.header(http2::HD_VIA);
1379   if (httpconf.no_via) {
1380     if (via) {
1381       nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value));
1382     }
1383   } else {
1384     // we don't create more than 16 bytes in
1385     // http::create_via_header_value.
1386     size_t len = 16;
1387     if (via) {
1388       len += via->value.size() + 2;
1389     }
1390 
1391     auto iov = make_byte_ref(balloc, len + 1);
1392     auto p = iov.base;
1393     if (via) {
1394       p = std::copy(std::begin(via->value), std::end(via->value), p);
1395       p = util::copy_lit(p, ", ");
1396     }
1397     p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1398     *p = '\0';
1399 
1400     nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
1401   }
1402 
1403   for (auto &p : httpconf.add_response_headers) {
1404     nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1405   }
1406 
1407   if (LOG_ENABLED(INFO)) {
1408     log_response_headers(downstream, nva);
1409   }
1410 
1411   nghttp3_data_reader data_read;
1412   data_read.read_data = downstream_read_data_callback;
1413 
1414   nghttp3_data_reader *data_readptr;
1415 
1416   if (downstream->expect_response_body() ||
1417       downstream->expect_response_trailer()) {
1418     data_readptr = &data_read;
1419   } else {
1420     data_readptr = nullptr;
1421   }
1422 
1423   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1424                                     nva.data(), nva.size(), data_readptr);
1425   if (rv != 0) {
1426     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1427     return -1;
1428   }
1429 
1430   if (data_readptr) {
1431     downstream->reset_upstream_wtimer();
1432   } else if (shutdown_stream_read(downstream->get_stream_id(),
1433                                   NGHTTP3_H3_NO_ERROR) != 0) {
1434     return -1;
1435   }
1436 
1437   return 0;
1438 }
1439 
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1440 int Http3Upstream::on_downstream_body(Downstream *downstream,
1441                                       const uint8_t *data, size_t len,
1442                                       bool flush) {
1443   auto body = downstream->get_response_buf();
1444   body->append(data, len);
1445 
1446   if (flush) {
1447     nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1448 
1449     downstream->ensure_upstream_wtimer();
1450   }
1451 
1452   return 0;
1453 }
1454 
on_downstream_body_complete(Downstream * downstream)1455 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1456   if (LOG_ENABLED(INFO)) {
1457     DLOG(INFO, downstream) << "HTTP response completed";
1458   }
1459 
1460   auto &resp = downstream->response();
1461 
1462   if (!downstream->validate_response_recv_body_length()) {
1463     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1464     resp.connection_close = true;
1465     return 0;
1466   }
1467 
1468   if (!downstream->get_upgraded()) {
1469     const auto &trailers = resp.fs.trailers();
1470     if (!trailers.empty()) {
1471       std::vector<nghttp3_nv> nva;
1472       nva.reserve(trailers.size());
1473       http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1474       if (!nva.empty()) {
1475         auto rv = nghttp3_conn_submit_trailers(
1476             httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1477         if (rv != 0) {
1478           ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1479                             << nghttp3_strerror(rv);
1480           return -1;
1481         }
1482       }
1483     }
1484   }
1485 
1486   nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1487   downstream->ensure_upstream_wtimer();
1488 
1489   return 0;
1490 }
1491 
on_handler_delete()1492 void Http3Upstream::on_handler_delete() {
1493   for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1494     if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1495         d->accesslog_ready()) {
1496       handler_->write_accesslog(d);
1497     }
1498   }
1499 
1500   auto worker = handler_->get_worker();
1501   auto quic_conn_handler = worker->get_quic_connection_handler();
1502 
1503   std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1504   ngtcp2_conn_get_scid(conn_, scids.data());
1505   scids.back() = hashed_scid_;
1506 
1507   for (auto &cid : scids) {
1508     quic_conn_handler->remove_connection_id(cid);
1509   }
1510 
1511   if (retry_close_ || last_error_.type == NGTCP2_CCERR_TYPE_IDLE_CLOSE) {
1512     return;
1513   }
1514 
1515   // If this is not idle close, send CONNECTION_CLOSE.
1516   if (!ngtcp2_conn_in_closing_period(conn_) &&
1517       !ngtcp2_conn_in_draining_period(conn_)) {
1518     ngtcp2_path_storage ps;
1519     ngtcp2_pkt_info pi;
1520     conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1521 
1522     ngtcp2_path_storage_zero(&ps);
1523 
1524     ngtcp2_ccerr ccerr;
1525     ngtcp2_ccerr_default(&ccerr);
1526 
1527     if (worker->get_graceful_shutdown() &&
1528         !ngtcp2_conn_get_handshake_completed(conn_)) {
1529       ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1530     }
1531 
1532     auto nwrite = ngtcp2_conn_write_connection_close(
1533         conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1534         quic_timestamp());
1535     if (nwrite < 0) {
1536       if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1537         ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1538                           << ngtcp2_strerror(nwrite);
1539       }
1540 
1541       return;
1542     }
1543 
1544     conn_close_.resize(nwrite);
1545 
1546     send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1547                 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1548                 ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1549   }
1550 
1551   auto d =
1552       static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1553 
1554   if (LOG_ENABLED(INFO)) {
1555     ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1556                      << conn_close_.size() << " bytes sentinel packet";
1557   }
1558 
1559   auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1560                                         std::move(conn_close_), d);
1561 
1562   quic_conn_handler->add_close_wait(cw.get());
1563 
1564   cw.release();
1565 }
1566 
on_downstream_reset(Downstream * downstream,bool no_retry)1567 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1568   int rv;
1569 
1570   if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1571     // This is error condition when we failed push_request_headers()
1572     // in initiate_downstream().  Otherwise, we have
1573     // DispatchState::ACTIVE state, or we did not set
1574     // DownstreamConnection.
1575     downstream->pop_downstream_connection();
1576     handler_->signal_write();
1577 
1578     return 0;
1579   }
1580 
1581   if (!downstream->request_submission_ready()) {
1582     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1583       // We have got all response body already.  Send it off.
1584       downstream->pop_downstream_connection();
1585       return 0;
1586     }
1587     // pushed stream is handled here
1588     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1589     downstream->pop_downstream_connection();
1590 
1591     handler_->signal_write();
1592 
1593     return 0;
1594   }
1595 
1596   downstream->pop_downstream_connection();
1597 
1598   downstream->add_retry();
1599 
1600   std::unique_ptr<DownstreamConnection> dconn;
1601 
1602   rv = 0;
1603 
1604   if (no_retry || downstream->no_more_retry()) {
1605     goto fail;
1606   }
1607 
1608   // downstream connection is clean; we can retry with new
1609   // downstream connection.
1610 
1611   for (;;) {
1612     auto dconn = handler_->get_downstream_connection(rv, downstream);
1613     if (!dconn) {
1614       goto fail;
1615     }
1616 
1617     rv = downstream->attach_downstream_connection(std::move(dconn));
1618     if (rv == 0) {
1619       break;
1620     }
1621   }
1622 
1623   rv = downstream->push_request_headers();
1624   if (rv != 0) {
1625     goto fail;
1626   }
1627 
1628   return 0;
1629 
1630 fail:
1631   if (rv == SHRPX_ERR_TLS_REQUIRED) {
1632     assert(0);
1633     abort();
1634   }
1635 
1636   rv = on_downstream_abort_request(downstream, 502);
1637   if (rv != 0) {
1638     shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1639   }
1640   downstream->pop_downstream_connection();
1641 
1642   handler_->signal_write();
1643 
1644   return 0;
1645 }
1646 
pause_read(IOCtrlReason reason)1647 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1648 
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1649 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1650                                size_t consumed) {
1651   consume(downstream->get_stream_id(), consumed);
1652 
1653   auto &req = downstream->request();
1654 
1655   req.consume(consumed);
1656 
1657   handler_->signal_write();
1658 
1659   return 0;
1660 }
1661 
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1662 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1663                               size_t bodylen) {
1664   int rv;
1665 
1666   nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1667 
1668   if (bodylen) {
1669     data_read.read_data = downstream_read_data_callback;
1670     data_read_ptr = &data_read;
1671   }
1672 
1673   const auto &resp = downstream->response();
1674   auto config = get_config();
1675   auto &httpconf = config->http;
1676 
1677   auto &balloc = downstream->get_block_allocator();
1678 
1679   const auto &headers = resp.fs.headers();
1680   auto nva = std::vector<nghttp3_nv>();
1681   // 2 for :status and server
1682   nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1683 
1684   auto response_status = http2::stringify_status(balloc, resp.http_status);
1685 
1686   nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1687 
1688   for (auto &kv : headers) {
1689     if (kv.name.empty() || kv.name[0] == ':') {
1690       continue;
1691     }
1692     switch (kv.token) {
1693     case http2::HD_CONNECTION:
1694     case http2::HD_KEEP_ALIVE:
1695     case http2::HD_PROXY_CONNECTION:
1696     case http2::HD_TE:
1697     case http2::HD_TRANSFER_ENCODING:
1698     case http2::HD_UPGRADE:
1699       continue;
1700     }
1701     nva.push_back(http3::make_nv_nocopy(kv.name, kv.value, kv.no_index));
1702   }
1703 
1704   if (!resp.fs.header(http2::HD_SERVER)) {
1705     nva.push_back(http3::make_nv_ls_nocopy("server", config->http.server_name));
1706   }
1707 
1708   for (auto &p : httpconf.add_response_headers) {
1709     nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1710   }
1711 
1712   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1713                                     nva.data(), nva.size(), data_read_ptr);
1714   if (nghttp3_err_is_fatal(rv)) {
1715     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1716                       << nghttp3_strerror(rv);
1717     return -1;
1718   }
1719 
1720   auto buf = downstream->get_response_buf();
1721 
1722   buf->append(body, bodylen);
1723 
1724   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1725 
1726   if (data_read_ptr) {
1727     downstream->reset_upstream_wtimer();
1728   }
1729 
1730   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1731       0) {
1732     return -1;
1733   }
1734 
1735   return 0;
1736 }
1737 
initiate_push(Downstream * downstream,const StringRef & uri)1738 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1739   return 0;
1740 }
1741 
response_riovec(struct iovec * iov,int iovcnt) const1742 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1743   return 0;
1744 }
1745 
response_drain(size_t n)1746 void Http3Upstream::response_drain(size_t n) {}
1747 
response_empty() const1748 bool Http3Upstream::response_empty() const { return false; }
1749 
1750 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1751 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1752                                           int32_t promised_stream_id) {
1753   return nullptr;
1754 }
1755 
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1756 int Http3Upstream::on_downstream_push_promise_complete(
1757     Downstream *downstream, Downstream *promised_downstream) {
1758   return 0;
1759 }
1760 
push_enabled() const1761 bool Http3Upstream::push_enabled() const { return false; }
1762 
cancel_premature_downstream(Downstream * promised_downstream)1763 void Http3Upstream::cancel_premature_downstream(
1764     Downstream *promised_downstream) {}
1765 
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen)1766 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1767                            const Address &remote_addr,
1768                            const Address &local_addr, const ngtcp2_pkt_info &pi,
1769                            const uint8_t *data, size_t datalen) {
1770   int rv;
1771 
1772   auto path = ngtcp2_path{
1773       {
1774           const_cast<sockaddr *>(&local_addr.su.sa),
1775           static_cast<socklen_t>(local_addr.len),
1776       },
1777       {
1778           const_cast<sockaddr *>(&remote_addr.su.sa),
1779           static_cast<socklen_t>(remote_addr.len),
1780       },
1781       const_cast<UpstreamAddr *>(faddr),
1782   };
1783 
1784   rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data, datalen, quic_timestamp());
1785   if (rv != 0) {
1786     switch (rv) {
1787     case NGTCP2_ERR_DRAINING:
1788       return -1;
1789     case NGTCP2_ERR_RETRY: {
1790       auto worker = handler_->get_worker();
1791       auto quic_conn_handler = worker->get_quic_connection_handler();
1792 
1793       if (worker->get_graceful_shutdown()) {
1794         ngtcp2_ccerr_set_transport_error(&last_error_,
1795                                          NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1796 
1797         return handle_error();
1798       }
1799 
1800       ngtcp2_version_cid vc;
1801 
1802       rv =
1803           ngtcp2_pkt_decode_version_cid(&vc, data, datalen, SHRPX_QUIC_SCIDLEN);
1804       if (rv != 0) {
1805         return -1;
1806       }
1807 
1808       retry_close_ = true;
1809 
1810       quic_conn_handler->send_retry(handler_->get_upstream_addr(), vc.version,
1811                                     vc.dcid, vc.dcidlen, vc.scid, vc.scidlen,
1812                                     remote_addr, local_addr, datalen * 3);
1813 
1814       return -1;
1815     }
1816     case NGTCP2_ERR_CRYPTO:
1817       if (!last_error_.error_code) {
1818         ngtcp2_ccerr_set_tls_alert(
1819             &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1820       }
1821       break;
1822     case NGTCP2_ERR_DROP_CONN:
1823       return -1;
1824     default:
1825       if (!last_error_.error_code) {
1826         ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1827       }
1828     }
1829 
1830     ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1831 
1832     return handle_error();
1833   }
1834 
1835   return 0;
1836 }
1837 
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1838 int Http3Upstream::send_packet(const UpstreamAddr *faddr,
1839                                const sockaddr *remote_sa, size_t remote_salen,
1840                                const sockaddr *local_sa, size_t local_salen,
1841                                const ngtcp2_pkt_info &pi, const uint8_t *data,
1842                                size_t datalen, size_t gso_size) {
1843   auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1844                              local_salen, pi, data, datalen, gso_size);
1845   switch (rv) {
1846   case 0:
1847     return 0;
1848     // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1849     // large.
1850   case -EINVAL:
1851   case -EMSGSIZE:
1852     // Let the packet lost.
1853     break;
1854   case -EAGAIN:
1855 #if EAGAIN != EWOULDBLOCK
1856   case -EWOULDBLOCK:
1857 #endif // EAGAIN != EWOULDBLOCK
1858     return SHRPX_ERR_SEND_BLOCKED;
1859   default:
1860     break;
1861   }
1862 
1863   return -1;
1864 }
1865 
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1866 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1867                                     const ngtcp2_addr &remote_addr,
1868                                     const ngtcp2_addr &local_addr,
1869                                     const ngtcp2_pkt_info &pi,
1870                                     const uint8_t *data, size_t datalen,
1871                                     size_t gso_size) {
1872   assert(tx_.num_blocked || !tx_.send_blocked);
1873   assert(tx_.num_blocked < 2);
1874 
1875   tx_.send_blocked = true;
1876 
1877   auto &p = tx_.blocked[tx_.num_blocked++];
1878 
1879   memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1880   memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1881 
1882   p.local_addr.len = local_addr.addrlen;
1883   p.remote_addr.len = remote_addr.addrlen;
1884   p.faddr = faddr;
1885   p.pi = pi;
1886   p.data = data;
1887   p.datalen = datalen;
1888   p.gso_size = gso_size;
1889 }
1890 
send_blocked_packet()1891 int Http3Upstream::send_blocked_packet() {
1892   int rv;
1893 
1894   assert(tx_.send_blocked);
1895 
1896   for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1897     auto &p = tx_.blocked[tx_.num_blocked_sent];
1898 
1899     rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len,
1900                      &p.local_addr.su.sa, p.local_addr.len, p.pi, p.data,
1901                      p.datalen, p.gso_size);
1902     if (rv == SHRPX_ERR_SEND_BLOCKED) {
1903       signal_write_upstream_addr(p.faddr);
1904 
1905       return 0;
1906     }
1907   }
1908 
1909   tx_.send_blocked = false;
1910   tx_.num_blocked = 0;
1911   tx_.num_blocked_sent = 0;
1912 
1913   return 0;
1914 }
1915 
signal_write_upstream_addr(const UpstreamAddr * faddr)1916 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1917   auto conn = handler_->get_connection();
1918 
1919   if (faddr->fd != conn->wev.fd) {
1920     if (ev_is_active(&conn->wev)) {
1921       ev_io_stop(handler_->get_loop(), &conn->wev);
1922     }
1923 
1924     ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1925   }
1926 
1927   conn->wlimit.startw();
1928 }
1929 
handle_error()1930 int Http3Upstream::handle_error() {
1931   if (ngtcp2_conn_in_closing_period(conn_) ||
1932       ngtcp2_conn_in_draining_period(conn_)) {
1933     return -1;
1934   }
1935 
1936   ngtcp2_path_storage ps;
1937   ngtcp2_pkt_info pi;
1938 
1939   ngtcp2_path_storage_zero(&ps);
1940 
1941   auto ts = quic_timestamp();
1942 
1943   conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1944 
1945   auto nwrite = ngtcp2_conn_write_connection_close(
1946       conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
1947       &last_error_, ts);
1948   if (nwrite < 0) {
1949     ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1950                       << ngtcp2_strerror(nwrite);
1951     return -1;
1952   }
1953 
1954   conn_close_.resize(nwrite);
1955 
1956   if (nwrite == 0) {
1957     return -1;
1958   }
1959 
1960   send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1961               ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1962               ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1963 
1964   return -1;
1965 }
1966 
handle_expiry()1967 int Http3Upstream::handle_expiry() {
1968   int rv;
1969 
1970   auto ts = quic_timestamp();
1971 
1972   rv = ngtcp2_conn_handle_expiry(conn_, ts);
1973   if (rv != 0) {
1974     if (rv == NGTCP2_ERR_IDLE_CLOSE) {
1975       ULOG(INFO, this) << "Idle connection timeout";
1976     } else {
1977       ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
1978     }
1979     ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1980     return handle_error();
1981   }
1982 
1983   return 0;
1984 }
1985 
reset_timer()1986 void Http3Upstream::reset_timer() {
1987   auto ts = quic_timestamp();
1988   auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
1989   auto loop = handler_->get_loop();
1990 
1991   if (expiry_ts <= ts) {
1992     ev_feed_event(loop, &timer_, EV_TIMER);
1993     return;
1994   }
1995 
1996   timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
1997 
1998   ev_timer_again(loop, &timer_);
1999 }
2000 
2001 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2002 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2003                           size_t nconsumed, void *user_data,
2004                           void *stream_user_data) {
2005   auto upstream = static_cast<Http3Upstream *>(user_data);
2006 
2007   upstream->consume(stream_id, nconsumed);
2008 
2009   return 0;
2010 }
2011 } // namespace
2012 
2013 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2014 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2015                            uint64_t datalen, void *user_data,
2016                            void *stream_user_data) {
2017   auto upstream = static_cast<Http3Upstream *>(user_data);
2018   auto downstream = static_cast<Downstream *>(stream_user_data);
2019 
2020   assert(downstream);
2021 
2022   if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2023     return NGHTTP3_ERR_CALLBACK_FAILURE;
2024   }
2025 
2026   return 0;
2027 }
2028 } // namespace
2029 
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2030 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2031                                           uint64_t datalen) {
2032   if (LOG_ENABLED(INFO)) {
2033     ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2034                      << datalen << " bytes acknowledged";
2035   }
2036 
2037   auto body = downstream->get_response_buf();
2038   auto drained = body->drain_mark(datalen);
2039   (void)drained;
2040 
2041   assert(datalen == drained);
2042 
2043   if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2044     return -1;
2045   }
2046 
2047   return 0;
2048 }
2049 
2050 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2051 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2052                                void *user_data, void *stream_user_data) {
2053   if (!ngtcp2_is_bidi_stream(stream_id)) {
2054     return 0;
2055   }
2056 
2057   auto upstream = static_cast<Http3Upstream *>(user_data);
2058   upstream->http_begin_request_headers(stream_id);
2059 
2060   return 0;
2061 }
2062 } // namespace
2063 
2064 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2065 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2066                              int32_t token, nghttp3_rcbuf *name,
2067                              nghttp3_rcbuf *value, uint8_t flags,
2068                              void *user_data, void *stream_user_data) {
2069   auto upstream = static_cast<Http3Upstream *>(user_data);
2070   auto downstream = static_cast<Downstream *>(stream_user_data);
2071 
2072   if (!downstream || downstream->get_stop_reading()) {
2073     return 0;
2074   }
2075 
2076   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2077                                          /* trailer = */ false) != 0) {
2078     return NGHTTP3_ERR_CALLBACK_FAILURE;
2079   }
2080 
2081   return 0;
2082 }
2083 } // namespace
2084 
2085 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2086 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2087                               int32_t token, nghttp3_rcbuf *name,
2088                               nghttp3_rcbuf *value, uint8_t flags,
2089                               void *user_data, void *stream_user_data) {
2090   auto upstream = static_cast<Http3Upstream *>(user_data);
2091   auto downstream = static_cast<Downstream *>(stream_user_data);
2092 
2093   if (!downstream || downstream->get_stop_reading()) {
2094     return 0;
2095   }
2096 
2097   if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2098                                          /* trailer = */ true) != 0) {
2099     return NGHTTP3_ERR_CALLBACK_FAILURE;
2100   }
2101 
2102   return 0;
2103 }
2104 } // namespace
2105 
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2106 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2107                                             int32_t h3token,
2108                                             nghttp3_rcbuf *name,
2109                                             nghttp3_rcbuf *value, uint8_t flags,
2110                                             bool trailer) {
2111   auto namebuf = nghttp3_rcbuf_get_buf(name);
2112   auto valuebuf = nghttp3_rcbuf_get_buf(value);
2113   auto &req = downstream->request();
2114   auto config = get_config();
2115   auto &httpconf = config->http;
2116 
2117   if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2118           httpconf.request_header_field_buffer ||
2119       req.fs.num_fields() >= httpconf.max_request_header_fields) {
2120     downstream->set_stop_reading(true);
2121 
2122     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2123       return 0;
2124     }
2125 
2126     if (LOG_ENABLED(INFO)) {
2127       ULOG(INFO, this) << "Too large or many header field size="
2128                        << req.fs.buffer_size() + namebuf.len + valuebuf.len
2129                        << ", num=" << req.fs.num_fields() + 1;
2130     }
2131 
2132     // just ignore if this is a trailer part.
2133     if (trailer) {
2134       return 0;
2135     }
2136 
2137     if (error_reply(downstream, 431) != 0) {
2138       return -1;
2139     }
2140 
2141     return 0;
2142   }
2143 
2144   auto token = http2::lookup_token(namebuf.base, namebuf.len);
2145   auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2146 
2147   downstream->add_rcbuf(name);
2148   downstream->add_rcbuf(value);
2149 
2150   if (trailer) {
2151     req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
2152                              StringRef{valuebuf.base, valuebuf.len}, no_index,
2153                              token);
2154     return 0;
2155   }
2156 
2157   req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
2158                           StringRef{valuebuf.base, valuebuf.len}, no_index,
2159                           token);
2160   return 0;
2161 }
2162 
2163 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2164 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2165                              void *user_data, void *stream_user_data) {
2166   auto upstream = static_cast<Http3Upstream *>(user_data);
2167   auto handler = upstream->get_client_handler();
2168   auto downstream = static_cast<Downstream *>(stream_user_data);
2169 
2170   if (!downstream || downstream->get_stop_reading()) {
2171     return 0;
2172   }
2173 
2174   if (upstream->http_end_request_headers(downstream, fin) != 0) {
2175     return NGHTTP3_ERR_CALLBACK_FAILURE;
2176   }
2177 
2178   downstream->reset_upstream_rtimer();
2179   handler->stop_read_timer();
2180 
2181   return 0;
2182 }
2183 } // namespace
2184 
http_end_request_headers(Downstream * downstream,int fin)2185 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2186   auto lgconf = log_config();
2187   lgconf->update_tstamp(std::chrono::system_clock::now());
2188   auto &req = downstream->request();
2189   req.tstamp = lgconf->tstamp;
2190 
2191   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2192     return 0;
2193   }
2194 
2195   auto &nva = req.fs.headers();
2196 
2197   if (LOG_ENABLED(INFO)) {
2198     std::stringstream ss;
2199     for (auto &nv : nva) {
2200       if (nv.name == "authorization") {
2201         ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2202         continue;
2203       }
2204       ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2205     }
2206     ULOG(INFO, this) << "HTTP request headers. stream_id="
2207                      << downstream->get_stream_id() << "\n"
2208                      << ss.str();
2209   }
2210 
2211   auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2212   if (content_length) {
2213     // libnghttp3 guarantees this can be parsed
2214     req.fs.content_length = util::parse_uint(content_length->value);
2215   }
2216 
2217   // presence of mandatory header fields are guaranteed by libnghttp3.
2218   auto authority = req.fs.header(http2::HD__AUTHORITY);
2219   auto path = req.fs.header(http2::HD__PATH);
2220   auto method = req.fs.header(http2::HD__METHOD);
2221   auto scheme = req.fs.header(http2::HD__SCHEME);
2222 
2223   auto method_token = http2::lookup_method_token(method->value);
2224   if (method_token == -1) {
2225     if (error_reply(downstream, 501) != 0) {
2226       return -1;
2227     }
2228     return 0;
2229   }
2230 
2231   auto faddr = handler_->get_upstream_addr();
2232 
2233   auto config = get_config();
2234 
2235   // For HTTP/2 proxy, we require :authority.
2236   if (method_token != HTTP_CONNECT && config->http2_proxy &&
2237       faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2238     shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2239     return 0;
2240   }
2241 
2242   req.method = method_token;
2243   if (scheme) {
2244     req.scheme = scheme->value;
2245   }
2246 
2247   // nghttp2 library guarantees either :authority or host exist
2248   if (!authority) {
2249     req.no_authority = true;
2250     authority = req.fs.header(http2::HD_HOST);
2251   }
2252 
2253   if (authority) {
2254     req.authority = authority->value;
2255   }
2256 
2257   if (path) {
2258     if (method_token == HTTP_OPTIONS &&
2259         path->value == StringRef::from_lit("*")) {
2260       // Server-wide OPTIONS request.  Path is empty.
2261     } else if (config->http2_proxy &&
2262                faddr->alt_mode == UpstreamAltMode::NONE) {
2263       req.path = path->value;
2264     } else {
2265       req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2266                                            path->value);
2267     }
2268   }
2269 
2270   auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2271   if (connect_proto) {
2272     if (connect_proto->value != "websocket") {
2273       if (error_reply(downstream, 400) != 0) {
2274         return -1;
2275       }
2276       return 0;
2277     }
2278     req.connect_proto = ConnectProto::WEBSOCKET;
2279   }
2280 
2281   if (!fin) {
2282     req.http2_expect_body = true;
2283   } else if (req.fs.content_length == -1) {
2284     req.fs.content_length = 0;
2285   }
2286 
2287   downstream->inspect_http2_request();
2288 
2289   downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2290 
2291   if (config->http.require_http_scheme &&
2292       !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2293     if (error_reply(downstream, 400) != 0) {
2294       return -1;
2295     }
2296   }
2297 
2298 #ifdef HAVE_MRUBY
2299   auto worker = handler_->get_worker();
2300   auto mruby_ctx = worker->get_mruby_context();
2301 
2302   if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2303     if (error_reply(downstream, 500) != 0) {
2304       return -1;
2305     }
2306     return 0;
2307   }
2308 #endif // HAVE_MRUBY
2309 
2310   if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2311     return 0;
2312   }
2313 
2314   start_downstream(downstream);
2315 
2316   return 0;
2317 }
2318 
start_downstream(Downstream * downstream)2319 void Http3Upstream::start_downstream(Downstream *downstream) {
2320   if (downstream_queue_.can_activate(downstream->request().authority)) {
2321     initiate_downstream(downstream);
2322     return;
2323   }
2324 
2325   downstream_queue_.mark_blocked(downstream);
2326 }
2327 
initiate_downstream(Downstream * downstream)2328 void Http3Upstream::initiate_downstream(Downstream *downstream) {
2329   int rv;
2330 
2331 #ifdef HAVE_MRUBY
2332   DownstreamConnection *dconn_ptr;
2333 #endif // HAVE_MRUBY
2334 
2335   for (;;) {
2336     auto dconn = handler_->get_downstream_connection(rv, downstream);
2337     if (!dconn) {
2338       if (rv == SHRPX_ERR_TLS_REQUIRED) {
2339         assert(0);
2340         abort();
2341       }
2342 
2343       rv = error_reply(downstream, 502);
2344       if (rv != 0) {
2345         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2346       }
2347 
2348       downstream->set_request_state(DownstreamState::CONNECT_FAIL);
2349       downstream_queue_.mark_failure(downstream);
2350 
2351       return;
2352     }
2353 
2354 #ifdef HAVE_MRUBY
2355     dconn_ptr = dconn.get();
2356 #endif // HAVE_MRUBY
2357     rv = downstream->attach_downstream_connection(std::move(dconn));
2358     if (rv == 0) {
2359       break;
2360     }
2361   }
2362 
2363 #ifdef HAVE_MRUBY
2364   const auto &group = dconn_ptr->get_downstream_addr_group();
2365   if (group) {
2366     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
2367     if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2368       if (error_reply(downstream, 500) != 0) {
2369         shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2370       }
2371 
2372       downstream_queue_.mark_failure(downstream);
2373 
2374       return;
2375     }
2376 
2377     if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2378       return;
2379     }
2380   }
2381 #endif // HAVE_MRUBY
2382 
2383   rv = downstream->push_request_headers();
2384   if (rv != 0) {
2385 
2386     if (error_reply(downstream, 502) != 0) {
2387       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2388     }
2389 
2390     downstream_queue_.mark_failure(downstream);
2391 
2392     return;
2393   }
2394 
2395   downstream_queue_.mark_active(downstream);
2396 
2397   auto &req = downstream->request();
2398   if (!req.http2_expect_body) {
2399     rv = downstream->end_upload_data();
2400     if (rv != 0) {
2401       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2402     }
2403   }
2404 }
2405 
2406 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2407 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2408                    size_t datalen, void *user_data, void *stream_user_data) {
2409   auto upstream = static_cast<Http3Upstream *>(user_data);
2410   auto downstream = static_cast<Downstream *>(stream_user_data);
2411 
2412   if (upstream->http_recv_data(downstream, data, datalen) != 0) {
2413     return NGHTTP3_ERR_CALLBACK_FAILURE;
2414   }
2415 
2416   return 0;
2417 }
2418 } // namespace
2419 
http_recv_data(Downstream * downstream,const uint8_t * data,size_t datalen)2420 int Http3Upstream::http_recv_data(Downstream *downstream, const uint8_t *data,
2421                                   size_t datalen) {
2422   downstream->reset_upstream_rtimer();
2423 
2424   if (downstream->push_upload_data_chunk(data, datalen) != 0) {
2425     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2426       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2427     }
2428 
2429     consume(downstream->get_stream_id(), datalen);
2430 
2431     return 0;
2432   }
2433 
2434   return 0;
2435 }
2436 
2437 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2438 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2439                     void *stream_user_data) {
2440   auto upstream = static_cast<Http3Upstream *>(user_data);
2441   auto downstream = static_cast<Downstream *>(stream_user_data);
2442 
2443   if (!downstream || downstream->get_stop_reading()) {
2444     return 0;
2445   }
2446 
2447   if (upstream->http_end_stream(downstream) != 0) {
2448     return NGHTTP3_ERR_CALLBACK_FAILURE;
2449   }
2450 
2451   return 0;
2452 }
2453 } // namespace
2454 
http_end_stream(Downstream * downstream)2455 int Http3Upstream::http_end_stream(Downstream *downstream) {
2456   downstream->disable_upstream_rtimer();
2457 
2458   if (downstream->end_upload_data() != 0) {
2459     if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2460       shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2461     }
2462   }
2463 
2464   downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2465 
2466   return 0;
2467 }
2468 
2469 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2470 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2471                       uint64_t app_error_code, void *conn_user_data,
2472                       void *stream_user_data) {
2473   auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2474   auto downstream = static_cast<Downstream *>(stream_user_data);
2475 
2476   if (!downstream) {
2477     return 0;
2478   }
2479 
2480   if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2481     return NGHTTP3_ERR_CALLBACK_FAILURE;
2482   }
2483 
2484   return 0;
2485 }
2486 } // namespace
2487 
http_stream_close(Downstream * downstream,uint64_t app_error_code)2488 int Http3Upstream::http_stream_close(Downstream *downstream,
2489                                      uint64_t app_error_code) {
2490   auto stream_id = downstream->get_stream_id();
2491 
2492   if (LOG_ENABLED(INFO)) {
2493     ULOG(INFO, this) << "Stream stream_id=" << stream_id
2494                      << " is being closed with app_error_code="
2495                      << app_error_code;
2496 
2497     auto body = downstream->get_response_buf();
2498 
2499     ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2500                      << " not_sent=" << body->rleft_mark();
2501   }
2502 
2503   auto &req = downstream->request();
2504 
2505   consume(stream_id, req.unconsumed_body_length);
2506 
2507   req.unconsumed_body_length = 0;
2508 
2509   ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2510 
2511   if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2512     remove_downstream(downstream);
2513     // downstream was deleted
2514 
2515     return 0;
2516   }
2517 
2518   if (downstream->can_detach_downstream_connection()) {
2519     // Keep-alive
2520     downstream->detach_downstream_connection();
2521   }
2522 
2523   downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2524 
2525   // At this point, downstream read may be paused.
2526 
2527   // If shrpx_downstream::push_request_headers() failed, the
2528   // error is handled here.
2529   remove_downstream(downstream);
2530   // downstream was deleted
2531 
2532   return 0;
2533 }
2534 
2535 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2536 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2537                       uint64_t app_error_code, void *user_data,
2538                       void *stream_user_data) {
2539   auto upstream = static_cast<Http3Upstream *>(user_data);
2540 
2541   if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2542     return NGHTTP3_ERR_CALLBACK_FAILURE;
2543   }
2544 
2545   return 0;
2546 }
2547 } // namespace
2548 
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2549 int Http3Upstream::http_stop_sending(int64_t stream_id,
2550                                      uint64_t app_error_code) {
2551   auto rv =
2552       ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2553   if (ngtcp2_err_is_fatal(rv)) {
2554     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2555                       << ngtcp2_strerror(rv);
2556     return -1;
2557   }
2558 
2559   return 0;
2560 }
2561 
2562 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2563 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2564                       uint64_t app_error_code, void *user_data,
2565                       void *stream_user_data) {
2566   auto upstream = static_cast<Http3Upstream *>(user_data);
2567 
2568   if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2569     return NGHTTP3_ERR_CALLBACK_FAILURE;
2570   }
2571 
2572   return 0;
2573 }
2574 } // namespace
2575 
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2576 int Http3Upstream::http_reset_stream(int64_t stream_id,
2577                                      uint64_t app_error_code) {
2578   auto rv =
2579       ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2580   if (ngtcp2_err_is_fatal(rv)) {
2581     ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2582                       << ngtcp2_strerror(rv);
2583     return -1;
2584   }
2585 
2586   return 0;
2587 }
2588 
setup_httpconn()2589 int Http3Upstream::setup_httpconn() {
2590   int rv;
2591 
2592   if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2593     return -1;
2594   }
2595 
2596   nghttp3_callbacks callbacks{
2597       shrpx::http_acked_stream_data,
2598       shrpx::http_stream_close,
2599       shrpx::http_recv_data,
2600       http_deferred_consume,
2601       shrpx::http_begin_request_headers,
2602       shrpx::http_recv_request_header,
2603       shrpx::http_end_request_headers,
2604       nullptr, // begin_trailers
2605       shrpx::http_recv_request_trailer,
2606       nullptr, // end_trailers
2607       shrpx::http_stop_sending,
2608       shrpx::http_end_stream,
2609       shrpx::http_reset_stream,
2610   };
2611 
2612   auto config = get_config();
2613 
2614   nghttp3_settings settings;
2615   nghttp3_settings_default(&settings);
2616   settings.qpack_max_dtable_capacity = 4_k;
2617 
2618   if (!config->http2_proxy) {
2619     settings.enable_connect_protocol = 1;
2620   }
2621 
2622   auto mem = nghttp3_mem_default();
2623 
2624   rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2625   if (rv != 0) {
2626     ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2627     return -1;
2628   }
2629 
2630   auto params = ngtcp2_conn_get_local_transport_params(conn_);
2631 
2632   nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2633                                            params->initial_max_streams_bidi);
2634 
2635   int64_t ctrl_stream_id;
2636 
2637   rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2638   if (rv != 0) {
2639     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2640     return -1;
2641   }
2642 
2643   rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2644   if (rv != 0) {
2645     ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2646                       << nghttp3_strerror(rv);
2647     return -1;
2648   }
2649 
2650   int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2651 
2652   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2653   if (rv != 0) {
2654     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2655     return -1;
2656   }
2657 
2658   rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2659   if (rv != 0) {
2660     ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2661     return -1;
2662   }
2663 
2664   rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2665                                        qpack_dec_stream_id);
2666   if (rv != 0) {
2667     ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2668                       << nghttp3_strerror(rv);
2669     return -1;
2670   }
2671 
2672   return 0;
2673 }
2674 
error_reply(Downstream * downstream,unsigned int status_code)2675 int Http3Upstream::error_reply(Downstream *downstream,
2676                                unsigned int status_code) {
2677   int rv;
2678   auto &resp = downstream->response();
2679 
2680   auto &balloc = downstream->get_block_allocator();
2681 
2682   auto html = http::create_error_html(balloc, status_code);
2683   resp.http_status = status_code;
2684   auto body = downstream->get_response_buf();
2685   body->append(html);
2686   downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2687 
2688   nghttp3_data_reader data_read;
2689   data_read.read_data = downstream_read_data_callback;
2690 
2691   auto lgconf = log_config();
2692   lgconf->update_tstamp(std::chrono::system_clock::now());
2693 
2694   auto response_status = http2::stringify_status(balloc, status_code);
2695   auto content_length = util::make_string_ref_uint(balloc, html.size());
2696   auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2697 
2698   auto nva = std::array<nghttp3_nv, 5>{
2699       {http3::make_nv_ls_nocopy(":status", response_status),
2700        http3::make_nv_ll("content-type", "text/html; charset=UTF-8"),
2701        http3::make_nv_ls_nocopy("server", get_config()->http.server_name),
2702        http3::make_nv_ls_nocopy("content-length", content_length),
2703        http3::make_nv_ls_nocopy("date", date)}};
2704 
2705   rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2706                                     nva.data(), nva.size(), &data_read);
2707   if (nghttp3_err_is_fatal(rv)) {
2708     ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2709                       << nghttp3_strerror(rv);
2710     return -1;
2711   }
2712 
2713   downstream->reset_upstream_wtimer();
2714 
2715   if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2716       0) {
2717     return -1;
2718   }
2719 
2720   return 0;
2721 }
2722 
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2723 int Http3Upstream::shutdown_stream(Downstream *downstream,
2724                                    uint64_t app_error_code) {
2725   auto stream_id = downstream->get_stream_id();
2726 
2727   if (LOG_ENABLED(INFO)) {
2728     ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2729                      << " with app_error_code=" << app_error_code;
2730   }
2731 
2732   auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2733   if (rv != 0) {
2734     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2735                       << ngtcp2_strerror(rv);
2736     return -1;
2737   }
2738 
2739   return 0;
2740 }
2741 
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2742 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2743                                         uint64_t app_error_code) {
2744   auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
2745                                              NGHTTP3_H3_NO_ERROR);
2746   if (ngtcp2_err_is_fatal(rv)) {
2747     ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2748                       << ngtcp2_strerror(rv);
2749     return -1;
2750   }
2751 
2752   return 0;
2753 }
2754 
consume(int64_t stream_id,size_t nconsumed)2755 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2756   ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2757   ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2758 }
2759 
remove_downstream(Downstream * downstream)2760 void Http3Upstream::remove_downstream(Downstream *downstream) {
2761   if (downstream->accesslog_ready()) {
2762     handler_->write_accesslog(downstream);
2763   }
2764 
2765   nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2766                                     nullptr);
2767 
2768   auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2769 
2770   if (next_downstream) {
2771     initiate_downstream(next_downstream);
2772   }
2773 
2774   if (downstream_queue_.get_downstreams() == nullptr) {
2775     // There is no downstream at the moment.  Start idle timer now.
2776     handler_->repeat_read_timer();
2777   }
2778 }
2779 
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2780 void Http3Upstream::log_response_headers(
2781     Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2782   std::stringstream ss;
2783   for (auto &nv : nva) {
2784     ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2785        << StringRef{nv.value, nv.valuelen} << "\n";
2786   }
2787   ULOG(INFO, this) << "HTTP response headers. stream_id="
2788                    << downstream->get_stream_id() << "\n"
2789                    << ss.str();
2790 }
2791 
check_shutdown()2792 int Http3Upstream::check_shutdown() {
2793   auto worker = handler_->get_worker();
2794 
2795   if (!worker->get_graceful_shutdown()) {
2796     return 0;
2797   }
2798 
2799   ev_prepare_stop(handler_->get_loop(), &prep_);
2800 
2801   return start_graceful_shutdown();
2802 }
2803 
start_graceful_shutdown()2804 int Http3Upstream::start_graceful_shutdown() {
2805   int rv;
2806 
2807   if (ev_is_active(&shutdown_timer_)) {
2808     return 0;
2809   }
2810 
2811   if (!httpconn_) {
2812     return -1;
2813   }
2814 
2815   rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2816   if (rv != 0) {
2817     ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2818                       << nghttp3_strerror(rv);
2819     return -1;
2820   }
2821 
2822   handler_->signal_write();
2823 
2824   auto t = ngtcp2_conn_get_pto(conn_);
2825 
2826   ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2827                0.);
2828   ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2829 
2830   return 0;
2831 }
2832 
submit_goaway()2833 int Http3Upstream::submit_goaway() {
2834   int rv;
2835 
2836   rv = nghttp3_conn_shutdown(httpconn_);
2837   if (rv != 0) {
2838     ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2839     return -1;
2840   }
2841 
2842   handler_->signal_write();
2843 
2844   return 0;
2845 }
2846 
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2847 int Http3Upstream::open_qlog_file(const StringRef &dir,
2848                                   const ngtcp2_cid &scid) const {
2849   std::array<char, sizeof("20141115T125824.741+0900")> buf;
2850 
2851   auto path = dir.str();
2852   path += '/';
2853   path +=
2854       util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2855   path += '-';
2856   path += util::format_hex(scid.data, scid.datalen);
2857   path += ".sqlog";
2858 
2859   int fd;
2860 
2861 #ifdef O_CLOEXEC
2862   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2863                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2864          errno == EINTR)
2865     ;
2866 #else  // !O_CLOEXEC
2867   while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2868                     S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2869          errno == EINTR)
2870     ;
2871 
2872   if (fd != -1) {
2873     util::make_socket_closeonexec(fd);
2874   }
2875 #endif // !O_CLOEXEC
2876 
2877   if (fd == -1) {
2878     auto error = errno;
2879     ULOG(ERROR, this) << "Failed to open qlog file " << path
2880                       << ": errno=" << error;
2881     return -1;
2882   }
2883 
2884   return fd;
2885 }
2886 
get_conn() const2887 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2888 
2889 } // namespace shrpx
2890