1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2021 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_http3_upstream.h"
26
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31
32 #include <cstdio>
33
34 #include <ngtcp2/ngtcp2_crypto.h>
35
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 # include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49 #include "ssl_compat.h"
50
51 namespace shrpx {
52
53 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)54 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
55 auto upstream = static_cast<Http3Upstream *>(w->data);
56
57 if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
58 goto fail;
59 }
60
61 return;
62
63 fail:
64 auto handler = upstream->get_client_handler();
65
66 delete handler;
67 }
68 } // namespace
69
70 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)71 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
72 auto upstream = static_cast<Http3Upstream *>(w->data);
73 auto handler = upstream->get_client_handler();
74
75 if (upstream->submit_goaway() != 0) {
76 delete handler;
77 }
78 }
79 } // namespace
80
81 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)82 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
83 auto upstream = static_cast<Http3Upstream *>(w->data);
84 auto handler = upstream->get_client_handler();
85
86 if (upstream->check_shutdown() != 0) {
87 delete handler;
88 }
89 }
90 } // namespace
91
92 namespace {
downstream_queue_size(Worker * worker)93 size_t downstream_queue_size(Worker *worker) {
94 auto &downstreamconf = *worker->get_downstream_config();
95
96 if (get_config()->http2_proxy) {
97 return downstreamconf.connections_per_host;
98 }
99
100 return downstreamconf.connections_per_frontend;
101 }
102 } // namespace
103
104 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)105 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
106 auto conn = static_cast<Connection *>(conn_ref->user_data);
107 auto handler = static_cast<ClientHandler *>(conn->data);
108 auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
109 return upstream->get_conn();
110 }
111 } // namespace
112
Http3Upstream(ClientHandler * handler)113 Http3Upstream::Http3Upstream(ClientHandler *handler)
114 : handler_{handler},
115 qlog_fd_{-1},
116 hashed_scid_{},
117 conn_{nullptr},
118 httpconn_{nullptr},
119 downstream_queue_{downstream_queue_size(handler->get_worker()),
120 !get_config()->http2_proxy},
121 tx_{
122 .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123 #ifndef UDP_SEGMENT
124 .no_gso = true,
125 #endif // UDP_SEGMENT
126 } {
127 auto conn = handler_->get_connection();
128 conn->conn_ref.get_conn = shrpx::get_conn;
129
130 ev_timer_init(&timer_, timeoutcb, 0., 0.);
131 timer_.data = this;
132
133 ngtcp2_ccerr_default(&last_error_);
134
135 ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
136 shutdown_timer_.data = this;
137
138 ev_prepare_init(&prep_, prepare_cb);
139 prep_.data = this;
140 ev_prepare_start(handler_->get_loop(), &prep_);
141 }
142
~Http3Upstream()143 Http3Upstream::~Http3Upstream() {
144 auto loop = handler_->get_loop();
145
146 ev_prepare_stop(loop, &prep_);
147 ev_timer_stop(loop, &shutdown_timer_);
148 ev_timer_stop(loop, &timer_);
149
150 nghttp3_conn_del(httpconn_);
151
152 ngtcp2_conn_del(conn_);
153
154 if (qlog_fd_ != -1) {
155 close(qlog_fd_);
156 }
157 }
158
159 namespace {
log_printf(void * user_data,const char * fmt,...)160 void log_printf(void *user_data, const char *fmt, ...) {
161 va_list ap;
162 std::array<char, 4096> buf;
163
164 va_start(ap, fmt);
165 auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
166 va_end(ap);
167
168 if (static_cast<size_t>(nwrite) >= buf.size()) {
169 nwrite = buf.size() - 1;
170 }
171
172 buf[nwrite++] = '\n';
173
174 while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
175 ;
176 }
177 } // namespace
178
179 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)180 void qlog_write(void *user_data, uint32_t flags, const void *data,
181 size_t datalen) {
182 auto upstream = static_cast<Http3Upstream *>(user_data);
183
184 upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
185 }
186 } // namespace
187
qlog_write(const void * data,size_t datalen,bool fin)188 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
189 assert(qlog_fd_ != -1);
190
191 while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
192 ;
193
194 if (fin) {
195 close(qlog_fd_);
196 qlog_fd_ = -1;
197 }
198 }
199
200 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)201 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
202 util::random_bytes(dest, dest + destlen,
203 *static_cast<std::mt19937 *>(rand_ctx->native_handle));
204 }
205 } // namespace
206
207 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)208 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
209 size_t cidlen, void *user_data) {
210 auto upstream = static_cast<Http3Upstream *>(user_data);
211 auto handler = upstream->get_client_handler();
212 auto worker = handler->get_worker();
213 auto conn_handler = worker->get_connection_handler();
214 auto &qkms = conn_handler->get_quic_keying_materials();
215 auto &qkm = qkms->keying_materials.front();
216
217 assert(SHRPX_QUIC_SCIDLEN == cidlen);
218
219 if (generate_quic_connection_id(*cid, worker->get_worker_id(), qkm.id,
220 qkm.cid_encryption_ctx) != 0) {
221 return NGTCP2_ERR_CALLBACK_FAILURE;
222 }
223
224 if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
225 qkm.secret.size()) != 0) {
226 return NGTCP2_ERR_CALLBACK_FAILURE;
227 }
228
229 auto quic_connection_handler = worker->get_quic_connection_handler();
230
231 quic_connection_handler->add_connection_id(*cid, handler);
232
233 return 0;
234 }
235 } // namespace
236
237 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)238 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
239 void *user_data) {
240 auto upstream = static_cast<Http3Upstream *>(user_data);
241 auto handler = upstream->get_client_handler();
242 auto worker = handler->get_worker();
243 auto quic_conn_handler = worker->get_quic_connection_handler();
244
245 quic_conn_handler->remove_connection_id(*cid);
246
247 return 0;
248 }
249 } // namespace
250
http_begin_request_headers(int64_t stream_id)251 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
252 auto downstream =
253 std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
254 nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
255
256 downstream->reset_upstream_rtimer();
257 downstream->repeat_header_timer();
258
259 handler_->stop_read_timer();
260
261 auto &req = downstream->request();
262 req.http_major = 3;
263 req.http_minor = 0;
264
265 add_pending_downstream(std::move(downstream));
266 }
267
add_pending_downstream(std::unique_ptr<Downstream> downstream)268 void Http3Upstream::add_pending_downstream(
269 std::unique_ptr<Downstream> downstream) {
270 downstream_queue_.add_pending(std::move(downstream));
271 }
272
273 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)274 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
275 uint64_t offset, const uint8_t *data, size_t datalen,
276 void *user_data, void *stream_user_data) {
277 auto upstream = static_cast<Http3Upstream *>(user_data);
278
279 if (upstream->recv_stream_data(flags, stream_id, {data, datalen}) != 0) {
280 return NGTCP2_ERR_CALLBACK_FAILURE;
281 }
282
283 return 0;
284 }
285 } // namespace
286
recv_stream_data(uint32_t flags,int64_t stream_id,std::span<const uint8_t> data)287 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
288 std::span<const uint8_t> data) {
289 assert(httpconn_);
290
291 auto nconsumed =
292 nghttp3_conn_read_stream(httpconn_, stream_id, data.data(), data.size(),
293 flags & NGTCP2_STREAM_DATA_FLAG_FIN);
294 if (nconsumed < 0) {
295 ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
296 << nghttp3_strerror(nconsumed);
297 ngtcp2_ccerr_set_application_error(
298 &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
299 0);
300 return -1;
301 }
302
303 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
304 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
305
306 return 0;
307 }
308
309 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)310 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
311 uint64_t app_error_code, void *user_data,
312 void *stream_user_data) {
313 auto upstream = static_cast<Http3Upstream *>(user_data);
314
315 if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
316 app_error_code = NGHTTP3_H3_NO_ERROR;
317 }
318
319 if (upstream->stream_close(stream_id, app_error_code) != 0) {
320 return NGTCP2_ERR_CALLBACK_FAILURE;
321 }
322
323 return 0;
324 }
325 } // namespace
326
stream_close(int64_t stream_id,uint64_t app_error_code)327 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
328 if (!httpconn_) {
329 return 0;
330 }
331
332 auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
333 switch (rv) {
334 case 0:
335 break;
336 case NGHTTP3_ERR_STREAM_NOT_FOUND:
337 if (ngtcp2_is_bidi_stream(stream_id)) {
338 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
339 }
340 break;
341 default:
342 ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
343 ngtcp2_ccerr_set_application_error(
344 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
345 return -1;
346 }
347
348 return 0;
349 }
350
351 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)352 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
353 uint64_t offset, uint64_t datalen, void *user_data,
354 void *stream_user_data) {
355 auto upstream = static_cast<Http3Upstream *>(user_data);
356
357 if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
358 return NGTCP2_ERR_CALLBACK_FAILURE;
359 }
360
361 return 0;
362 }
363 } // namespace
364
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)365 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
366 uint64_t datalen) {
367 if (!httpconn_) {
368 return 0;
369 }
370
371 auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
372 if (rv != 0) {
373 ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
374 << nghttp3_strerror(rv);
375 return -1;
376 }
377
378 return 0;
379 }
380
381 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)382 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
383 uint64_t max_data, void *user_data,
384 void *stream_user_data) {
385 auto upstream = static_cast<Http3Upstream *>(user_data);
386
387 if (upstream->extend_max_stream_data(stream_id) != 0) {
388 return NGTCP2_ERR_CALLBACK_FAILURE;
389 }
390
391 return 0;
392 }
393 } // namespace
394
extend_max_stream_data(int64_t stream_id)395 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
396 if (!httpconn_) {
397 return 0;
398 }
399
400 auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
401 if (rv != 0) {
402 ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
403 << nghttp3_strerror(rv);
404 return -1;
405 }
406
407 return 0;
408 }
409
410 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)411 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
412 void *user_data) {
413 auto upstream = static_cast<Http3Upstream *>(user_data);
414
415 upstream->extend_max_remote_streams_bidi(max_streams);
416
417 return 0;
418 }
419 } // namespace
420
extend_max_remote_streams_bidi(uint64_t max_streams)421 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
422 nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
423 }
424
425 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)426 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
427 uint64_t app_error_code, void *user_data,
428 void *stream_user_data) {
429 auto upstream = static_cast<Http3Upstream *>(user_data);
430
431 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
432 return NGTCP2_ERR_CALLBACK_FAILURE;
433 }
434
435 return 0;
436 }
437 } // namespace
438
http_shutdown_stream_read(int64_t stream_id)439 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
440 if (!httpconn_) {
441 return 0;
442 }
443
444 auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
445 if (rv != 0) {
446 ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
447 << nghttp3_strerror(rv);
448 return -1;
449 }
450
451 return 0;
452 }
453
454 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)455 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
456 uint64_t app_error_code, void *user_data,
457 void *stream_user_data) {
458 auto upstream = static_cast<Http3Upstream *>(user_data);
459
460 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
461 return NGTCP2_ERR_CALLBACK_FAILURE;
462 }
463
464 return 0;
465 }
466 } // namespace
467
468 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)469 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
470 auto upstream = static_cast<Http3Upstream *>(user_data);
471
472 if (upstream->handshake_completed() != 0) {
473 return NGTCP2_ERR_CALLBACK_FAILURE;
474 }
475
476 return 0;
477 }
478 } // namespace
479
handshake_completed()480 int Http3Upstream::handshake_completed() {
481 handler_->set_alpn_from_conn();
482
483 auto alpn = handler_->get_alpn();
484 if (alpn.empty()) {
485 ULOG(ERROR, this) << "NO ALPN was negotiated";
486 return -1;
487 }
488
489 auto path = ngtcp2_conn_get_path(conn_);
490
491 return send_new_token(&path->remote);
492 }
493
494 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)495 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
496 const ngtcp2_path *old_path,
497 ngtcp2_path_validation_result res, void *user_data) {
498 if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
499 !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
500 return 0;
501 }
502
503 auto upstream = static_cast<Http3Upstream *>(user_data);
504 if (upstream->send_new_token(&path->remote) != 0) {
505 return NGTCP2_ERR_CALLBACK_FAILURE;
506 }
507
508 return 0;
509 }
510 } // namespace
511
send_new_token(const ngtcp2_addr * remote_addr)512 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
513 auto worker = handler_->get_worker();
514 auto conn_handler = worker->get_connection_handler();
515 auto &qkms = conn_handler->get_quic_keying_materials();
516 auto &qkm = qkms->keying_materials.front();
517
518 std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> tokenbuf;
519
520 auto token = generate_token(tokenbuf, remote_addr->addr, remote_addr->addrlen,
521 qkm.secret, qkm.id);
522 if (!token) {
523 return -1;
524 }
525
526 assert(token->size() == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1);
527
528 auto rv = ngtcp2_conn_submit_new_token(conn_, token->data(), token->size());
529 if (rv != 0) {
530 ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
531 << ngtcp2_strerror(rv);
532 return -1;
533 }
534
535 return 0;
536 }
537
538 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)539 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
540 void *user_data) {
541 if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
542 return 0;
543 }
544
545 auto upstream = static_cast<Http3Upstream *>(user_data);
546 if (upstream->setup_httpconn() != 0) {
547 return NGTCP2_ERR_CALLBACK_FAILURE;
548 }
549
550 return 0;
551 }
552 } // namespace
553
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,std::span<const uint8_t> token,ngtcp2_token_type token_type)554 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
555 const Address &local_addr,
556 const ngtcp2_pkt_hd &initial_hd,
557 const ngtcp2_cid *odcid, std::span<const uint8_t> token,
558 ngtcp2_token_type token_type) {
559 int rv;
560
561 auto worker = handler_->get_worker();
562 auto conn_handler = worker->get_connection_handler();
563
564 auto callbacks = ngtcp2_callbacks{
565 nullptr, // client_initial
566 ngtcp2_crypto_recv_client_initial_cb,
567 ngtcp2_crypto_recv_crypto_data_cb,
568 shrpx::handshake_completed,
569 nullptr, // recv_version_negotiation
570 ngtcp2_crypto_encrypt_cb,
571 ngtcp2_crypto_decrypt_cb,
572 ngtcp2_crypto_hp_mask_cb,
573 shrpx::recv_stream_data,
574 shrpx::acked_stream_data_offset,
575 nullptr, // stream_open
576 shrpx::stream_close,
577 nullptr, // recv_stateless_reset
578 nullptr, // recv_retry
579 nullptr, // extend_max_local_streams_bidi
580 nullptr, // extend_max_local_streams_uni
581 rand,
582 get_new_connection_id,
583 remove_connection_id,
584 ngtcp2_crypto_update_key_cb,
585 shrpx::path_validation,
586 nullptr, // select_preferred_addr
587 shrpx::stream_reset,
588 shrpx::extend_max_remote_streams_bidi,
589 nullptr, // extend_max_remote_streams_uni
590 shrpx::extend_max_stream_data,
591 nullptr, // dcid_status
592 nullptr, // handshake_confirmed
593 nullptr, // recv_new_token
594 ngtcp2_crypto_delete_crypto_aead_ctx_cb,
595 ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
596 nullptr, // recv_datagram
597 nullptr, // ack_datagram
598 nullptr, // lost_datagram
599 ngtcp2_crypto_get_path_challenge_data_cb,
600 shrpx::stream_stop_sending,
601 nullptr, // version_negotiation
602 nullptr, // recv_rx_key
603 shrpx::recv_tx_key,
604 };
605
606 auto config = get_config();
607 auto &quicconf = config->quic;
608 auto &http3conf = config->http3;
609
610 auto &qkms = conn_handler->get_quic_keying_materials();
611 auto &qkm = qkms->keying_materials.front();
612
613 ngtcp2_cid scid;
614
615 if (generate_quic_connection_id(scid, worker->get_worker_id(), qkm.id,
616 qkm.cid_encryption_ctx) != 0) {
617 return -1;
618 }
619
620 ngtcp2_settings settings;
621 ngtcp2_settings_default(&settings);
622 if (quicconf.upstream.debug.log) {
623 settings.log_printf = log_printf;
624 }
625
626 if (!quicconf.upstream.qlog.dir.empty()) {
627 auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
628 if (fd != -1) {
629 qlog_fd_ = fd;
630 settings.qlog_write = shrpx::qlog_write;
631 }
632 }
633
634 settings.initial_ts = quic_timestamp();
635 settings.initial_rtt = static_cast<ngtcp2_tstamp>(
636 quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
637 settings.cc_algo = quicconf.upstream.congestion_controller;
638 settings.max_window = http3conf.upstream.max_connection_window_size;
639 settings.max_stream_window = http3conf.upstream.max_window_size;
640 settings.rand_ctx.native_handle = &worker->get_randgen();
641 settings.token = token.data();
642 settings.tokenlen = token.size();
643 settings.token_type = token_type;
644 settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
645 0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
646
647 ngtcp2_transport_params params;
648 ngtcp2_transport_params_default(¶ms);
649 params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
650 // The minimum number of unidirectional streams required for HTTP/3.
651 params.initial_max_streams_uni = 3;
652 params.initial_max_data = http3conf.upstream.connection_window_size;
653 params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
654 params.initial_max_stream_data_uni = http3conf.upstream.window_size;
655 params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
656 quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
657
658 #ifdef NGHTTP2_OPENSSL_IS_BORINGSSL
659 if (quicconf.upstream.early_data) {
660 ngtcp2_transport_params early_data_params;
661
662 ngtcp2_transport_params_default(&early_data_params);
663
664 early_data_params.initial_max_stream_data_bidi_local =
665 params.initial_max_stream_data_bidi_local;
666 early_data_params.initial_max_stream_data_bidi_remote =
667 params.initial_max_stream_data_bidi_remote;
668 early_data_params.initial_max_stream_data_uni =
669 params.initial_max_stream_data_uni;
670 early_data_params.initial_max_data = params.initial_max_data;
671 early_data_params.initial_max_streams_bidi =
672 params.initial_max_streams_bidi;
673 early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
674
675 // TODO include HTTP/3 SETTINGS
676
677 std::array<uint8_t, 128> quic_early_data_ctx;
678
679 auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
680 quic_early_data_ctx.data(), quic_early_data_ctx.size(),
681 &early_data_params);
682
683 assert(quic_early_data_ctxlen > 0);
684 assert(static_cast<size_t>(quic_early_data_ctxlen) <=
685 quic_early_data_ctx.size());
686
687 if (SSL_set_quic_early_data_context(handler_->get_ssl(),
688 quic_early_data_ctx.data(),
689 quic_early_data_ctxlen) != 1) {
690 ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
691 return -1;
692 }
693 }
694 #endif // NGHTTP2_OPENSSL_IS_BORINGSSL
695
696 if (odcid) {
697 params.original_dcid = *odcid;
698 params.retry_scid = initial_hd.dcid;
699 params.retry_scid_present = 1;
700 } else {
701 params.original_dcid = initial_hd.dcid;
702 }
703
704 params.original_dcid_present = 1;
705
706 rv = generate_quic_stateless_reset_token(
707 params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
708 if (rv != 0) {
709 ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
710 return -1;
711 }
712 params.stateless_reset_token_present = 1;
713
714 auto path = ngtcp2_path{
715 {
716 const_cast<sockaddr *>(&local_addr.su.sa),
717 static_cast<socklen_t>(local_addr.len),
718 },
719 {
720 const_cast<sockaddr *>(&remote_addr.su.sa),
721 static_cast<socklen_t>(remote_addr.len),
722 },
723 const_cast<UpstreamAddr *>(faddr),
724 };
725
726 rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
727 initial_hd.version, &callbacks, &settings,
728 ¶ms, nullptr, this);
729 if (rv != 0) {
730 ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
731 return -1;
732 }
733
734 ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
735
736 auto quic_connection_handler = worker->get_quic_connection_handler();
737
738 if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
739 initial_hd.dcid) != 0) {
740 return -1;
741 }
742
743 quic_connection_handler->add_connection_id(hashed_scid_, handler_);
744 quic_connection_handler->add_connection_id(scid, handler_);
745
746 return 0;
747 }
748
on_read()749 int Http3Upstream::on_read() { return 0; }
750
on_write()751 int Http3Upstream::on_write() {
752 int rv;
753
754 if (tx_.send_blocked) {
755 rv = send_blocked_packet();
756 if (rv != 0) {
757 return -1;
758 }
759
760 if (tx_.send_blocked) {
761 return 0;
762 }
763 }
764
765 handler_->get_connection()->wlimit.stopw();
766
767 if (write_streams() != 0) {
768 return -1;
769 }
770
771 if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
772 return -1;
773 }
774
775 reset_timer();
776
777 return 0;
778 }
779
write_streams()780 int Http3Upstream::write_streams() {
781 std::array<nghttp3_vec, 16> vec;
782 auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
783 auto path_max_udp_payload_size =
784 ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
785 ngtcp2_pkt_info pi, prev_pi;
786 auto txbuf =
787 std::span{tx_.data.get(), std::max(ngtcp2_conn_get_send_quantum(conn_),
788 path_max_udp_payload_size)};
789 auto buf = txbuf;
790 ngtcp2_path_storage ps, prev_ps;
791 int rv;
792 size_t gso_size = 0;
793 auto ts = quic_timestamp();
794
795 ngtcp2_path_storage_zero(&ps);
796 ngtcp2_path_storage_zero(&prev_ps);
797
798 for (;;) {
799 int64_t stream_id = -1;
800 int fin = 0;
801 nghttp3_ssize sveccnt = 0;
802
803 if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
804 sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
805 vec.data(), vec.size());
806 if (sveccnt < 0) {
807 ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
808 << nghttp3_strerror(sveccnt);
809 ngtcp2_ccerr_set_application_error(
810 &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
811 nullptr, 0);
812 return handle_error();
813 }
814 }
815
816 ngtcp2_ssize ndatalen;
817 auto v = vec.data();
818 auto vcnt = static_cast<size_t>(sveccnt);
819
820 uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
821 if (fin) {
822 flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
823 }
824
825 auto buflen = buf.size() >= max_udp_payload_size
826 ? max_udp_payload_size
827 : path_max_udp_payload_size;
828 auto nwrite = ngtcp2_conn_writev_stream(
829 conn_, &ps.path, &pi, buf.data(), buflen, &ndatalen, flags, stream_id,
830 reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
831 if (nwrite < 0) {
832 switch (nwrite) {
833 case NGTCP2_ERR_STREAM_DATA_BLOCKED:
834 assert(ndatalen == -1);
835 nghttp3_conn_block_stream(httpconn_, stream_id);
836 continue;
837 case NGTCP2_ERR_STREAM_SHUT_WR:
838 assert(ndatalen == -1);
839 nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
840 continue;
841 case NGTCP2_ERR_WRITE_MORE:
842 assert(ndatalen >= 0);
843 rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
844 if (rv != 0) {
845 ULOG(ERROR, this)
846 << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
847 ngtcp2_ccerr_set_application_error(
848 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
849 0);
850 return handle_error();
851 }
852 continue;
853 }
854
855 assert(ndatalen == -1);
856
857 ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
858 << ngtcp2_strerror(nwrite);
859
860 ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
861
862 return handle_error();
863 } else if (ndatalen >= 0) {
864 rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
865 if (rv != 0) {
866 ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
867 << nghttp3_strerror(rv);
868 ngtcp2_ccerr_set_application_error(
869 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
870 0);
871 return handle_error();
872 }
873 }
874
875 if (nwrite == 0) {
876 auto data = std::span{std::begin(txbuf), std::begin(buf)};
877 if (!data.empty()) {
878 auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
879
880 auto [rest, rv] =
881 send_packet(faddr, prev_ps.path.remote.addr,
882 prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
883 prev_ps.path.local.addrlen, prev_pi, data, gso_size);
884 if (rv == SHRPX_ERR_SEND_BLOCKED) {
885 on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
886 prev_pi, rest, gso_size);
887
888 signal_write_upstream_addr(faddr);
889 }
890 }
891
892 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
893
894 return 0;
895 }
896
897 auto last_pkt = std::begin(buf);
898
899 buf = buf.subspan(nwrite);
900
901 if (last_pkt == std::begin(txbuf)) {
902 ngtcp2_path_copy(&prev_ps.path, &ps.path);
903 prev_pi = pi;
904 gso_size = nwrite;
905 } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
906 prev_pi.ecn != pi.ecn ||
907 static_cast<size_t>(nwrite) > gso_size ||
908 (gso_size > path_max_udp_payload_size &&
909 static_cast<size_t>(nwrite) != gso_size)) {
910 auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
911 auto data = std::span{std::begin(txbuf), last_pkt};
912
913 auto [rest, rv] =
914 send_packet(faddr, prev_ps.path.remote.addr,
915 prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
916 prev_ps.path.local.addrlen, prev_pi, data, gso_size);
917 switch (rv) {
918 case SHRPX_ERR_SEND_BLOCKED:
919 on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
920 rest, gso_size);
921
922 data = std::span{last_pkt, std::begin(buf)};
923 on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
924 ps.path.remote, ps.path.local, pi, data, data.size());
925
926 signal_write_upstream_addr(faddr);
927
928 break;
929 default: {
930 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
931 auto data = std::span{last_pkt, std::begin(buf)};
932
933 auto [rest, rv] = send_packet(
934 faddr, ps.path.remote.addr, ps.path.remote.addrlen,
935 ps.path.local.addr, ps.path.local.addrlen, pi, data, data.size());
936 if (rv == SHRPX_ERR_SEND_BLOCKED) {
937 assert(rest.size() == data.size());
938
939 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
940 rest.size());
941
942 signal_write_upstream_addr(faddr);
943 }
944 }
945 }
946
947 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
948
949 return 0;
950 }
951
952 if (buf.size() < path_max_udp_payload_size ||
953 static_cast<size_t>(nwrite) < gso_size) {
954 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
955 auto data = std::span{std::begin(txbuf), std::begin(buf)};
956
957 auto [rest, rv] = send_packet(faddr, ps.path.remote.addr,
958 ps.path.remote.addrlen, ps.path.local.addr,
959 ps.path.local.addrlen, pi, data, gso_size);
960 if (rv == SHRPX_ERR_SEND_BLOCKED) {
961 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
962 gso_size);
963
964 signal_write_upstream_addr(faddr);
965 }
966
967 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
968
969 return 0;
970 }
971 }
972
973 return 0;
974 }
975
on_timeout(Downstream * downstream)976 int Http3Upstream::on_timeout(Downstream *downstream) {
977 if (LOG_ENABLED(INFO)) {
978 ULOG(INFO, this) << "Stream timeout stream_id="
979 << downstream->get_stream_id();
980 }
981
982 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
983
984 handler_->signal_write();
985
986 return 0;
987 }
988
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)989 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
990 unsigned int status_code) {
991 int rv;
992
993 rv = error_reply(downstream, status_code);
994
995 if (rv != 0) {
996 return -1;
997 }
998
999 handler_->signal_write();
1000
1001 return 0;
1002 }
1003
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1004 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1005 Downstream *downstream) {
1006 assert(0);
1007 abort();
1008 }
1009
1010 namespace {
1011 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1012 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1013 // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1014 // client to retry.
1015 switch (downstream_error_code) {
1016 case NGHTTP2_NO_ERROR:
1017 return NGHTTP3_H3_NO_ERROR;
1018 case NGHTTP2_REFUSED_STREAM:
1019 return NGHTTP3_H3_REQUEST_REJECTED;
1020 default:
1021 return NGHTTP3_H3_INTERNAL_ERROR;
1022 }
1023 }
1024 } // namespace
1025
downstream_read(DownstreamConnection * dconn)1026 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1027 auto downstream = dconn->get_downstream();
1028
1029 if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1030 // The downstream stream was reset (canceled). In this case,
1031 // RST_STREAM to the upstream and delete downstream connection
1032 // here. Deleting downstream will be taken place at
1033 // on_stream_close_callback.
1034 shutdown_stream(downstream,
1035 infer_upstream_shutdown_stream_error_code(
1036 downstream->get_response_rst_stream_error_code()));
1037 downstream->pop_downstream_connection();
1038 // dconn was deleted
1039 dconn = nullptr;
1040 } else if (downstream->get_response_state() ==
1041 DownstreamState::MSG_BAD_HEADER) {
1042 if (error_reply(downstream, 502) != 0) {
1043 return -1;
1044 }
1045 downstream->pop_downstream_connection();
1046 // dconn was deleted
1047 dconn = nullptr;
1048 } else {
1049 auto rv = downstream->on_read();
1050 if (rv == SHRPX_ERR_EOF) {
1051 if (downstream->get_request_header_sent()) {
1052 return downstream_eof(dconn);
1053 }
1054 return SHRPX_ERR_RETRY;
1055 }
1056 if (rv == SHRPX_ERR_DCONN_CANCELED) {
1057 downstream->pop_downstream_connection();
1058 handler_->signal_write();
1059 return 0;
1060 }
1061 if (rv != 0) {
1062 if (rv != SHRPX_ERR_NETWORK) {
1063 if (LOG_ENABLED(INFO)) {
1064 DCLOG(INFO, dconn) << "HTTP parser failure";
1065 }
1066 }
1067 return downstream_error(dconn, Downstream::EVENT_ERROR);
1068 }
1069
1070 if (downstream->can_detach_downstream_connection()) {
1071 // Keep-alive
1072 downstream->detach_downstream_connection();
1073 }
1074 }
1075
1076 handler_->signal_write();
1077
1078 // At this point, downstream may be deleted.
1079
1080 return 0;
1081 }
1082
downstream_write(DownstreamConnection * dconn)1083 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1084 int rv;
1085 rv = dconn->on_write();
1086 if (rv == SHRPX_ERR_NETWORK) {
1087 return downstream_error(dconn, Downstream::EVENT_ERROR);
1088 }
1089 if (rv != 0) {
1090 return rv;
1091 }
1092 return 0;
1093 }
1094
downstream_eof(DownstreamConnection * dconn)1095 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1096 auto downstream = dconn->get_downstream();
1097
1098 if (LOG_ENABLED(INFO)) {
1099 DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1100 }
1101
1102 // Delete downstream connection. If we don't delete it here, it will
1103 // be pooled in on_stream_close_callback.
1104 downstream->pop_downstream_connection();
1105 // dconn was deleted
1106 dconn = nullptr;
1107 // downstream will be deleted in on_stream_close_callback.
1108 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1109 // Server may indicate the end of the request by EOF
1110 if (LOG_ENABLED(INFO)) {
1111 ULOG(INFO, this) << "Downstream body was ended by EOF";
1112 }
1113 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1114
1115 // For tunneled connection, MSG_COMPLETE signals
1116 // downstream_read_data_callback to send RST_STREAM after pending
1117 // response body is sent. This is needed to ensure that RST_STREAM
1118 // is sent after all pending data are sent.
1119 if (on_downstream_body_complete(downstream) != 0) {
1120 return -1;
1121 }
1122 } else if (downstream->get_response_state() !=
1123 DownstreamState::MSG_COMPLETE) {
1124 // If stream was not closed, then we set MSG_COMPLETE and let
1125 // on_stream_close_callback delete downstream.
1126 if (error_reply(downstream, 502) != 0) {
1127 return -1;
1128 }
1129 }
1130 handler_->signal_write();
1131 // At this point, downstream may be deleted.
1132 return 0;
1133 }
1134
downstream_error(DownstreamConnection * dconn,int events)1135 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1136 auto downstream = dconn->get_downstream();
1137
1138 if (LOG_ENABLED(INFO)) {
1139 if (events & Downstream::EVENT_ERROR) {
1140 DCLOG(INFO, dconn) << "Downstream network/general error";
1141 } else {
1142 DCLOG(INFO, dconn) << "Timeout";
1143 }
1144 if (downstream->get_upgraded()) {
1145 DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1146 }
1147 }
1148
1149 // Delete downstream connection. If we don't delete it here, it will
1150 // be pooled in on_stream_close_callback.
1151 downstream->pop_downstream_connection();
1152 // dconn was deleted
1153 dconn = nullptr;
1154
1155 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1156 // For SSL tunneling, we issue RST_STREAM. For other types of
1157 // stream, we don't have to do anything since response was
1158 // complete.
1159 if (downstream->get_upgraded()) {
1160 shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1161 }
1162 } else {
1163 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1164 if (downstream->get_upgraded()) {
1165 if (on_downstream_body_complete(downstream) != 0) {
1166 return -1;
1167 }
1168 } else {
1169 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1170 }
1171 } else {
1172 unsigned int status;
1173 if (events & Downstream::EVENT_TIMEOUT) {
1174 if (downstream->get_request_header_sent()) {
1175 status = 504;
1176 } else {
1177 status = 408;
1178 }
1179 } else {
1180 status = 502;
1181 }
1182 if (error_reply(downstream, status) != 0) {
1183 return -1;
1184 }
1185 }
1186 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1187 }
1188 handler_->signal_write();
1189 // At this point, downstream may be deleted.
1190 return 0;
1191 }
1192
get_client_handler() const1193 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1194
1195 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1196 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1197 int64_t stream_id, nghttp3_vec *vec,
1198 size_t veccnt, uint32_t *pflags,
1199 void *conn_user_data,
1200 void *stream_user_data) {
1201 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1202 auto downstream = static_cast<Downstream *>(stream_user_data);
1203
1204 assert(downstream);
1205
1206 auto body = downstream->get_response_buf();
1207
1208 assert(body);
1209
1210 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1211 body->rleft_mark() == 0) {
1212 downstream->disable_upstream_wtimer();
1213 return NGHTTP3_ERR_WOULDBLOCK;
1214 }
1215
1216 downstream->reset_upstream_wtimer();
1217
1218 veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1219
1220 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1221 body->rleft_mark() == 0) {
1222 *pflags |= NGHTTP3_DATA_FLAG_EOF;
1223 }
1224
1225 assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1226
1227 downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1228
1229 if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1230 upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1231 return NGHTTP3_ERR_CALLBACK_FAILURE;
1232 }
1233
1234 return veccnt;
1235 }
1236 } // namespace
1237
on_downstream_header_complete(Downstream * downstream)1238 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1239 int rv;
1240
1241 const auto &req = downstream->request();
1242 auto &resp = downstream->response();
1243
1244 auto &balloc = downstream->get_block_allocator();
1245
1246 if (LOG_ENABLED(INFO)) {
1247 if (downstream->get_non_final_response()) {
1248 DLOG(INFO, downstream) << "HTTP non-final response header";
1249 } else {
1250 DLOG(INFO, downstream) << "HTTP response header completed";
1251 }
1252 }
1253
1254 auto config = get_config();
1255 auto &httpconf = config->http;
1256
1257 if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1258 downstream->rewrite_location_response_header(req.scheme);
1259 }
1260
1261 #ifdef HAVE_MRUBY
1262 if (!downstream->get_non_final_response()) {
1263 auto dconn = downstream->get_downstream_connection();
1264 const auto &group = dconn->get_downstream_addr_group();
1265 if (group) {
1266 const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1267
1268 if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1269 if (error_reply(downstream, 500) != 0) {
1270 return -1;
1271 }
1272 // Returning -1 will signal deletion of dconn.
1273 return -1;
1274 }
1275
1276 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1277 return -1;
1278 }
1279 }
1280
1281 auto worker = handler_->get_worker();
1282 auto mruby_ctx = worker->get_mruby_context();
1283
1284 if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1285 if (error_reply(downstream, 500) != 0) {
1286 return -1;
1287 }
1288 // Returning -1 will signal deletion of dconn.
1289 return -1;
1290 }
1291
1292 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1293 return -1;
1294 }
1295 }
1296 #endif // HAVE_MRUBY
1297
1298 auto nva = std::vector<nghttp3_nv>();
1299 // 4 means :status and possible server, via, and set-cookie (for
1300 // affinity cookie) header field.
1301 nva.reserve(resp.fs.headers().size() + 4 +
1302 httpconf.add_response_headers.size());
1303
1304 if (downstream->get_non_final_response()) {
1305 auto response_status = http2::stringify_status(balloc, resp.http_status);
1306
1307 nva.push_back(http3::make_field(":status"_sr, response_status));
1308
1309 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1310 http2::HDOP_STRIP_ALL);
1311
1312 if (LOG_ENABLED(INFO)) {
1313 log_response_headers(downstream, nva);
1314 }
1315
1316 rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1317 nva.data(), nva.size());
1318
1319 resp.fs.clear_headers();
1320
1321 if (rv != 0) {
1322 ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1323 return -1;
1324 }
1325
1326 return 0;
1327 }
1328
1329 auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1330 StringRef response_status;
1331
1332 if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1333 response_status = http2::stringify_status(balloc, 200);
1334 striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1335 } else {
1336 response_status = http2::stringify_status(balloc, resp.http_status);
1337 }
1338
1339 nva.push_back(http3::make_field(":status"_sr, response_status));
1340
1341 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1342
1343 if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1344 nva.push_back(http3::make_field("server"_sr, httpconf.server_name));
1345 } else {
1346 auto server = resp.fs.header(http2::HD_SERVER);
1347 if (server) {
1348 nva.push_back(http3::make_field("server"_sr, (*server).value));
1349 }
1350 }
1351
1352 if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1353 auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1354 if (affinity_cookie) {
1355 auto dconn = downstream->get_downstream_connection();
1356 assert(dconn);
1357 auto &group = dconn->get_downstream_addr_group();
1358 auto &shared_addr = group->shared_addr;
1359 auto &cookieconf = shared_addr->affinity.cookie;
1360 auto secure =
1361 http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1362 auto cookie_str = http::create_affinity_cookie(
1363 balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1364 nva.push_back(http3::make_field("set-cookie"_sr, cookie_str));
1365 }
1366 }
1367
1368 auto via = resp.fs.header(http2::HD_VIA);
1369 if (httpconf.no_via) {
1370 if (via) {
1371 nva.push_back(http3::make_field("via"_sr, (*via).value));
1372 }
1373 } else {
1374 // we don't create more than 16 bytes in
1375 // http::create_via_header_value.
1376 size_t len = 16;
1377 if (via) {
1378 len += via->value.size() + 2;
1379 }
1380
1381 auto iov = make_byte_ref(balloc, len + 1);
1382 auto p = std::begin(iov);
1383 if (via) {
1384 p = std::copy(std::begin(via->value), std::end(via->value), p);
1385 p = util::copy_lit(p, ", ");
1386 }
1387 p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1388 *p = '\0';
1389
1390 nva.push_back(
1391 http3::make_field("via"_sr, StringRef{std::span{std::begin(iov), p}}));
1392 }
1393
1394 for (auto &p : httpconf.add_response_headers) {
1395 nva.push_back(http3::make_field(p.name, p.value));
1396 }
1397
1398 if (LOG_ENABLED(INFO)) {
1399 log_response_headers(downstream, nva);
1400 }
1401
1402 auto priority = resp.fs.header(http2::HD_PRIORITY);
1403 if (priority) {
1404 nghttp3_pri pri;
1405
1406 if (nghttp3_conn_get_stream_priority(httpconn_, &pri,
1407 downstream->get_stream_id()) == 0 &&
1408 nghttp3_pri_parse_priority(&pri, priority->value.byte(),
1409 priority->value.size()) == 0) {
1410 rv = nghttp3_conn_set_server_stream_priority(
1411 httpconn_, downstream->get_stream_id(), &pri);
1412 if (rv != 0) {
1413 ULOG(ERROR, this) << "nghttp3_conn_set_server_stream_priority: "
1414 << nghttp3_strerror(rv);
1415 }
1416 }
1417 }
1418
1419 nghttp3_data_reader data_read;
1420 data_read.read_data = downstream_read_data_callback;
1421
1422 nghttp3_data_reader *data_readptr;
1423
1424 if (downstream->expect_response_body() ||
1425 downstream->expect_response_trailer()) {
1426 data_readptr = &data_read;
1427 } else {
1428 data_readptr = nullptr;
1429 }
1430
1431 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1432 nva.data(), nva.size(), data_readptr);
1433 if (rv != 0) {
1434 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1435 return -1;
1436 }
1437
1438 if (data_readptr) {
1439 downstream->reset_upstream_wtimer();
1440 } else if (shutdown_stream_read(downstream->get_stream_id(),
1441 NGHTTP3_H3_NO_ERROR) != 0) {
1442 return -1;
1443 }
1444
1445 return 0;
1446 }
1447
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1448 int Http3Upstream::on_downstream_body(Downstream *downstream,
1449 const uint8_t *data, size_t len,
1450 bool flush) {
1451 auto body = downstream->get_response_buf();
1452 body->append(data, len);
1453
1454 if (flush) {
1455 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1456
1457 downstream->ensure_upstream_wtimer();
1458 }
1459
1460 return 0;
1461 }
1462
on_downstream_body_complete(Downstream * downstream)1463 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1464 if (LOG_ENABLED(INFO)) {
1465 DLOG(INFO, downstream) << "HTTP response completed";
1466 }
1467
1468 auto &resp = downstream->response();
1469
1470 if (!downstream->validate_response_recv_body_length()) {
1471 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1472 resp.connection_close = true;
1473 return 0;
1474 }
1475
1476 if (!downstream->get_upgraded()) {
1477 const auto &trailers = resp.fs.trailers();
1478 if (!trailers.empty()) {
1479 std::vector<nghttp3_nv> nva;
1480 nva.reserve(trailers.size());
1481 http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1482 if (!nva.empty()) {
1483 auto rv = nghttp3_conn_submit_trailers(
1484 httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1485 if (rv != 0) {
1486 ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1487 << nghttp3_strerror(rv);
1488 return -1;
1489 }
1490 }
1491 }
1492 }
1493
1494 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1495 downstream->ensure_upstream_wtimer();
1496
1497 return 0;
1498 }
1499
on_handler_delete()1500 void Http3Upstream::on_handler_delete() {
1501 for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1502 if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1503 d->accesslog_ready()) {
1504 handler_->write_accesslog(d);
1505 }
1506 }
1507
1508 auto worker = handler_->get_worker();
1509 auto quic_conn_handler = worker->get_quic_connection_handler();
1510
1511 std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1512 ngtcp2_conn_get_scid(conn_, scids.data());
1513 scids.back() = hashed_scid_;
1514
1515 for (auto &cid : scids) {
1516 quic_conn_handler->remove_connection_id(cid);
1517 }
1518
1519 switch (last_error_.type) {
1520 case NGTCP2_CCERR_TYPE_IDLE_CLOSE:
1521 case NGTCP2_CCERR_TYPE_DROP_CONN:
1522 case NGTCP2_CCERR_TYPE_RETRY:
1523 return;
1524 default:
1525 break;
1526 }
1527
1528 // If this is not idle close, send CONNECTION_CLOSE.
1529 if (!ngtcp2_conn_in_closing_period(conn_) &&
1530 !ngtcp2_conn_in_draining_period(conn_)) {
1531 ngtcp2_path_storage ps;
1532 ngtcp2_pkt_info pi;
1533 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1534
1535 ngtcp2_path_storage_zero(&ps);
1536
1537 ngtcp2_ccerr ccerr;
1538 ngtcp2_ccerr_default(&ccerr);
1539
1540 if (worker->get_graceful_shutdown() &&
1541 !ngtcp2_conn_get_handshake_completed(conn_)) {
1542 ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1543 }
1544
1545 auto nwrite = ngtcp2_conn_write_connection_close(
1546 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1547 quic_timestamp());
1548 if (nwrite < 0) {
1549 if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1550 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1551 << ngtcp2_strerror(nwrite);
1552 }
1553
1554 return;
1555 }
1556
1557 conn_close_.resize(nwrite);
1558
1559 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1560 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1561 ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
1562 }
1563
1564 auto d =
1565 static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1566
1567 if (LOG_ENABLED(INFO)) {
1568 ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1569 << conn_close_.size() << " bytes sentinel packet";
1570 }
1571
1572 auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1573 std::move(conn_close_), d);
1574
1575 quic_conn_handler->add_close_wait(cw.release());
1576 }
1577
on_downstream_reset(Downstream * downstream,bool no_retry)1578 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1579 int rv;
1580
1581 if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1582 // This is error condition when we failed push_request_headers()
1583 // in initiate_downstream(). Otherwise, we have
1584 // DispatchState::ACTIVE state, or we did not set
1585 // DownstreamConnection.
1586 downstream->pop_downstream_connection();
1587 handler_->signal_write();
1588
1589 return 0;
1590 }
1591
1592 if (!downstream->request_submission_ready()) {
1593 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1594 // We have got all response body already. Send it off.
1595 downstream->pop_downstream_connection();
1596 return 0;
1597 }
1598 // pushed stream is handled here
1599 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1600 downstream->pop_downstream_connection();
1601
1602 handler_->signal_write();
1603
1604 return 0;
1605 }
1606
1607 downstream->pop_downstream_connection();
1608
1609 downstream->add_retry();
1610
1611 std::unique_ptr<DownstreamConnection> dconn;
1612
1613 rv = 0;
1614
1615 if (no_retry || downstream->no_more_retry()) {
1616 goto fail;
1617 }
1618
1619 // downstream connection is clean; we can retry with new
1620 // downstream connection.
1621
1622 for (;;) {
1623 auto dconn = handler_->get_downstream_connection(rv, downstream);
1624 if (!dconn) {
1625 goto fail;
1626 }
1627
1628 rv = downstream->attach_downstream_connection(std::move(dconn));
1629 if (rv == 0) {
1630 break;
1631 }
1632 }
1633
1634 rv = downstream->push_request_headers();
1635 if (rv != 0) {
1636 goto fail;
1637 }
1638
1639 return 0;
1640
1641 fail:
1642 if (rv == SHRPX_ERR_TLS_REQUIRED) {
1643 assert(0);
1644 abort();
1645 }
1646
1647 rv = on_downstream_abort_request(downstream, 502);
1648 if (rv != 0) {
1649 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1650 }
1651 downstream->pop_downstream_connection();
1652
1653 handler_->signal_write();
1654
1655 return 0;
1656 }
1657
pause_read(IOCtrlReason reason)1658 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1659
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1660 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1661 size_t consumed) {
1662 consume(downstream->get_stream_id(), consumed);
1663
1664 auto &req = downstream->request();
1665
1666 req.consume(consumed);
1667
1668 handler_->signal_write();
1669
1670 return 0;
1671 }
1672
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1673 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1674 size_t bodylen) {
1675 int rv;
1676
1677 nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1678
1679 const auto &req = downstream->request();
1680
1681 if (req.method != HTTP_HEAD && bodylen) {
1682 data_read.read_data = downstream_read_data_callback;
1683 data_read_ptr = &data_read;
1684
1685 auto buf = downstream->get_response_buf();
1686
1687 buf->append(body, bodylen);
1688 }
1689
1690 const auto &resp = downstream->response();
1691 auto config = get_config();
1692 auto &httpconf = config->http;
1693
1694 auto &balloc = downstream->get_block_allocator();
1695
1696 const auto &headers = resp.fs.headers();
1697 auto nva = std::vector<nghttp3_nv>();
1698 // 2 for :status and server
1699 nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1700
1701 auto response_status = http2::stringify_status(balloc, resp.http_status);
1702
1703 nva.push_back(http3::make_field(":status"_sr, response_status));
1704
1705 for (auto &kv : headers) {
1706 if (kv.name.empty() || kv.name[0] == ':') {
1707 continue;
1708 }
1709 switch (kv.token) {
1710 case http2::HD_CONNECTION:
1711 case http2::HD_KEEP_ALIVE:
1712 case http2::HD_PROXY_CONNECTION:
1713 case http2::HD_TE:
1714 case http2::HD_TRANSFER_ENCODING:
1715 case http2::HD_UPGRADE:
1716 continue;
1717 }
1718 nva.push_back(
1719 http3::make_field(kv.name, kv.value, http3::never_index(kv.no_index)));
1720 }
1721
1722 if (!resp.fs.header(http2::HD_SERVER)) {
1723 nva.push_back(http3::make_field("server"_sr, config->http.server_name));
1724 }
1725
1726 for (auto &p : httpconf.add_response_headers) {
1727 nva.push_back(http3::make_field(p.name, p.value));
1728 }
1729
1730 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1731 nva.data(), nva.size(), data_read_ptr);
1732 if (nghttp3_err_is_fatal(rv)) {
1733 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1734 << nghttp3_strerror(rv);
1735 return -1;
1736 }
1737
1738 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1739
1740 if (data_read_ptr) {
1741 downstream->reset_upstream_wtimer();
1742 }
1743
1744 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1745 0) {
1746 return -1;
1747 }
1748
1749 return 0;
1750 }
1751
initiate_push(Downstream * downstream,const StringRef & uri)1752 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1753 return 0;
1754 }
1755
response_riovec(struct iovec * iov,int iovcnt) const1756 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1757 return 0;
1758 }
1759
response_drain(size_t n)1760 void Http3Upstream::response_drain(size_t n) {}
1761
response_empty() const1762 bool Http3Upstream::response_empty() const { return false; }
1763
1764 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1765 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1766 int32_t promised_stream_id) {
1767 return nullptr;
1768 }
1769
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1770 int Http3Upstream::on_downstream_push_promise_complete(
1771 Downstream *downstream, Downstream *promised_downstream) {
1772 return 0;
1773 }
1774
push_enabled() const1775 bool Http3Upstream::push_enabled() const { return false; }
1776
cancel_premature_downstream(Downstream * promised_downstream)1777 void Http3Upstream::cancel_premature_downstream(
1778 Downstream *promised_downstream) {}
1779
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data)1780 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1781 const Address &remote_addr,
1782 const Address &local_addr, const ngtcp2_pkt_info &pi,
1783 std::span<const uint8_t> data) {
1784 int rv;
1785
1786 auto path = ngtcp2_path{
1787 {
1788 const_cast<sockaddr *>(&local_addr.su.sa),
1789 static_cast<socklen_t>(local_addr.len),
1790 },
1791 {
1792 const_cast<sockaddr *>(&remote_addr.su.sa),
1793 static_cast<socklen_t>(remote_addr.len),
1794 },
1795 const_cast<UpstreamAddr *>(faddr),
1796 };
1797
1798 rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data.data(), data.size(),
1799 quic_timestamp());
1800 if (rv != 0) {
1801 switch (rv) {
1802 case NGTCP2_ERR_DRAINING:
1803 return -1;
1804 case NGTCP2_ERR_RETRY: {
1805 auto worker = handler_->get_worker();
1806 auto quic_conn_handler = worker->get_quic_connection_handler();
1807
1808 if (worker->get_graceful_shutdown()) {
1809 ngtcp2_ccerr_set_transport_error(&last_error_,
1810 NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1811
1812 return handle_error();
1813 }
1814
1815 ngtcp2_version_cid vc;
1816
1817 rv = ngtcp2_pkt_decode_version_cid(&vc, data.data(), data.size(),
1818 SHRPX_QUIC_SCIDLEN);
1819 if (rv != 0) {
1820 return -1;
1821 }
1822
1823 // Overwrite error if any is set
1824 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1825
1826 quic_conn_handler->send_retry(
1827 handler_->get_upstream_addr(), vc.version, {vc.dcid, vc.dcidlen},
1828 {vc.scid, vc.scidlen}, remote_addr, local_addr, data.size() * 3);
1829
1830 return -1;
1831 }
1832 case NGTCP2_ERR_CRYPTO:
1833 if (!last_error_.error_code) {
1834 ngtcp2_ccerr_set_tls_alert(
1835 &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1836 }
1837 break;
1838 case NGTCP2_ERR_DROP_CONN:
1839 // Overwrite error if any is set
1840 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1841
1842 return -1;
1843 default:
1844 if (!last_error_.error_code) {
1845 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1846 }
1847 }
1848
1849 ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1850
1851 return handle_error();
1852 }
1853
1854 return 0;
1855 }
1856
1857 std::pair<std::span<const uint8_t>, int>
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1858 Http3Upstream::send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
1859 size_t remote_salen, const sockaddr *local_sa,
1860 size_t local_salen, const ngtcp2_pkt_info &pi,
1861 std::span<const uint8_t> data, size_t gso_size) {
1862 if (tx_.no_gso) {
1863 for (; !data.empty();) {
1864 auto len = std::min(gso_size, data.size());
1865 auto rv =
1866 quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1867 local_salen, pi, {std::begin(data), len}, gso_size);
1868 if (rv != 0) {
1869 switch (rv) {
1870 case -EAGAIN:
1871 #if EAGAIN != EWOULDBLOCK
1872 case -EWOULDBLOCK:
1873 #endif // EAGAIN != EWOULDBLOCK
1874 return {data, SHRPX_ERR_SEND_BLOCKED};
1875 default:
1876 return {data, -1};
1877 }
1878 }
1879
1880 data = data.subspan(len);
1881 }
1882
1883 return {{}, 0};
1884 }
1885
1886 auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1887 local_salen, pi, data, gso_size);
1888 switch (rv) {
1889 case 0:
1890 return {{}, 0};
1891 // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1892 // large.
1893 case -EINVAL:
1894 case -EMSGSIZE:
1895 // Let the packet lost.
1896 break;
1897 case -EAGAIN:
1898 #if EAGAIN != EWOULDBLOCK
1899 case -EWOULDBLOCK:
1900 #endif // EAGAIN != EWOULDBLOCK
1901 return {data, SHRPX_ERR_SEND_BLOCKED};
1902 case -EIO:
1903 if (tx_.no_gso) {
1904 break;
1905 }
1906
1907 tx_.no_gso = true;
1908
1909 return send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
1910 pi, data, gso_size);
1911 default:
1912 break;
1913 }
1914
1915 return {{}, -1};
1916 }
1917
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1918 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1919 const ngtcp2_addr &remote_addr,
1920 const ngtcp2_addr &local_addr,
1921 const ngtcp2_pkt_info &pi,
1922 std::span<const uint8_t> data,
1923 size_t gso_size) {
1924 assert(tx_.num_blocked || !tx_.send_blocked);
1925 assert(tx_.num_blocked < 2);
1926 assert(gso_size);
1927
1928 tx_.send_blocked = true;
1929
1930 auto &p = tx_.blocked[tx_.num_blocked++];
1931
1932 memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1933 memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1934
1935 p.local_addr.len = local_addr.addrlen;
1936 p.remote_addr.len = remote_addr.addrlen;
1937 p.faddr = faddr;
1938 p.pi = pi;
1939 p.data = data;
1940 p.gso_size = gso_size;
1941 }
1942
send_blocked_packet()1943 int Http3Upstream::send_blocked_packet() {
1944 assert(tx_.send_blocked);
1945
1946 for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1947 auto &p = tx_.blocked[tx_.num_blocked_sent];
1948
1949 auto [rest, rv] = send_packet(p.faddr, &p.remote_addr.su.sa,
1950 p.remote_addr.len, &p.local_addr.su.sa,
1951 p.local_addr.len, p.pi, p.data, p.gso_size);
1952 if (rv == SHRPX_ERR_SEND_BLOCKED) {
1953 p.data = rest;
1954
1955 signal_write_upstream_addr(p.faddr);
1956
1957 return 0;
1958 }
1959 }
1960
1961 tx_.send_blocked = false;
1962 tx_.num_blocked = 0;
1963 tx_.num_blocked_sent = 0;
1964
1965 return 0;
1966 }
1967
signal_write_upstream_addr(const UpstreamAddr * faddr)1968 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1969 auto conn = handler_->get_connection();
1970
1971 if (faddr->fd != conn->wev.fd) {
1972 if (ev_is_active(&conn->wev)) {
1973 ev_io_stop(handler_->get_loop(), &conn->wev);
1974 }
1975
1976 ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1977 }
1978
1979 conn->wlimit.startw();
1980 }
1981
handle_error()1982 int Http3Upstream::handle_error() {
1983 if (ngtcp2_conn_in_closing_period(conn_) ||
1984 ngtcp2_conn_in_draining_period(conn_)) {
1985 return -1;
1986 }
1987
1988 ngtcp2_path_storage ps;
1989 ngtcp2_pkt_info pi;
1990
1991 ngtcp2_path_storage_zero(&ps);
1992
1993 auto ts = quic_timestamp();
1994
1995 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1996
1997 auto nwrite = ngtcp2_conn_write_connection_close(
1998 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
1999 &last_error_, ts);
2000 if (nwrite < 0) {
2001 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
2002 << ngtcp2_strerror(nwrite);
2003 return -1;
2004 }
2005
2006 conn_close_.resize(nwrite);
2007
2008 if (nwrite == 0) {
2009 return -1;
2010 }
2011
2012 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
2013 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
2014 ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
2015
2016 return -1;
2017 }
2018
handle_expiry()2019 int Http3Upstream::handle_expiry() {
2020 int rv;
2021
2022 auto ts = quic_timestamp();
2023
2024 rv = ngtcp2_conn_handle_expiry(conn_, ts);
2025 if (rv != 0) {
2026 if (rv == NGTCP2_ERR_IDLE_CLOSE) {
2027 ULOG(INFO, this) << "Idle connection timeout";
2028 } else {
2029 ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
2030 }
2031 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
2032 return handle_error();
2033 }
2034
2035 return 0;
2036 }
2037
reset_timer()2038 void Http3Upstream::reset_timer() {
2039 auto ts = quic_timestamp();
2040 auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
2041 auto loop = handler_->get_loop();
2042
2043 if (expiry_ts <= ts) {
2044 ev_feed_event(loop, &timer_, EV_TIMER);
2045 return;
2046 }
2047
2048 timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
2049
2050 ev_timer_again(loop, &timer_);
2051 }
2052
2053 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2054 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2055 size_t nconsumed, void *user_data,
2056 void *stream_user_data) {
2057 auto upstream = static_cast<Http3Upstream *>(user_data);
2058
2059 upstream->consume(stream_id, nconsumed);
2060
2061 return 0;
2062 }
2063 } // namespace
2064
2065 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2066 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2067 uint64_t datalen, void *user_data,
2068 void *stream_user_data) {
2069 auto upstream = static_cast<Http3Upstream *>(user_data);
2070 auto downstream = static_cast<Downstream *>(stream_user_data);
2071
2072 assert(downstream);
2073
2074 if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2075 return NGHTTP3_ERR_CALLBACK_FAILURE;
2076 }
2077
2078 return 0;
2079 }
2080 } // namespace
2081
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2082 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2083 uint64_t datalen) {
2084 if (LOG_ENABLED(INFO)) {
2085 ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2086 << datalen << " bytes acknowledged";
2087 }
2088
2089 auto body = downstream->get_response_buf();
2090 auto drained = body->drain_mark(datalen);
2091 (void)drained;
2092
2093 assert(datalen == drained);
2094
2095 if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2096 return -1;
2097 }
2098
2099 return 0;
2100 }
2101
2102 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2103 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2104 void *user_data, void *stream_user_data) {
2105 if (!ngtcp2_is_bidi_stream(stream_id)) {
2106 return 0;
2107 }
2108
2109 auto upstream = static_cast<Http3Upstream *>(user_data);
2110 upstream->http_begin_request_headers(stream_id);
2111
2112 return 0;
2113 }
2114 } // namespace
2115
2116 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2117 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2118 int32_t token, nghttp3_rcbuf *name,
2119 nghttp3_rcbuf *value, uint8_t flags,
2120 void *user_data, void *stream_user_data) {
2121 auto upstream = static_cast<Http3Upstream *>(user_data);
2122 auto downstream = static_cast<Downstream *>(stream_user_data);
2123
2124 if (!downstream || downstream->get_stop_reading()) {
2125 return 0;
2126 }
2127
2128 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2129 /* trailer = */ false) != 0) {
2130 return NGHTTP3_ERR_CALLBACK_FAILURE;
2131 }
2132
2133 return 0;
2134 }
2135 } // namespace
2136
2137 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2138 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2139 int32_t token, nghttp3_rcbuf *name,
2140 nghttp3_rcbuf *value, uint8_t flags,
2141 void *user_data, void *stream_user_data) {
2142 auto upstream = static_cast<Http3Upstream *>(user_data);
2143 auto downstream = static_cast<Downstream *>(stream_user_data);
2144
2145 if (!downstream || downstream->get_stop_reading()) {
2146 return 0;
2147 }
2148
2149 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2150 /* trailer = */ true) != 0) {
2151 return NGHTTP3_ERR_CALLBACK_FAILURE;
2152 }
2153
2154 return 0;
2155 }
2156 } // namespace
2157
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2158 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2159 int32_t h3token,
2160 nghttp3_rcbuf *name,
2161 nghttp3_rcbuf *value, uint8_t flags,
2162 bool trailer) {
2163 auto namebuf = nghttp3_rcbuf_get_buf(name);
2164 auto valuebuf = nghttp3_rcbuf_get_buf(value);
2165 auto &req = downstream->request();
2166 auto config = get_config();
2167 auto &httpconf = config->http;
2168
2169 if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2170 httpconf.request_header_field_buffer ||
2171 req.fs.num_fields() >= httpconf.max_request_header_fields) {
2172 downstream->set_stop_reading(true);
2173
2174 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2175 return 0;
2176 }
2177
2178 if (LOG_ENABLED(INFO)) {
2179 ULOG(INFO, this) << "Too large or many header field size="
2180 << req.fs.buffer_size() + namebuf.len + valuebuf.len
2181 << ", num=" << req.fs.num_fields() + 1;
2182 }
2183
2184 // just ignore if this is a trailer part.
2185 if (trailer) {
2186 if (shutdown_stream_read(downstream->get_stream_id(),
2187 NGHTTP3_H3_NO_ERROR) != 0) {
2188 return -1;
2189 }
2190
2191 return 0;
2192 }
2193
2194 if (error_reply(downstream, 431) != 0) {
2195 return -1;
2196 }
2197
2198 return 0;
2199 }
2200
2201 auto nameref = StringRef{namebuf.base, namebuf.len};
2202 auto valueref = StringRef{valuebuf.base, valuebuf.len};
2203 auto token = http2::lookup_token(nameref);
2204 auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2205
2206 downstream->add_rcbuf(name);
2207 downstream->add_rcbuf(value);
2208
2209 if (trailer) {
2210 req.fs.add_trailer_token(nameref, valueref, no_index, token);
2211 return 0;
2212 }
2213
2214 req.fs.add_header_token(nameref, valueref, no_index, token);
2215 return 0;
2216 }
2217
2218 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2219 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2220 void *user_data, void *stream_user_data) {
2221 auto upstream = static_cast<Http3Upstream *>(user_data);
2222 auto downstream = static_cast<Downstream *>(stream_user_data);
2223
2224 if (!downstream || downstream->get_stop_reading()) {
2225 return 0;
2226 }
2227
2228 if (upstream->http_end_request_headers(downstream, fin) != 0) {
2229 return NGHTTP3_ERR_CALLBACK_FAILURE;
2230 }
2231
2232 downstream->reset_upstream_rtimer();
2233 downstream->stop_header_timer();
2234
2235 return 0;
2236 }
2237 } // namespace
2238
http_end_request_headers(Downstream * downstream,int fin)2239 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2240 auto lgconf = log_config();
2241 lgconf->update_tstamp(std::chrono::system_clock::now());
2242 auto &req = downstream->request();
2243 req.tstamp = lgconf->tstamp;
2244
2245 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2246 return 0;
2247 }
2248
2249 auto &nva = req.fs.headers();
2250
2251 if (LOG_ENABLED(INFO)) {
2252 std::stringstream ss;
2253 for (auto &nv : nva) {
2254 if (nv.name == "authorization"_sr) {
2255 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2256 continue;
2257 }
2258 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2259 }
2260 ULOG(INFO, this) << "HTTP request headers. stream_id="
2261 << downstream->get_stream_id() << "\n"
2262 << ss.str();
2263 }
2264
2265 auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2266 if (content_length) {
2267 // libnghttp3 guarantees this can be parsed
2268 req.fs.content_length =
2269 util::parse_uint(content_length->value).value_or(-1);
2270 }
2271
2272 // presence of mandatory header fields are guaranteed by libnghttp3.
2273 auto authority = req.fs.header(http2::HD__AUTHORITY);
2274 auto path = req.fs.header(http2::HD__PATH);
2275 auto method = req.fs.header(http2::HD__METHOD);
2276 auto scheme = req.fs.header(http2::HD__SCHEME);
2277
2278 auto method_token = http2::lookup_method_token(method->value);
2279 if (method_token == -1) {
2280 if (error_reply(downstream, 501) != 0) {
2281 return -1;
2282 }
2283 return 0;
2284 }
2285
2286 auto faddr = handler_->get_upstream_addr();
2287
2288 auto config = get_config();
2289
2290 // For HTTP/2 proxy, we require :authority.
2291 if (method_token != HTTP_CONNECT && config->http2_proxy &&
2292 faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2293 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2294 return 0;
2295 }
2296
2297 req.method = method_token;
2298 if (scheme) {
2299 req.scheme = scheme->value;
2300 }
2301
2302 // nghttp2 library guarantees either :authority or host exist
2303 if (!authority) {
2304 req.no_authority = true;
2305 authority = req.fs.header(http2::HD_HOST);
2306 }
2307
2308 if (authority) {
2309 req.authority = authority->value;
2310 }
2311
2312 if (path) {
2313 if (method_token == HTTP_OPTIONS && path->value == "*"_sr) {
2314 // Server-wide OPTIONS request. Path is empty.
2315 } else if (config->http2_proxy &&
2316 faddr->alt_mode == UpstreamAltMode::NONE) {
2317 req.path = path->value;
2318 } else {
2319 req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2320 path->value);
2321 }
2322 }
2323
2324 auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2325 if (connect_proto) {
2326 if (connect_proto->value != "websocket"_sr) {
2327 if (error_reply(downstream, 400) != 0) {
2328 return -1;
2329 }
2330 return 0;
2331 }
2332 req.connect_proto = ConnectProto::WEBSOCKET;
2333 }
2334
2335 if (!fin) {
2336 req.http2_expect_body = true;
2337 } else if (req.fs.content_length == -1) {
2338 req.fs.content_length = 0;
2339 }
2340
2341 downstream->inspect_http2_request();
2342
2343 downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2344
2345 if (config->http.require_http_scheme &&
2346 !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2347 if (error_reply(downstream, 400) != 0) {
2348 return -1;
2349 }
2350 }
2351
2352 #ifdef HAVE_MRUBY
2353 auto worker = handler_->get_worker();
2354 auto mruby_ctx = worker->get_mruby_context();
2355
2356 if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2357 if (error_reply(downstream, 500) != 0) {
2358 return -1;
2359 }
2360 return 0;
2361 }
2362 #endif // HAVE_MRUBY
2363
2364 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2365 return 0;
2366 }
2367
2368 start_downstream(downstream);
2369
2370 return 0;
2371 }
2372
start_downstream(Downstream * downstream)2373 void Http3Upstream::start_downstream(Downstream *downstream) {
2374 if (downstream_queue_.can_activate(downstream->request().authority)) {
2375 initiate_downstream(downstream);
2376 return;
2377 }
2378
2379 downstream_queue_.mark_blocked(downstream);
2380 }
2381
initiate_downstream(Downstream * downstream)2382 void Http3Upstream::initiate_downstream(Downstream *downstream) {
2383 int rv;
2384
2385 #ifdef HAVE_MRUBY
2386 DownstreamConnection *dconn_ptr;
2387 #endif // HAVE_MRUBY
2388
2389 for (;;) {
2390 auto dconn = handler_->get_downstream_connection(rv, downstream);
2391 if (!dconn) {
2392 if (rv == SHRPX_ERR_TLS_REQUIRED) {
2393 assert(0);
2394 abort();
2395 }
2396
2397 rv = error_reply(downstream, 502);
2398 if (rv != 0) {
2399 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2400 }
2401
2402 downstream->set_request_state(DownstreamState::CONNECT_FAIL);
2403 downstream_queue_.mark_failure(downstream);
2404
2405 return;
2406 }
2407
2408 #ifdef HAVE_MRUBY
2409 dconn_ptr = dconn.get();
2410 #endif // HAVE_MRUBY
2411 rv = downstream->attach_downstream_connection(std::move(dconn));
2412 if (rv == 0) {
2413 break;
2414 }
2415 }
2416
2417 #ifdef HAVE_MRUBY
2418 const auto &group = dconn_ptr->get_downstream_addr_group();
2419 if (group) {
2420 const auto &mruby_ctx = group->shared_addr->mruby_ctx;
2421 if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2422 if (error_reply(downstream, 500) != 0) {
2423 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2424 }
2425
2426 downstream_queue_.mark_failure(downstream);
2427
2428 return;
2429 }
2430
2431 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2432 return;
2433 }
2434 }
2435 #endif // HAVE_MRUBY
2436
2437 rv = downstream->push_request_headers();
2438 if (rv != 0) {
2439
2440 if (error_reply(downstream, 502) != 0) {
2441 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2442 }
2443
2444 downstream_queue_.mark_failure(downstream);
2445
2446 return;
2447 }
2448
2449 downstream_queue_.mark_active(downstream);
2450
2451 auto &req = downstream->request();
2452 if (!req.http2_expect_body) {
2453 rv = downstream->end_upload_data();
2454 if (rv != 0) {
2455 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2456 }
2457 }
2458 }
2459
2460 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2461 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2462 size_t datalen, void *user_data, void *stream_user_data) {
2463 auto upstream = static_cast<Http3Upstream *>(user_data);
2464 auto downstream = static_cast<Downstream *>(stream_user_data);
2465
2466 if (upstream->http_recv_data(downstream, {data, datalen}) != 0) {
2467 return NGHTTP3_ERR_CALLBACK_FAILURE;
2468 }
2469
2470 return 0;
2471 }
2472 } // namespace
2473
http_recv_data(Downstream * downstream,std::span<const uint8_t> data)2474 int Http3Upstream::http_recv_data(Downstream *downstream,
2475 std::span<const uint8_t> data) {
2476 downstream->reset_upstream_rtimer();
2477
2478 if (downstream->push_upload_data_chunk(data.data(), data.size()) != 0) {
2479 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2480 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2481 }
2482
2483 consume(downstream->get_stream_id(), data.size());
2484
2485 return 0;
2486 }
2487
2488 return 0;
2489 }
2490
2491 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2492 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2493 void *stream_user_data) {
2494 auto upstream = static_cast<Http3Upstream *>(user_data);
2495 auto downstream = static_cast<Downstream *>(stream_user_data);
2496
2497 if (!downstream || downstream->get_stop_reading()) {
2498 return 0;
2499 }
2500
2501 if (upstream->http_end_stream(downstream) != 0) {
2502 return NGHTTP3_ERR_CALLBACK_FAILURE;
2503 }
2504
2505 return 0;
2506 }
2507 } // namespace
2508
http_end_stream(Downstream * downstream)2509 int Http3Upstream::http_end_stream(Downstream *downstream) {
2510 downstream->disable_upstream_rtimer();
2511
2512 if (downstream->end_upload_data() != 0) {
2513 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2514 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2515 }
2516 }
2517
2518 downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2519
2520 return 0;
2521 }
2522
2523 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2524 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2525 uint64_t app_error_code, void *conn_user_data,
2526 void *stream_user_data) {
2527 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2528 auto downstream = static_cast<Downstream *>(stream_user_data);
2529
2530 if (!downstream) {
2531 return 0;
2532 }
2533
2534 if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2535 return NGHTTP3_ERR_CALLBACK_FAILURE;
2536 }
2537
2538 return 0;
2539 }
2540 } // namespace
2541
http_stream_close(Downstream * downstream,uint64_t app_error_code)2542 int Http3Upstream::http_stream_close(Downstream *downstream,
2543 uint64_t app_error_code) {
2544 auto stream_id = downstream->get_stream_id();
2545
2546 if (LOG_ENABLED(INFO)) {
2547 ULOG(INFO, this) << "Stream stream_id=" << stream_id
2548 << " is being closed with app_error_code="
2549 << app_error_code;
2550
2551 auto body = downstream->get_response_buf();
2552
2553 ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2554 << " not_sent=" << body->rleft_mark();
2555 }
2556
2557 auto &req = downstream->request();
2558
2559 consume(stream_id, req.unconsumed_body_length);
2560
2561 req.unconsumed_body_length = 0;
2562
2563 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2564
2565 if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2566 remove_downstream(downstream);
2567 // downstream was deleted
2568
2569 return 0;
2570 }
2571
2572 if (downstream->can_detach_downstream_connection()) {
2573 // Keep-alive
2574 downstream->detach_downstream_connection();
2575 }
2576
2577 downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2578
2579 // At this point, downstream read may be paused.
2580
2581 // If shrpx_downstream::push_request_headers() failed, the
2582 // error is handled here.
2583 remove_downstream(downstream);
2584 // downstream was deleted
2585
2586 return 0;
2587 }
2588
2589 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2590 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2591 uint64_t app_error_code, void *user_data,
2592 void *stream_user_data) {
2593 auto upstream = static_cast<Http3Upstream *>(user_data);
2594
2595 if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2596 return NGHTTP3_ERR_CALLBACK_FAILURE;
2597 }
2598
2599 return 0;
2600 }
2601 } // namespace
2602
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2603 int Http3Upstream::http_stop_sending(int64_t stream_id,
2604 uint64_t app_error_code) {
2605 auto rv =
2606 ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2607 if (ngtcp2_err_is_fatal(rv)) {
2608 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2609 << ngtcp2_strerror(rv);
2610 return -1;
2611 }
2612
2613 return 0;
2614 }
2615
2616 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2617 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2618 uint64_t app_error_code, void *user_data,
2619 void *stream_user_data) {
2620 auto upstream = static_cast<Http3Upstream *>(user_data);
2621
2622 if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2623 return NGHTTP3_ERR_CALLBACK_FAILURE;
2624 }
2625
2626 return 0;
2627 }
2628 } // namespace
2629
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2630 int Http3Upstream::http_reset_stream(int64_t stream_id,
2631 uint64_t app_error_code) {
2632 auto rv =
2633 ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2634 if (ngtcp2_err_is_fatal(rv)) {
2635 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2636 << ngtcp2_strerror(rv);
2637 return -1;
2638 }
2639
2640 return 0;
2641 }
2642
setup_httpconn()2643 int Http3Upstream::setup_httpconn() {
2644 int rv;
2645
2646 if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2647 return -1;
2648 }
2649
2650 nghttp3_callbacks callbacks{
2651 shrpx::http_acked_stream_data,
2652 shrpx::http_stream_close,
2653 shrpx::http_recv_data,
2654 http_deferred_consume,
2655 shrpx::http_begin_request_headers,
2656 shrpx::http_recv_request_header,
2657 shrpx::http_end_request_headers,
2658 nullptr, // begin_trailers
2659 shrpx::http_recv_request_trailer,
2660 nullptr, // end_trailers
2661 shrpx::http_stop_sending,
2662 shrpx::http_end_stream,
2663 shrpx::http_reset_stream,
2664 };
2665
2666 auto config = get_config();
2667
2668 nghttp3_settings settings;
2669 nghttp3_settings_default(&settings);
2670 settings.qpack_max_dtable_capacity = 4_k;
2671
2672 if (!config->http2_proxy) {
2673 settings.enable_connect_protocol = 1;
2674 }
2675
2676 auto mem = nghttp3_mem_default();
2677
2678 rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2679 if (rv != 0) {
2680 ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2681 return -1;
2682 }
2683
2684 auto params = ngtcp2_conn_get_local_transport_params(conn_);
2685
2686 nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2687 params->initial_max_streams_bidi);
2688
2689 int64_t ctrl_stream_id;
2690
2691 rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2692 if (rv != 0) {
2693 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2694 return -1;
2695 }
2696
2697 rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2698 if (rv != 0) {
2699 ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2700 << nghttp3_strerror(rv);
2701 return -1;
2702 }
2703
2704 int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2705
2706 rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2707 if (rv != 0) {
2708 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2709 return -1;
2710 }
2711
2712 rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2713 if (rv != 0) {
2714 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2715 return -1;
2716 }
2717
2718 rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2719 qpack_dec_stream_id);
2720 if (rv != 0) {
2721 ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2722 << nghttp3_strerror(rv);
2723 return -1;
2724 }
2725
2726 return 0;
2727 }
2728
error_reply(Downstream * downstream,unsigned int status_code)2729 int Http3Upstream::error_reply(Downstream *downstream,
2730 unsigned int status_code) {
2731 int rv;
2732 auto &resp = downstream->response();
2733
2734 auto &balloc = downstream->get_block_allocator();
2735
2736 auto html = http::create_error_html(balloc, status_code);
2737 resp.http_status = status_code;
2738
2739 nghttp3_data_reader data_read, *data_read_ptr = nullptr;
2740
2741 const auto &req = downstream->request();
2742
2743 if (req.method != HTTP_HEAD) {
2744 data_read.read_data = downstream_read_data_callback;
2745 data_read_ptr = &data_read;
2746
2747 auto body = downstream->get_response_buf();
2748
2749 body->append(html);
2750 }
2751
2752 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2753
2754 auto lgconf = log_config();
2755 lgconf->update_tstamp(std::chrono::system_clock::now());
2756
2757 auto response_status = http2::stringify_status(balloc, status_code);
2758 auto content_length = util::make_string_ref_uint(balloc, html.size());
2759 auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2760
2761 auto nva = std::to_array(
2762 {http3::make_field(":status"_sr, response_status),
2763 http3::make_field("content-type"_sr, "text/html; charset=UTF-8"_sr),
2764 http3::make_field("server"_sr, get_config()->http.server_name),
2765 http3::make_field("content-length"_sr, content_length),
2766 http3::make_field("date"_sr, date)});
2767
2768 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2769 nva.data(), nva.size(), data_read_ptr);
2770 if (nghttp3_err_is_fatal(rv)) {
2771 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2772 << nghttp3_strerror(rv);
2773 return -1;
2774 }
2775
2776 downstream->reset_upstream_wtimer();
2777
2778 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2779 0) {
2780 return -1;
2781 }
2782
2783 return 0;
2784 }
2785
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2786 int Http3Upstream::shutdown_stream(Downstream *downstream,
2787 uint64_t app_error_code) {
2788 auto stream_id = downstream->get_stream_id();
2789
2790 if (LOG_ENABLED(INFO)) {
2791 ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2792 << " with app_error_code=" << app_error_code;
2793 }
2794
2795 auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2796 if (rv != 0) {
2797 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2798 << ngtcp2_strerror(rv);
2799 return -1;
2800 }
2801
2802 return 0;
2803 }
2804
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2805 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2806 uint64_t app_error_code) {
2807 auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
2808 NGHTTP3_H3_NO_ERROR);
2809 if (ngtcp2_err_is_fatal(rv)) {
2810 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2811 << ngtcp2_strerror(rv);
2812 return -1;
2813 }
2814
2815 return 0;
2816 }
2817
consume(int64_t stream_id,size_t nconsumed)2818 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2819 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2820 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2821 }
2822
remove_downstream(Downstream * downstream)2823 void Http3Upstream::remove_downstream(Downstream *downstream) {
2824 if (downstream->accesslog_ready()) {
2825 handler_->write_accesslog(downstream);
2826 }
2827
2828 nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2829 nullptr);
2830
2831 auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2832
2833 if (next_downstream) {
2834 initiate_downstream(next_downstream);
2835 }
2836
2837 if (downstream_queue_.get_downstreams() == nullptr) {
2838 // There is no downstream at the moment. Start idle timer now.
2839 handler_->repeat_read_timer();
2840 }
2841 }
2842
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2843 void Http3Upstream::log_response_headers(
2844 Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2845 std::stringstream ss;
2846 for (auto &nv : nva) {
2847 ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2848 << StringRef{nv.value, nv.valuelen} << "\n";
2849 }
2850 ULOG(INFO, this) << "HTTP response headers. stream_id="
2851 << downstream->get_stream_id() << "\n"
2852 << ss.str();
2853 }
2854
check_shutdown()2855 int Http3Upstream::check_shutdown() {
2856 auto worker = handler_->get_worker();
2857
2858 if (!worker->get_graceful_shutdown()) {
2859 return 0;
2860 }
2861
2862 ev_prepare_stop(handler_->get_loop(), &prep_);
2863
2864 return start_graceful_shutdown();
2865 }
2866
start_graceful_shutdown()2867 int Http3Upstream::start_graceful_shutdown() {
2868 int rv;
2869
2870 if (ev_is_active(&shutdown_timer_)) {
2871 return 0;
2872 }
2873
2874 if (!httpconn_) {
2875 return -1;
2876 }
2877
2878 rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2879 if (rv != 0) {
2880 ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2881 << nghttp3_strerror(rv);
2882 return -1;
2883 }
2884
2885 handler_->signal_write();
2886
2887 auto t = ngtcp2_conn_get_pto(conn_);
2888
2889 ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2890 0.);
2891 ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2892
2893 return 0;
2894 }
2895
submit_goaway()2896 int Http3Upstream::submit_goaway() {
2897 int rv;
2898
2899 rv = nghttp3_conn_shutdown(httpconn_);
2900 if (rv != 0) {
2901 ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2902 return -1;
2903 }
2904
2905 handler_->signal_write();
2906
2907 return 0;
2908 }
2909
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2910 int Http3Upstream::open_qlog_file(const StringRef &dir,
2911 const ngtcp2_cid &scid) const {
2912 std::array<char, sizeof("20141115T125824.741+0900")> buf;
2913
2914 auto path = std::string{dir};
2915 path += '/';
2916 path +=
2917 util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2918 path += '-';
2919 path += util::format_hex(std::span{scid.data, scid.datalen});
2920 path += ".sqlog";
2921
2922 int fd;
2923
2924 #ifdef O_CLOEXEC
2925 while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2926 S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2927 errno == EINTR)
2928 ;
2929 #else // !O_CLOEXEC
2930 while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2931 S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2932 errno == EINTR)
2933 ;
2934
2935 if (fd != -1) {
2936 util::make_socket_closeonexec(fd);
2937 }
2938 #endif // !O_CLOEXEC
2939
2940 if (fd == -1) {
2941 auto error = errno;
2942 ULOG(ERROR, this) << "Failed to open qlog file " << path
2943 << ": errno=" << error;
2944 return -1;
2945 }
2946
2947 return fd;
2948 }
2949
get_conn() const2950 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2951
2952 } // namespace shrpx
2953