1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2021 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_http3_upstream.h"
26
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31
32 #include <cstdio>
33
34 #include <ngtcp2/ngtcp2_crypto.h>
35
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 # include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49 #include "ssl_compat.h"
50
51 namespace shrpx {
52
53 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)54 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
55 auto upstream = static_cast<Http3Upstream *>(w->data);
56
57 if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
58 goto fail;
59 }
60
61 return;
62
63 fail:
64 auto handler = upstream->get_client_handler();
65
66 delete handler;
67 }
68 } // namespace
69
70 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)71 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
72 auto upstream = static_cast<Http3Upstream *>(w->data);
73 auto handler = upstream->get_client_handler();
74
75 if (upstream->submit_goaway() != 0) {
76 delete handler;
77 }
78 }
79 } // namespace
80
81 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)82 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
83 auto upstream = static_cast<Http3Upstream *>(w->data);
84 auto handler = upstream->get_client_handler();
85
86 if (upstream->check_shutdown() != 0) {
87 delete handler;
88 }
89 }
90 } // namespace
91
92 namespace {
downstream_queue_size(Worker * worker)93 size_t downstream_queue_size(Worker *worker) {
94 auto &downstreamconf = *worker->get_downstream_config();
95
96 if (get_config()->http2_proxy) {
97 return downstreamconf.connections_per_host;
98 }
99
100 return downstreamconf.connections_per_frontend;
101 }
102 } // namespace
103
104 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)105 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
106 auto conn = static_cast<Connection *>(conn_ref->user_data);
107 auto handler = static_cast<ClientHandler *>(conn->data);
108 auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
109 return upstream->get_conn();
110 }
111 } // namespace
112
Http3Upstream(ClientHandler * handler)113 Http3Upstream::Http3Upstream(ClientHandler *handler)
114 : handler_{handler},
115 qlog_fd_{-1},
116 hashed_scid_{},
117 conn_{nullptr},
118 httpconn_{nullptr},
119 downstream_queue_{downstream_queue_size(handler->get_worker()),
120 !get_config()->http2_proxy},
121 tx_{
122 .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123 #ifndef UDP_SEGMENT
124 .no_gso = true,
125 #endif // UDP_SEGMENT
126 } {
127 auto conn = handler_->get_connection();
128 conn->conn_ref.get_conn = shrpx::get_conn;
129
130 ev_timer_init(&timer_, timeoutcb, 0., 0.);
131 timer_.data = this;
132
133 ngtcp2_ccerr_default(&last_error_);
134
135 ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
136 shutdown_timer_.data = this;
137
138 ev_prepare_init(&prep_, prepare_cb);
139 prep_.data = this;
140 ev_prepare_start(handler_->get_loop(), &prep_);
141 }
142
~Http3Upstream()143 Http3Upstream::~Http3Upstream() {
144 auto loop = handler_->get_loop();
145
146 ev_prepare_stop(loop, &prep_);
147 ev_timer_stop(loop, &shutdown_timer_);
148 ev_timer_stop(loop, &timer_);
149
150 nghttp3_conn_del(httpconn_);
151
152 ngtcp2_conn_del(conn_);
153
154 if (qlog_fd_ != -1) {
155 close(qlog_fd_);
156 }
157 }
158
159 namespace {
log_printf(void * user_data,const char * fmt,...)160 void log_printf(void *user_data, const char *fmt, ...) {
161 va_list ap;
162 std::array<char, 4096> buf;
163
164 va_start(ap, fmt);
165 auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
166 va_end(ap);
167
168 if (static_cast<size_t>(nwrite) >= buf.size()) {
169 nwrite = buf.size() - 1;
170 }
171
172 buf[nwrite++] = '\n';
173
174 while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
175 ;
176 }
177 } // namespace
178
179 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)180 void qlog_write(void *user_data, uint32_t flags, const void *data,
181 size_t datalen) {
182 auto upstream = static_cast<Http3Upstream *>(user_data);
183
184 upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
185 }
186 } // namespace
187
qlog_write(const void * data,size_t datalen,bool fin)188 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
189 assert(qlog_fd_ != -1);
190
191 while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
192 ;
193
194 if (fin) {
195 close(qlog_fd_);
196 qlog_fd_ = -1;
197 }
198 }
199
200 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)201 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
202 util::random_bytes(dest, dest + destlen,
203 *static_cast<std::mt19937 *>(rand_ctx->native_handle));
204 }
205 } // namespace
206
207 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)208 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
209 size_t cidlen, void *user_data) {
210 auto upstream = static_cast<Http3Upstream *>(user_data);
211 auto handler = upstream->get_client_handler();
212 auto worker = handler->get_worker();
213 auto conn_handler = worker->get_connection_handler();
214 auto &qkms = conn_handler->get_quic_keying_materials();
215 auto &qkm = qkms->keying_materials.front();
216
217 assert(SHRPX_QUIC_SCIDLEN == cidlen);
218
219 if (generate_quic_connection_id(*cid, worker->get_worker_id(), qkm.id,
220 qkm.cid_encryption_ctx) != 0) {
221 return NGTCP2_ERR_CALLBACK_FAILURE;
222 }
223
224 if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
225 qkm.secret.size()) != 0) {
226 return NGTCP2_ERR_CALLBACK_FAILURE;
227 }
228
229 auto quic_connection_handler = worker->get_quic_connection_handler();
230
231 quic_connection_handler->add_connection_id(*cid, handler);
232
233 return 0;
234 }
235 } // namespace
236
237 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)238 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
239 void *user_data) {
240 auto upstream = static_cast<Http3Upstream *>(user_data);
241 auto handler = upstream->get_client_handler();
242 auto worker = handler->get_worker();
243 auto quic_conn_handler = worker->get_quic_connection_handler();
244
245 quic_conn_handler->remove_connection_id(*cid);
246
247 return 0;
248 }
249 } // namespace
250
http_begin_request_headers(int64_t stream_id)251 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
252 auto downstream =
253 std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
254 nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
255
256 downstream->reset_upstream_rtimer();
257 downstream->repeat_header_timer();
258
259 handler_->stop_read_timer();
260
261 auto &req = downstream->request();
262 req.http_major = 3;
263 req.http_minor = 0;
264
265 add_pending_downstream(std::move(downstream));
266 }
267
add_pending_downstream(std::unique_ptr<Downstream> downstream)268 void Http3Upstream::add_pending_downstream(
269 std::unique_ptr<Downstream> downstream) {
270 downstream_queue_.add_pending(std::move(downstream));
271 }
272
273 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)274 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
275 uint64_t offset, const uint8_t *data, size_t datalen,
276 void *user_data, void *stream_user_data) {
277 auto upstream = static_cast<Http3Upstream *>(user_data);
278
279 if (upstream->recv_stream_data(flags, stream_id, {data, datalen}) != 0) {
280 return NGTCP2_ERR_CALLBACK_FAILURE;
281 }
282
283 return 0;
284 }
285 } // namespace
286
recv_stream_data(uint32_t flags,int64_t stream_id,std::span<const uint8_t> data)287 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
288 std::span<const uint8_t> data) {
289 assert(httpconn_);
290
291 auto nconsumed =
292 nghttp3_conn_read_stream(httpconn_, stream_id, data.data(), data.size(),
293 flags & NGTCP2_STREAM_DATA_FLAG_FIN);
294 if (nconsumed < 0) {
295 ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
296 << nghttp3_strerror(nconsumed);
297 ngtcp2_ccerr_set_application_error(
298 &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
299 0);
300 return -1;
301 }
302
303 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
304 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
305
306 return 0;
307 }
308
309 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)310 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
311 uint64_t app_error_code, void *user_data,
312 void *stream_user_data) {
313 auto upstream = static_cast<Http3Upstream *>(user_data);
314
315 if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
316 app_error_code = NGHTTP3_H3_NO_ERROR;
317 }
318
319 if (upstream->stream_close(stream_id, app_error_code) != 0) {
320 return NGTCP2_ERR_CALLBACK_FAILURE;
321 }
322
323 return 0;
324 }
325 } // namespace
326
stream_close(int64_t stream_id,uint64_t app_error_code)327 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
328 if (!httpconn_) {
329 return 0;
330 }
331
332 auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
333 switch (rv) {
334 case 0:
335 break;
336 case NGHTTP3_ERR_STREAM_NOT_FOUND:
337 if (ngtcp2_is_bidi_stream(stream_id)) {
338 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
339 }
340 break;
341 default:
342 ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
343 ngtcp2_ccerr_set_application_error(
344 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
345 return -1;
346 }
347
348 return 0;
349 }
350
351 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)352 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
353 uint64_t offset, uint64_t datalen, void *user_data,
354 void *stream_user_data) {
355 auto upstream = static_cast<Http3Upstream *>(user_data);
356
357 if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
358 return NGTCP2_ERR_CALLBACK_FAILURE;
359 }
360
361 return 0;
362 }
363 } // namespace
364
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)365 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
366 uint64_t datalen) {
367 if (!httpconn_) {
368 return 0;
369 }
370
371 auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
372 if (rv != 0) {
373 ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
374 << nghttp3_strerror(rv);
375 return -1;
376 }
377
378 return 0;
379 }
380
381 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)382 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
383 uint64_t max_data, void *user_data,
384 void *stream_user_data) {
385 auto upstream = static_cast<Http3Upstream *>(user_data);
386
387 if (upstream->extend_max_stream_data(stream_id) != 0) {
388 return NGTCP2_ERR_CALLBACK_FAILURE;
389 }
390
391 return 0;
392 }
393 } // namespace
394
extend_max_stream_data(int64_t stream_id)395 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
396 if (!httpconn_) {
397 return 0;
398 }
399
400 auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
401 if (rv != 0) {
402 ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
403 << nghttp3_strerror(rv);
404 return -1;
405 }
406
407 return 0;
408 }
409
410 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)411 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
412 void *user_data) {
413 auto upstream = static_cast<Http3Upstream *>(user_data);
414
415 upstream->extend_max_remote_streams_bidi(max_streams);
416
417 return 0;
418 }
419 } // namespace
420
extend_max_remote_streams_bidi(uint64_t max_streams)421 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
422 nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
423 }
424
425 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)426 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
427 uint64_t app_error_code, void *user_data,
428 void *stream_user_data) {
429 auto upstream = static_cast<Http3Upstream *>(user_data);
430
431 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
432 return NGTCP2_ERR_CALLBACK_FAILURE;
433 }
434
435 return 0;
436 }
437 } // namespace
438
http_shutdown_stream_read(int64_t stream_id)439 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
440 if (!httpconn_) {
441 return 0;
442 }
443
444 auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
445 if (rv != 0) {
446 ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
447 << nghttp3_strerror(rv);
448 return -1;
449 }
450
451 return 0;
452 }
453
454 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)455 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
456 uint64_t app_error_code, void *user_data,
457 void *stream_user_data) {
458 auto upstream = static_cast<Http3Upstream *>(user_data);
459
460 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
461 return NGTCP2_ERR_CALLBACK_FAILURE;
462 }
463
464 return 0;
465 }
466 } // namespace
467
468 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)469 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
470 auto upstream = static_cast<Http3Upstream *>(user_data);
471
472 if (upstream->handshake_completed() != 0) {
473 return NGTCP2_ERR_CALLBACK_FAILURE;
474 }
475
476 return 0;
477 }
478 } // namespace
479
handshake_completed()480 int Http3Upstream::handshake_completed() {
481 handler_->set_alpn_from_conn();
482
483 auto alpn = handler_->get_alpn();
484 if (alpn.empty()) {
485 ULOG(ERROR, this) << "NO ALPN was negotiated";
486 return -1;
487 }
488
489 auto path = ngtcp2_conn_get_path(conn_);
490
491 return send_new_token(&path->remote);
492 }
493
494 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)495 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
496 const ngtcp2_path *old_path,
497 ngtcp2_path_validation_result res, void *user_data) {
498 if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
499 !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
500 return 0;
501 }
502
503 auto upstream = static_cast<Http3Upstream *>(user_data);
504 if (upstream->send_new_token(&path->remote) != 0) {
505 return NGTCP2_ERR_CALLBACK_FAILURE;
506 }
507
508 return 0;
509 }
510 } // namespace
511
send_new_token(const ngtcp2_addr * remote_addr)512 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
513 auto worker = handler_->get_worker();
514 auto conn_handler = worker->get_connection_handler();
515 auto &qkms = conn_handler->get_quic_keying_materials();
516 auto &qkm = qkms->keying_materials.front();
517
518 std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> tokenbuf;
519
520 auto token = generate_token(tokenbuf, remote_addr->addr, remote_addr->addrlen,
521 qkm.secret, qkm.id);
522 if (!token) {
523 return -1;
524 }
525
526 assert(token->size() == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1);
527
528 auto rv = ngtcp2_conn_submit_new_token(conn_, token->data(), token->size());
529 if (rv != 0) {
530 ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
531 << ngtcp2_strerror(rv);
532 return -1;
533 }
534
535 return 0;
536 }
537
538 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)539 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
540 void *user_data) {
541 if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
542 return 0;
543 }
544
545 auto upstream = static_cast<Http3Upstream *>(user_data);
546 if (upstream->setup_httpconn() != 0) {
547 return NGTCP2_ERR_CALLBACK_FAILURE;
548 }
549
550 return 0;
551 }
552 } // namespace
553
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,std::span<const uint8_t> token,ngtcp2_token_type token_type)554 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
555 const Address &local_addr,
556 const ngtcp2_pkt_hd &initial_hd,
557 const ngtcp2_cid *odcid, std::span<const uint8_t> token,
558 ngtcp2_token_type token_type) {
559 int rv;
560
561 auto worker = handler_->get_worker();
562 auto conn_handler = worker->get_connection_handler();
563
564 auto callbacks = ngtcp2_callbacks{
565 nullptr, // client_initial
566 ngtcp2_crypto_recv_client_initial_cb,
567 ngtcp2_crypto_recv_crypto_data_cb,
568 shrpx::handshake_completed,
569 nullptr, // recv_version_negotiation
570 ngtcp2_crypto_encrypt_cb,
571 ngtcp2_crypto_decrypt_cb,
572 ngtcp2_crypto_hp_mask_cb,
573 shrpx::recv_stream_data,
574 shrpx::acked_stream_data_offset,
575 nullptr, // stream_open
576 shrpx::stream_close,
577 nullptr, // recv_stateless_reset
578 nullptr, // recv_retry
579 nullptr, // extend_max_local_streams_bidi
580 nullptr, // extend_max_local_streams_uni
581 rand,
582 get_new_connection_id,
583 remove_connection_id,
584 ngtcp2_crypto_update_key_cb,
585 shrpx::path_validation,
586 nullptr, // select_preferred_addr
587 shrpx::stream_reset,
588 shrpx::extend_max_remote_streams_bidi,
589 nullptr, // extend_max_remote_streams_uni
590 shrpx::extend_max_stream_data,
591 nullptr, // dcid_status
592 nullptr, // handshake_confirmed
593 nullptr, // recv_new_token
594 ngtcp2_crypto_delete_crypto_aead_ctx_cb,
595 ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
596 nullptr, // recv_datagram
597 nullptr, // ack_datagram
598 nullptr, // lost_datagram
599 ngtcp2_crypto_get_path_challenge_data_cb,
600 shrpx::stream_stop_sending,
601 nullptr, // version_negotiation
602 nullptr, // recv_rx_key
603 shrpx::recv_tx_key,
604 };
605
606 auto config = get_config();
607 auto &quicconf = config->quic;
608 auto &http3conf = config->http3;
609
610 auto &qkms = conn_handler->get_quic_keying_materials();
611 auto &qkm = qkms->keying_materials.front();
612
613 ngtcp2_cid scid;
614
615 if (generate_quic_connection_id(scid, worker->get_worker_id(), qkm.id,
616 qkm.cid_encryption_ctx) != 0) {
617 return -1;
618 }
619
620 ngtcp2_settings settings;
621 ngtcp2_settings_default(&settings);
622 if (quicconf.upstream.debug.log) {
623 settings.log_printf = log_printf;
624 }
625
626 if (!quicconf.upstream.qlog.dir.empty()) {
627 auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
628 if (fd != -1) {
629 qlog_fd_ = fd;
630 settings.qlog_write = shrpx::qlog_write;
631 }
632 }
633
634 settings.initial_ts = quic_timestamp();
635 settings.initial_rtt =
636 static_cast<ngtcp2_tstamp>(quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
637 settings.cc_algo = quicconf.upstream.congestion_controller;
638 settings.max_window = http3conf.upstream.max_connection_window_size;
639 settings.max_stream_window = http3conf.upstream.max_window_size;
640 settings.rand_ctx.native_handle = &worker->get_randgen();
641 settings.token = token.data();
642 settings.tokenlen = token.size();
643 settings.token_type = token_type;
644 settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
645 0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
646
647 ngtcp2_transport_params params;
648 ngtcp2_transport_params_default(¶ms);
649 params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
650 // The minimum number of unidirectional streams required for HTTP/3.
651 params.initial_max_streams_uni = 3;
652 params.initial_max_data = http3conf.upstream.connection_window_size;
653 params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
654 params.initial_max_stream_data_uni = http3conf.upstream.window_size;
655 params.max_idle_timeout =
656 static_cast<ngtcp2_tstamp>(quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
657
658 #ifdef NGHTTP2_OPENSSL_IS_BORINGSSL
659 if (quicconf.upstream.early_data) {
660 ngtcp2_transport_params early_data_params;
661
662 ngtcp2_transport_params_default(&early_data_params);
663
664 early_data_params.initial_max_stream_data_bidi_local =
665 params.initial_max_stream_data_bidi_local;
666 early_data_params.initial_max_stream_data_bidi_remote =
667 params.initial_max_stream_data_bidi_remote;
668 early_data_params.initial_max_stream_data_uni =
669 params.initial_max_stream_data_uni;
670 early_data_params.initial_max_data = params.initial_max_data;
671 early_data_params.initial_max_streams_bidi =
672 params.initial_max_streams_bidi;
673 early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
674
675 // TODO include HTTP/3 SETTINGS
676
677 std::array<uint8_t, 128> quic_early_data_ctx;
678
679 auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
680 quic_early_data_ctx.data(), quic_early_data_ctx.size(),
681 &early_data_params);
682
683 assert(quic_early_data_ctxlen > 0);
684 assert(static_cast<size_t>(quic_early_data_ctxlen) <=
685 quic_early_data_ctx.size());
686
687 if (SSL_set_quic_early_data_context(handler_->get_ssl(),
688 quic_early_data_ctx.data(),
689 quic_early_data_ctxlen) != 1) {
690 ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
691 return -1;
692 }
693 }
694 #endif // NGHTTP2_OPENSSL_IS_BORINGSSL
695
696 if (odcid) {
697 params.original_dcid = *odcid;
698 params.retry_scid = initial_hd.dcid;
699 params.retry_scid_present = 1;
700 } else {
701 params.original_dcid = initial_hd.dcid;
702 }
703
704 params.original_dcid_present = 1;
705
706 rv = generate_quic_stateless_reset_token(
707 params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
708 if (rv != 0) {
709 ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
710 return -1;
711 }
712 params.stateless_reset_token_present = 1;
713
714 auto path = ngtcp2_path{
715 {
716 const_cast<sockaddr *>(&local_addr.su.sa),
717 static_cast<socklen_t>(local_addr.len),
718 },
719 {
720 const_cast<sockaddr *>(&remote_addr.su.sa),
721 static_cast<socklen_t>(remote_addr.len),
722 },
723 const_cast<UpstreamAddr *>(faddr),
724 };
725
726 rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
727 initial_hd.version, &callbacks, &settings,
728 ¶ms, nullptr, this);
729 if (rv != 0) {
730 ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
731 return -1;
732 }
733
734 ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
735
736 auto quic_connection_handler = worker->get_quic_connection_handler();
737
738 if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
739 initial_hd.dcid) != 0) {
740 return -1;
741 }
742
743 quic_connection_handler->add_connection_id(hashed_scid_, handler_);
744 quic_connection_handler->add_connection_id(scid, handler_);
745
746 return 0;
747 }
748
on_read()749 int Http3Upstream::on_read() { return 0; }
750
on_write()751 int Http3Upstream::on_write() {
752 int rv;
753
754 if (tx_.send_blocked) {
755 rv = send_blocked_packet();
756 if (rv != 0) {
757 return -1;
758 }
759
760 if (tx_.send_blocked) {
761 return 0;
762 }
763 }
764
765 handler_->get_connection()->wlimit.stopw();
766
767 if (write_streams() != 0) {
768 return -1;
769 }
770
771 if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
772 return -1;
773 }
774
775 reset_timer();
776
777 return 0;
778 }
779
write_streams()780 int Http3Upstream::write_streams() {
781 std::array<nghttp3_vec, 16> vec;
782 auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
783 auto path_max_udp_payload_size =
784 ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
785 ngtcp2_pkt_info pi, prev_pi;
786 auto txbuf =
787 std::span{tx_.data.get(), std::max(ngtcp2_conn_get_send_quantum(conn_),
788 path_max_udp_payload_size)};
789 auto buf = txbuf;
790 ngtcp2_path_storage ps, prev_ps;
791 int rv;
792 size_t gso_size = 0;
793 auto ts = quic_timestamp();
794
795 ngtcp2_path_storage_zero(&ps);
796 ngtcp2_path_storage_zero(&prev_ps);
797
798 for (;;) {
799 int64_t stream_id = -1;
800 int fin = 0;
801 nghttp3_ssize sveccnt = 0;
802
803 if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
804 sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
805 vec.data(), vec.size());
806 if (sveccnt < 0) {
807 ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
808 << nghttp3_strerror(sveccnt);
809 ngtcp2_ccerr_set_application_error(
810 &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt), nullptr,
811 0);
812 return handle_error();
813 }
814 }
815
816 ngtcp2_ssize ndatalen;
817 auto v = vec.data();
818 auto vcnt = static_cast<size_t>(sveccnt);
819
820 uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
821 if (fin) {
822 flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
823 }
824
825 auto buflen = buf.size() >= max_udp_payload_size
826 ? max_udp_payload_size
827 : path_max_udp_payload_size;
828 auto nwrite = ngtcp2_conn_writev_stream(
829 conn_, &ps.path, &pi, buf.data(), buflen, &ndatalen, flags, stream_id,
830 reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
831 if (nwrite < 0) {
832 switch (nwrite) {
833 case NGTCP2_ERR_STREAM_DATA_BLOCKED:
834 assert(ndatalen == -1);
835 nghttp3_conn_block_stream(httpconn_, stream_id);
836 continue;
837 case NGTCP2_ERR_STREAM_SHUT_WR:
838 assert(ndatalen == -1);
839 nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
840 continue;
841 case NGTCP2_ERR_WRITE_MORE:
842 assert(ndatalen >= 0);
843 rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
844 if (rv != 0) {
845 ULOG(ERROR, this)
846 << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
847 ngtcp2_ccerr_set_application_error(
848 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
849 0);
850 return handle_error();
851 }
852 continue;
853 }
854
855 assert(ndatalen == -1);
856
857 ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
858 << ngtcp2_strerror(nwrite);
859
860 ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
861
862 return handle_error();
863 } else if (ndatalen >= 0) {
864 rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
865 if (rv != 0) {
866 ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
867 << nghttp3_strerror(rv);
868 ngtcp2_ccerr_set_application_error(
869 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
870 return handle_error();
871 }
872 }
873
874 if (nwrite == 0) {
875 auto data = std::span{std::begin(txbuf), std::begin(buf)};
876 if (!data.empty()) {
877 auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
878
879 auto [rest, rv] =
880 send_packet(faddr, prev_ps.path.remote.addr,
881 prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
882 prev_ps.path.local.addrlen, prev_pi, data, gso_size);
883 if (rv == SHRPX_ERR_SEND_BLOCKED) {
884 on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
885 prev_pi, rest, gso_size);
886
887 signal_write_upstream_addr(faddr);
888 }
889 }
890
891 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
892
893 return 0;
894 }
895
896 auto last_pkt = std::begin(buf);
897
898 buf = buf.subspan(nwrite);
899
900 if (last_pkt == std::begin(txbuf)) {
901 ngtcp2_path_copy(&prev_ps.path, &ps.path);
902 prev_pi = pi;
903 gso_size = nwrite;
904 } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
905 prev_pi.ecn != pi.ecn ||
906 static_cast<size_t>(nwrite) > gso_size ||
907 (gso_size > path_max_udp_payload_size &&
908 static_cast<size_t>(nwrite) != gso_size)) {
909 auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
910 auto data = std::span{std::begin(txbuf), last_pkt};
911
912 auto [rest, rv] =
913 send_packet(faddr, prev_ps.path.remote.addr,
914 prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
915 prev_ps.path.local.addrlen, prev_pi, data, gso_size);
916 switch (rv) {
917 case SHRPX_ERR_SEND_BLOCKED:
918 on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
919 rest, gso_size);
920
921 data = std::span{last_pkt, std::begin(buf)};
922 on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
923 ps.path.remote, ps.path.local, pi, data, data.size());
924
925 signal_write_upstream_addr(faddr);
926
927 break;
928 default: {
929 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
930 auto data = std::span{last_pkt, std::begin(buf)};
931
932 auto [rest, rv] = send_packet(
933 faddr, ps.path.remote.addr, ps.path.remote.addrlen,
934 ps.path.local.addr, ps.path.local.addrlen, pi, data, data.size());
935 if (rv == SHRPX_ERR_SEND_BLOCKED) {
936 assert(rest.size() == data.size());
937
938 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
939 rest.size());
940
941 signal_write_upstream_addr(faddr);
942 }
943 }
944 }
945
946 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
947
948 return 0;
949 }
950
951 if (buf.size() < path_max_udp_payload_size ||
952 static_cast<size_t>(nwrite) < gso_size) {
953 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
954 auto data = std::span{std::begin(txbuf), std::begin(buf)};
955
956 auto [rest, rv] = send_packet(faddr, ps.path.remote.addr,
957 ps.path.remote.addrlen, ps.path.local.addr,
958 ps.path.local.addrlen, pi, data, gso_size);
959 if (rv == SHRPX_ERR_SEND_BLOCKED) {
960 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, rest,
961 gso_size);
962
963 signal_write_upstream_addr(faddr);
964 }
965
966 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
967
968 return 0;
969 }
970 }
971
972 return 0;
973 }
974
on_timeout(Downstream * downstream)975 int Http3Upstream::on_timeout(Downstream *downstream) {
976 if (LOG_ENABLED(INFO)) {
977 ULOG(INFO, this) << "Stream timeout stream_id="
978 << downstream->get_stream_id();
979 }
980
981 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
982
983 handler_->signal_write();
984
985 return 0;
986 }
987
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)988 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
989 unsigned int status_code) {
990 int rv;
991
992 rv = error_reply(downstream, status_code);
993
994 if (rv != 0) {
995 return -1;
996 }
997
998 handler_->signal_write();
999
1000 return 0;
1001 }
1002
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1003 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1004 Downstream *downstream) {
1005 assert(0);
1006 abort();
1007 }
1008
1009 namespace {
1010 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1011 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1012 // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1013 // client to retry.
1014 switch (downstream_error_code) {
1015 case NGHTTP2_NO_ERROR:
1016 return NGHTTP3_H3_NO_ERROR;
1017 case NGHTTP2_REFUSED_STREAM:
1018 return NGHTTP3_H3_REQUEST_REJECTED;
1019 default:
1020 return NGHTTP3_H3_INTERNAL_ERROR;
1021 }
1022 }
1023 } // namespace
1024
downstream_read(DownstreamConnection * dconn)1025 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1026 auto downstream = dconn->get_downstream();
1027
1028 if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1029 // The downstream stream was reset (canceled). In this case,
1030 // RST_STREAM to the upstream and delete downstream connection
1031 // here. Deleting downstream will be taken place at
1032 // on_stream_close_callback.
1033 shutdown_stream(downstream,
1034 infer_upstream_shutdown_stream_error_code(
1035 downstream->get_response_rst_stream_error_code()));
1036 downstream->pop_downstream_connection();
1037 // dconn was deleted
1038 dconn = nullptr;
1039 } else if (downstream->get_response_state() ==
1040 DownstreamState::MSG_BAD_HEADER) {
1041 if (error_reply(downstream, 502) != 0) {
1042 return -1;
1043 }
1044 downstream->pop_downstream_connection();
1045 // dconn was deleted
1046 dconn = nullptr;
1047 } else {
1048 auto rv = downstream->on_read();
1049 if (rv == SHRPX_ERR_EOF) {
1050 if (downstream->get_request_header_sent()) {
1051 return downstream_eof(dconn);
1052 }
1053 return SHRPX_ERR_RETRY;
1054 }
1055 if (rv == SHRPX_ERR_DCONN_CANCELED) {
1056 downstream->pop_downstream_connection();
1057 handler_->signal_write();
1058 return 0;
1059 }
1060 if (rv != 0) {
1061 if (rv != SHRPX_ERR_NETWORK) {
1062 if (LOG_ENABLED(INFO)) {
1063 DCLOG(INFO, dconn) << "HTTP parser failure";
1064 }
1065 }
1066 return downstream_error(dconn, Downstream::EVENT_ERROR);
1067 }
1068
1069 if (downstream->can_detach_downstream_connection()) {
1070 // Keep-alive
1071 downstream->detach_downstream_connection();
1072 }
1073 }
1074
1075 handler_->signal_write();
1076
1077 // At this point, downstream may be deleted.
1078
1079 return 0;
1080 }
1081
downstream_write(DownstreamConnection * dconn)1082 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1083 int rv;
1084 rv = dconn->on_write();
1085 if (rv == SHRPX_ERR_NETWORK) {
1086 return downstream_error(dconn, Downstream::EVENT_ERROR);
1087 }
1088 if (rv != 0) {
1089 return rv;
1090 }
1091 return 0;
1092 }
1093
downstream_eof(DownstreamConnection * dconn)1094 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1095 auto downstream = dconn->get_downstream();
1096
1097 if (LOG_ENABLED(INFO)) {
1098 DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1099 }
1100
1101 // Delete downstream connection. If we don't delete it here, it will
1102 // be pooled in on_stream_close_callback.
1103 downstream->pop_downstream_connection();
1104 // dconn was deleted
1105 dconn = nullptr;
1106 // downstream will be deleted in on_stream_close_callback.
1107 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1108 // Server may indicate the end of the request by EOF
1109 if (LOG_ENABLED(INFO)) {
1110 ULOG(INFO, this) << "Downstream body was ended by EOF";
1111 }
1112 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1113
1114 // For tunneled connection, MSG_COMPLETE signals
1115 // downstream_read_data_callback to send RST_STREAM after pending
1116 // response body is sent. This is needed to ensure that RST_STREAM
1117 // is sent after all pending data are sent.
1118 if (on_downstream_body_complete(downstream) != 0) {
1119 return -1;
1120 }
1121 } else if (downstream->get_response_state() !=
1122 DownstreamState::MSG_COMPLETE) {
1123 // If stream was not closed, then we set MSG_COMPLETE and let
1124 // on_stream_close_callback delete downstream.
1125 if (error_reply(downstream, 502) != 0) {
1126 return -1;
1127 }
1128 }
1129 handler_->signal_write();
1130 // At this point, downstream may be deleted.
1131 return 0;
1132 }
1133
downstream_error(DownstreamConnection * dconn,int events)1134 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1135 auto downstream = dconn->get_downstream();
1136
1137 if (LOG_ENABLED(INFO)) {
1138 if (events & Downstream::EVENT_ERROR) {
1139 DCLOG(INFO, dconn) << "Downstream network/general error";
1140 } else {
1141 DCLOG(INFO, dconn) << "Timeout";
1142 }
1143 if (downstream->get_upgraded()) {
1144 DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1145 }
1146 }
1147
1148 // Delete downstream connection. If we don't delete it here, it will
1149 // be pooled in on_stream_close_callback.
1150 downstream->pop_downstream_connection();
1151 // dconn was deleted
1152 dconn = nullptr;
1153
1154 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1155 // For SSL tunneling, we issue RST_STREAM. For other types of
1156 // stream, we don't have to do anything since response was
1157 // complete.
1158 if (downstream->get_upgraded()) {
1159 shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1160 }
1161 } else {
1162 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1163 if (downstream->get_upgraded()) {
1164 if (on_downstream_body_complete(downstream) != 0) {
1165 return -1;
1166 }
1167 } else {
1168 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1169 }
1170 } else {
1171 unsigned int status;
1172 if (events & Downstream::EVENT_TIMEOUT) {
1173 if (downstream->get_request_header_sent()) {
1174 status = 504;
1175 } else {
1176 status = 408;
1177 }
1178 } else {
1179 status = 502;
1180 }
1181 if (error_reply(downstream, status) != 0) {
1182 return -1;
1183 }
1184 }
1185 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1186 }
1187 handler_->signal_write();
1188 // At this point, downstream may be deleted.
1189 return 0;
1190 }
1191
get_client_handler() const1192 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1193
1194 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1195 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1196 int64_t stream_id, nghttp3_vec *vec,
1197 size_t veccnt, uint32_t *pflags,
1198 void *conn_user_data,
1199 void *stream_user_data) {
1200 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1201 auto downstream = static_cast<Downstream *>(stream_user_data);
1202
1203 assert(downstream);
1204
1205 auto body = downstream->get_response_buf();
1206
1207 assert(body);
1208
1209 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1210 body->rleft_mark() == 0) {
1211 downstream->disable_upstream_wtimer();
1212 return NGHTTP3_ERR_WOULDBLOCK;
1213 }
1214
1215 downstream->reset_upstream_wtimer();
1216
1217 veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1218
1219 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1220 body->rleft_mark() == 0) {
1221 *pflags |= NGHTTP3_DATA_FLAG_EOF;
1222 }
1223
1224 assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1225
1226 downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1227
1228 if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1229 upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1230 return NGHTTP3_ERR_CALLBACK_FAILURE;
1231 }
1232
1233 return veccnt;
1234 }
1235 } // namespace
1236
on_downstream_header_complete(Downstream * downstream)1237 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1238 int rv;
1239
1240 const auto &req = downstream->request();
1241 auto &resp = downstream->response();
1242
1243 auto &balloc = downstream->get_block_allocator();
1244
1245 if (LOG_ENABLED(INFO)) {
1246 if (downstream->get_non_final_response()) {
1247 DLOG(INFO, downstream) << "HTTP non-final response header";
1248 } else {
1249 DLOG(INFO, downstream) << "HTTP response header completed";
1250 }
1251 }
1252
1253 auto config = get_config();
1254 auto &httpconf = config->http;
1255
1256 if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1257 downstream->rewrite_location_response_header(req.scheme);
1258 }
1259
1260 #ifdef HAVE_MRUBY
1261 if (!downstream->get_non_final_response()) {
1262 auto dconn = downstream->get_downstream_connection();
1263 const auto &group = dconn->get_downstream_addr_group();
1264 if (group) {
1265 const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1266
1267 if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1268 if (error_reply(downstream, 500) != 0) {
1269 return -1;
1270 }
1271 // Returning -1 will signal deletion of dconn.
1272 return -1;
1273 }
1274
1275 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1276 return -1;
1277 }
1278 }
1279
1280 auto worker = handler_->get_worker();
1281 auto mruby_ctx = worker->get_mruby_context();
1282
1283 if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1284 if (error_reply(downstream, 500) != 0) {
1285 return -1;
1286 }
1287 // Returning -1 will signal deletion of dconn.
1288 return -1;
1289 }
1290
1291 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1292 return -1;
1293 }
1294 }
1295 #endif // HAVE_MRUBY
1296
1297 auto nva = std::vector<nghttp3_nv>();
1298 // 4 means :status and possible server, via, and set-cookie (for
1299 // affinity cookie) header field.
1300 nva.reserve(resp.fs.headers().size() + 4 +
1301 httpconf.add_response_headers.size());
1302
1303 if (downstream->get_non_final_response()) {
1304 auto response_status = http2::stringify_status(balloc, resp.http_status);
1305
1306 nva.push_back(http3::make_field(":status"_sr, response_status));
1307
1308 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1309 http2::HDOP_STRIP_ALL);
1310
1311 if (LOG_ENABLED(INFO)) {
1312 log_response_headers(downstream, nva);
1313 }
1314
1315 rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1316 nva.data(), nva.size());
1317
1318 resp.fs.clear_headers();
1319
1320 if (rv != 0) {
1321 ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1322 return -1;
1323 }
1324
1325 return 0;
1326 }
1327
1328 auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1329 StringRef response_status;
1330
1331 if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1332 response_status = http2::stringify_status(balloc, 200);
1333 striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1334 } else {
1335 response_status = http2::stringify_status(balloc, resp.http_status);
1336 }
1337
1338 nva.push_back(http3::make_field(":status"_sr, response_status));
1339
1340 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1341
1342 if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1343 nva.push_back(http3::make_field("server"_sr, httpconf.server_name));
1344 } else {
1345 auto server = resp.fs.header(http2::HD_SERVER);
1346 if (server) {
1347 nva.push_back(http3::make_field("server"_sr, (*server).value));
1348 }
1349 }
1350
1351 if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1352 auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1353 if (affinity_cookie) {
1354 auto dconn = downstream->get_downstream_connection();
1355 assert(dconn);
1356 auto &group = dconn->get_downstream_addr_group();
1357 auto &shared_addr = group->shared_addr;
1358 auto &cookieconf = shared_addr->affinity.cookie;
1359 auto secure =
1360 http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1361 auto cookie_str = http::create_affinity_cookie(
1362 balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1363 nva.push_back(http3::make_field("set-cookie"_sr, cookie_str));
1364 }
1365 }
1366
1367 auto via = resp.fs.header(http2::HD_VIA);
1368 if (httpconf.no_via) {
1369 if (via) {
1370 nva.push_back(http3::make_field("via"_sr, (*via).value));
1371 }
1372 } else {
1373 // we don't create more than 16 bytes in
1374 // http::create_via_header_value.
1375 size_t len = 16;
1376 if (via) {
1377 len += via->value.size() + 2;
1378 }
1379
1380 auto iov = make_byte_ref(balloc, len + 1);
1381 auto p = std::begin(iov);
1382 if (via) {
1383 p = std::copy(std::begin(via->value), std::end(via->value), p);
1384 p = util::copy_lit(p, ", ");
1385 }
1386 p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1387 *p = '\0';
1388
1389 nva.push_back(
1390 http3::make_field("via"_sr, StringRef{std::span{std::begin(iov), p}}));
1391 }
1392
1393 for (auto &p : httpconf.add_response_headers) {
1394 nva.push_back(http3::make_field(p.name, p.value));
1395 }
1396
1397 if (LOG_ENABLED(INFO)) {
1398 log_response_headers(downstream, nva);
1399 }
1400
1401 auto priority = resp.fs.header(http2::HD_PRIORITY);
1402 if (priority) {
1403 nghttp3_pri pri;
1404
1405 if (nghttp3_conn_get_stream_priority(httpconn_, &pri,
1406 downstream->get_stream_id()) == 0 &&
1407 nghttp3_pri_parse_priority(&pri, priority->value.byte(),
1408 priority->value.size()) == 0) {
1409 rv = nghttp3_conn_set_server_stream_priority(
1410 httpconn_, downstream->get_stream_id(), &pri);
1411 if (rv != 0) {
1412 ULOG(ERROR, this) << "nghttp3_conn_set_server_stream_priority: "
1413 << nghttp3_strerror(rv);
1414 }
1415 }
1416 }
1417
1418 nghttp3_data_reader data_read;
1419 data_read.read_data = downstream_read_data_callback;
1420
1421 nghttp3_data_reader *data_readptr;
1422
1423 if (downstream->expect_response_body() ||
1424 downstream->expect_response_trailer()) {
1425 data_readptr = &data_read;
1426 } else {
1427 data_readptr = nullptr;
1428 }
1429
1430 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1431 nva.data(), nva.size(), data_readptr);
1432 if (rv != 0) {
1433 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1434 return -1;
1435 }
1436
1437 if (data_readptr) {
1438 downstream->reset_upstream_wtimer();
1439 } else if (shutdown_stream_read(downstream->get_stream_id(),
1440 NGHTTP3_H3_NO_ERROR) != 0) {
1441 return -1;
1442 }
1443
1444 return 0;
1445 }
1446
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1447 int Http3Upstream::on_downstream_body(Downstream *downstream,
1448 const uint8_t *data, size_t len,
1449 bool flush) {
1450 auto body = downstream->get_response_buf();
1451 body->append(data, len);
1452
1453 if (flush) {
1454 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1455
1456 downstream->ensure_upstream_wtimer();
1457 }
1458
1459 return 0;
1460 }
1461
on_downstream_body_complete(Downstream * downstream)1462 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1463 if (LOG_ENABLED(INFO)) {
1464 DLOG(INFO, downstream) << "HTTP response completed";
1465 }
1466
1467 auto &resp = downstream->response();
1468
1469 if (!downstream->validate_response_recv_body_length()) {
1470 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1471 resp.connection_close = true;
1472 return 0;
1473 }
1474
1475 if (!downstream->get_upgraded()) {
1476 const auto &trailers = resp.fs.trailers();
1477 if (!trailers.empty()) {
1478 std::vector<nghttp3_nv> nva;
1479 nva.reserve(trailers.size());
1480 http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1481 if (!nva.empty()) {
1482 auto rv = nghttp3_conn_submit_trailers(
1483 httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1484 if (rv != 0) {
1485 ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1486 << nghttp3_strerror(rv);
1487 return -1;
1488 }
1489 }
1490 }
1491 }
1492
1493 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1494 downstream->ensure_upstream_wtimer();
1495
1496 return 0;
1497 }
1498
on_handler_delete()1499 void Http3Upstream::on_handler_delete() {
1500 for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1501 if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1502 d->accesslog_ready()) {
1503 handler_->write_accesslog(d);
1504 }
1505 }
1506
1507 auto worker = handler_->get_worker();
1508 auto quic_conn_handler = worker->get_quic_connection_handler();
1509
1510 std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1511 ngtcp2_conn_get_scid(conn_, scids.data());
1512 scids.back() = hashed_scid_;
1513
1514 for (auto &cid : scids) {
1515 quic_conn_handler->remove_connection_id(cid);
1516 }
1517
1518 switch (last_error_.type) {
1519 case NGTCP2_CCERR_TYPE_IDLE_CLOSE:
1520 case NGTCP2_CCERR_TYPE_DROP_CONN:
1521 case NGTCP2_CCERR_TYPE_RETRY:
1522 return;
1523 default:
1524 break;
1525 }
1526
1527 // If this is not idle close, send CONNECTION_CLOSE.
1528 if (!ngtcp2_conn_in_closing_period(conn_) &&
1529 !ngtcp2_conn_in_draining_period(conn_)) {
1530 ngtcp2_path_storage ps;
1531 ngtcp2_pkt_info pi;
1532 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1533
1534 ngtcp2_path_storage_zero(&ps);
1535
1536 ngtcp2_ccerr ccerr;
1537 ngtcp2_ccerr_default(&ccerr);
1538
1539 if (worker->get_graceful_shutdown() &&
1540 !ngtcp2_conn_get_handshake_completed(conn_)) {
1541 ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1542 }
1543
1544 auto nwrite = ngtcp2_conn_write_connection_close(
1545 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1546 quic_timestamp());
1547 if (nwrite < 0) {
1548 if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1549 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1550 << ngtcp2_strerror(nwrite);
1551 }
1552
1553 return;
1554 }
1555
1556 conn_close_.resize(nwrite);
1557
1558 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1559 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1560 ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
1561 }
1562
1563 auto d =
1564 static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1565
1566 if (LOG_ENABLED(INFO)) {
1567 ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1568 << conn_close_.size() << " bytes sentinel packet";
1569 }
1570
1571 auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1572 std::move(conn_close_), d);
1573
1574 quic_conn_handler->add_close_wait(cw.release());
1575 }
1576
on_downstream_reset(Downstream * downstream,bool no_retry)1577 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1578 int rv;
1579
1580 if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1581 // This is error condition when we failed push_request_headers()
1582 // in initiate_downstream(). Otherwise, we have
1583 // DispatchState::ACTIVE state, or we did not set
1584 // DownstreamConnection.
1585 downstream->pop_downstream_connection();
1586 handler_->signal_write();
1587
1588 return 0;
1589 }
1590
1591 if (!downstream->request_submission_ready()) {
1592 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1593 // We have got all response body already. Send it off.
1594 downstream->pop_downstream_connection();
1595 return 0;
1596 }
1597 // pushed stream is handled here
1598 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1599 downstream->pop_downstream_connection();
1600
1601 handler_->signal_write();
1602
1603 return 0;
1604 }
1605
1606 downstream->pop_downstream_connection();
1607
1608 downstream->add_retry();
1609
1610 std::unique_ptr<DownstreamConnection> dconn;
1611
1612 rv = 0;
1613
1614 if (no_retry || downstream->no_more_retry()) {
1615 goto fail;
1616 }
1617
1618 // downstream connection is clean; we can retry with new
1619 // downstream connection.
1620
1621 for (;;) {
1622 auto dconn = handler_->get_downstream_connection(rv, downstream);
1623 if (!dconn) {
1624 goto fail;
1625 }
1626
1627 rv = downstream->attach_downstream_connection(std::move(dconn));
1628 if (rv == 0) {
1629 break;
1630 }
1631 }
1632
1633 rv = downstream->push_request_headers();
1634 if (rv != 0) {
1635 goto fail;
1636 }
1637
1638 return 0;
1639
1640 fail:
1641 if (rv == SHRPX_ERR_TLS_REQUIRED) {
1642 assert(0);
1643 abort();
1644 }
1645
1646 rv = on_downstream_abort_request(downstream, 502);
1647 if (rv != 0) {
1648 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1649 }
1650 downstream->pop_downstream_connection();
1651
1652 handler_->signal_write();
1653
1654 return 0;
1655 }
1656
pause_read(IOCtrlReason reason)1657 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1658
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1659 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1660 size_t consumed) {
1661 consume(downstream->get_stream_id(), consumed);
1662
1663 auto &req = downstream->request();
1664
1665 req.consume(consumed);
1666
1667 handler_->signal_write();
1668
1669 return 0;
1670 }
1671
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1672 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1673 size_t bodylen) {
1674 int rv;
1675
1676 nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1677
1678 const auto &req = downstream->request();
1679
1680 if (req.method != HTTP_HEAD && bodylen) {
1681 data_read.read_data = downstream_read_data_callback;
1682 data_read_ptr = &data_read;
1683
1684 auto buf = downstream->get_response_buf();
1685
1686 buf->append(body, bodylen);
1687 }
1688
1689 const auto &resp = downstream->response();
1690 auto config = get_config();
1691 auto &httpconf = config->http;
1692
1693 auto &balloc = downstream->get_block_allocator();
1694
1695 const auto &headers = resp.fs.headers();
1696 auto nva = std::vector<nghttp3_nv>();
1697 // 2 for :status and server
1698 nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1699
1700 auto response_status = http2::stringify_status(balloc, resp.http_status);
1701
1702 nva.push_back(http3::make_field(":status"_sr, response_status));
1703
1704 for (auto &kv : headers) {
1705 if (kv.name.empty() || kv.name[0] == ':') {
1706 continue;
1707 }
1708 switch (kv.token) {
1709 case http2::HD_CONNECTION:
1710 case http2::HD_KEEP_ALIVE:
1711 case http2::HD_PROXY_CONNECTION:
1712 case http2::HD_TE:
1713 case http2::HD_TRANSFER_ENCODING:
1714 case http2::HD_UPGRADE:
1715 continue;
1716 }
1717 nva.push_back(
1718 http3::make_field(kv.name, kv.value, http3::never_index(kv.no_index)));
1719 }
1720
1721 if (!resp.fs.header(http2::HD_SERVER)) {
1722 nva.push_back(http3::make_field("server"_sr, config->http.server_name));
1723 }
1724
1725 for (auto &p : httpconf.add_response_headers) {
1726 nva.push_back(http3::make_field(p.name, p.value));
1727 }
1728
1729 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1730 nva.data(), nva.size(), data_read_ptr);
1731 if (nghttp3_err_is_fatal(rv)) {
1732 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1733 << nghttp3_strerror(rv);
1734 return -1;
1735 }
1736
1737 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1738
1739 if (data_read_ptr) {
1740 downstream->reset_upstream_wtimer();
1741 }
1742
1743 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1744 0) {
1745 return -1;
1746 }
1747
1748 return 0;
1749 }
1750
initiate_push(Downstream * downstream,const StringRef & uri)1751 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1752 return 0;
1753 }
1754
response_riovec(struct iovec * iov,int iovcnt) const1755 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1756 return 0;
1757 }
1758
response_drain(size_t n)1759 void Http3Upstream::response_drain(size_t n) {}
1760
response_empty() const1761 bool Http3Upstream::response_empty() const { return false; }
1762
1763 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1764 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1765 int32_t promised_stream_id) {
1766 return nullptr;
1767 }
1768
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1769 int Http3Upstream::on_downstream_push_promise_complete(
1770 Downstream *downstream, Downstream *promised_downstream) {
1771 return 0;
1772 }
1773
push_enabled() const1774 bool Http3Upstream::push_enabled() const { return false; }
1775
cancel_premature_downstream(Downstream * promised_downstream)1776 void Http3Upstream::cancel_premature_downstream(
1777 Downstream *promised_downstream) {}
1778
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data)1779 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1780 const Address &remote_addr,
1781 const Address &local_addr, const ngtcp2_pkt_info &pi,
1782 std::span<const uint8_t> data) {
1783 int rv;
1784
1785 auto path = ngtcp2_path{
1786 {
1787 const_cast<sockaddr *>(&local_addr.su.sa),
1788 static_cast<socklen_t>(local_addr.len),
1789 },
1790 {
1791 const_cast<sockaddr *>(&remote_addr.su.sa),
1792 static_cast<socklen_t>(remote_addr.len),
1793 },
1794 const_cast<UpstreamAddr *>(faddr),
1795 };
1796
1797 rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data.data(), data.size(),
1798 quic_timestamp());
1799 if (rv != 0) {
1800 switch (rv) {
1801 case NGTCP2_ERR_DRAINING:
1802 return -1;
1803 case NGTCP2_ERR_RETRY: {
1804 auto worker = handler_->get_worker();
1805 auto quic_conn_handler = worker->get_quic_connection_handler();
1806
1807 if (worker->get_graceful_shutdown()) {
1808 ngtcp2_ccerr_set_transport_error(&last_error_,
1809 NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1810
1811 return handle_error();
1812 }
1813
1814 ngtcp2_version_cid vc;
1815
1816 rv = ngtcp2_pkt_decode_version_cid(&vc, data.data(), data.size(),
1817 SHRPX_QUIC_SCIDLEN);
1818 if (rv != 0) {
1819 return -1;
1820 }
1821
1822 // Overwrite error if any is set
1823 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1824
1825 quic_conn_handler->send_retry(
1826 handler_->get_upstream_addr(), vc.version, {vc.dcid, vc.dcidlen},
1827 {vc.scid, vc.scidlen}, remote_addr, local_addr, data.size() * 3);
1828
1829 return -1;
1830 }
1831 case NGTCP2_ERR_CRYPTO:
1832 if (!last_error_.error_code) {
1833 ngtcp2_ccerr_set_tls_alert(
1834 &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1835 }
1836 break;
1837 case NGTCP2_ERR_DROP_CONN:
1838 // Overwrite error if any is set
1839 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1840
1841 return -1;
1842 default:
1843 if (!last_error_.error_code) {
1844 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1845 }
1846 }
1847
1848 ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1849
1850 return handle_error();
1851 }
1852
1853 return 0;
1854 }
1855
1856 std::pair<std::span<const uint8_t>, int>
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1857 Http3Upstream::send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
1858 size_t remote_salen, const sockaddr *local_sa,
1859 size_t local_salen, const ngtcp2_pkt_info &pi,
1860 std::span<const uint8_t> data, size_t gso_size) {
1861 if (tx_.no_gso) {
1862 for (; !data.empty();) {
1863 auto len = std::min(gso_size, data.size());
1864 auto rv =
1865 quic_send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
1866 pi, {std::begin(data), len}, gso_size);
1867 if (rv != 0) {
1868 switch (rv) {
1869 case -EAGAIN:
1870 #if EAGAIN != EWOULDBLOCK
1871 case -EWOULDBLOCK:
1872 #endif // EAGAIN != EWOULDBLOCK
1873 return {data, SHRPX_ERR_SEND_BLOCKED};
1874 default:
1875 return {data, -1};
1876 }
1877 }
1878
1879 data = data.subspan(len);
1880 }
1881
1882 return {{}, 0};
1883 }
1884
1885 auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1886 local_salen, pi, data, gso_size);
1887 switch (rv) {
1888 case 0:
1889 return {{}, 0};
1890 // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1891 // large.
1892 case -EINVAL:
1893 case -EMSGSIZE:
1894 // Let the packet lost.
1895 break;
1896 case -EAGAIN:
1897 #if EAGAIN != EWOULDBLOCK
1898 case -EWOULDBLOCK:
1899 #endif // EAGAIN != EWOULDBLOCK
1900 return {data, SHRPX_ERR_SEND_BLOCKED};
1901 case -EIO:
1902 if (tx_.no_gso) {
1903 break;
1904 }
1905
1906 tx_.no_gso = true;
1907
1908 return send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
1909 pi, data, gso_size);
1910 default:
1911 break;
1912 }
1913
1914 return {{}, -1};
1915 }
1916
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,std::span<const uint8_t> data,size_t gso_size)1917 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1918 const ngtcp2_addr &remote_addr,
1919 const ngtcp2_addr &local_addr,
1920 const ngtcp2_pkt_info &pi,
1921 std::span<const uint8_t> data,
1922 size_t gso_size) {
1923 assert(tx_.num_blocked || !tx_.send_blocked);
1924 assert(tx_.num_blocked < 2);
1925 assert(gso_size);
1926
1927 tx_.send_blocked = true;
1928
1929 auto &p = tx_.blocked[tx_.num_blocked++];
1930
1931 memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1932 memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1933
1934 p.local_addr.len = local_addr.addrlen;
1935 p.remote_addr.len = remote_addr.addrlen;
1936 p.faddr = faddr;
1937 p.pi = pi;
1938 p.data = data;
1939 p.gso_size = gso_size;
1940 }
1941
send_blocked_packet()1942 int Http3Upstream::send_blocked_packet() {
1943 assert(tx_.send_blocked);
1944
1945 for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1946 auto &p = tx_.blocked[tx_.num_blocked_sent];
1947
1948 auto [rest, rv] = send_packet(p.faddr, &p.remote_addr.su.sa,
1949 p.remote_addr.len, &p.local_addr.su.sa,
1950 p.local_addr.len, p.pi, p.data, p.gso_size);
1951 if (rv == SHRPX_ERR_SEND_BLOCKED) {
1952 p.data = rest;
1953
1954 signal_write_upstream_addr(p.faddr);
1955
1956 return 0;
1957 }
1958 }
1959
1960 tx_.send_blocked = false;
1961 tx_.num_blocked = 0;
1962 tx_.num_blocked_sent = 0;
1963
1964 return 0;
1965 }
1966
signal_write_upstream_addr(const UpstreamAddr * faddr)1967 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1968 auto conn = handler_->get_connection();
1969
1970 if (faddr->fd != conn->wev.fd) {
1971 if (ev_is_active(&conn->wev)) {
1972 ev_io_stop(handler_->get_loop(), &conn->wev);
1973 }
1974
1975 ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1976 }
1977
1978 conn->wlimit.startw();
1979 }
1980
handle_error()1981 int Http3Upstream::handle_error() {
1982 if (ngtcp2_conn_in_closing_period(conn_) ||
1983 ngtcp2_conn_in_draining_period(conn_)) {
1984 return -1;
1985 }
1986
1987 ngtcp2_path_storage ps;
1988 ngtcp2_pkt_info pi;
1989
1990 ngtcp2_path_storage_zero(&ps);
1991
1992 auto ts = quic_timestamp();
1993
1994 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1995
1996 auto nwrite =
1997 ngtcp2_conn_write_connection_close(conn_, &ps.path, &pi, conn_close_.data(),
1998 conn_close_.size(), &last_error_, ts);
1999 if (nwrite < 0) {
2000 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
2001 << ngtcp2_strerror(nwrite);
2002 return -1;
2003 }
2004
2005 conn_close_.resize(nwrite);
2006
2007 if (nwrite == 0) {
2008 return -1;
2009 }
2010
2011 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
2012 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
2013 ps.path.local.addrlen, pi, conn_close_, conn_close_.size());
2014
2015 return -1;
2016 }
2017
handle_expiry()2018 int Http3Upstream::handle_expiry() {
2019 int rv;
2020
2021 auto ts = quic_timestamp();
2022
2023 rv = ngtcp2_conn_handle_expiry(conn_, ts);
2024 if (rv != 0) {
2025 if (rv == NGTCP2_ERR_IDLE_CLOSE) {
2026 ULOG(INFO, this) << "Idle connection timeout";
2027 } else {
2028 ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
2029 }
2030 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
2031 return handle_error();
2032 }
2033
2034 return 0;
2035 }
2036
reset_timer()2037 void Http3Upstream::reset_timer() {
2038 auto ts = quic_timestamp();
2039 auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
2040 auto loop = handler_->get_loop();
2041
2042 if (expiry_ts <= ts) {
2043 ev_feed_event(loop, &timer_, EV_TIMER);
2044 return;
2045 }
2046
2047 timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
2048
2049 ev_timer_again(loop, &timer_);
2050 }
2051
2052 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2053 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2054 size_t nconsumed, void *user_data,
2055 void *stream_user_data) {
2056 auto upstream = static_cast<Http3Upstream *>(user_data);
2057
2058 upstream->consume(stream_id, nconsumed);
2059
2060 return 0;
2061 }
2062 } // namespace
2063
2064 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2065 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2066 uint64_t datalen, void *user_data,
2067 void *stream_user_data) {
2068 auto upstream = static_cast<Http3Upstream *>(user_data);
2069 auto downstream = static_cast<Downstream *>(stream_user_data);
2070
2071 assert(downstream);
2072
2073 if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2074 return NGHTTP3_ERR_CALLBACK_FAILURE;
2075 }
2076
2077 return 0;
2078 }
2079 } // namespace
2080
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2081 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2082 uint64_t datalen) {
2083 if (LOG_ENABLED(INFO)) {
2084 ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2085 << datalen << " bytes acknowledged";
2086 }
2087
2088 auto body = downstream->get_response_buf();
2089 auto drained = body->drain_mark(datalen);
2090 (void)drained;
2091
2092 assert(datalen == drained);
2093
2094 if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2095 return -1;
2096 }
2097
2098 return 0;
2099 }
2100
2101 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2102 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2103 void *user_data, void *stream_user_data) {
2104 if (!ngtcp2_is_bidi_stream(stream_id)) {
2105 return 0;
2106 }
2107
2108 auto upstream = static_cast<Http3Upstream *>(user_data);
2109 upstream->http_begin_request_headers(stream_id);
2110
2111 return 0;
2112 }
2113 } // namespace
2114
2115 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2116 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2117 int32_t token, nghttp3_rcbuf *name,
2118 nghttp3_rcbuf *value, uint8_t flags,
2119 void *user_data, void *stream_user_data) {
2120 auto upstream = static_cast<Http3Upstream *>(user_data);
2121 auto downstream = static_cast<Downstream *>(stream_user_data);
2122
2123 if (!downstream || downstream->get_stop_reading()) {
2124 return 0;
2125 }
2126
2127 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2128 /* trailer = */ false) != 0) {
2129 return NGHTTP3_ERR_CALLBACK_FAILURE;
2130 }
2131
2132 return 0;
2133 }
2134 } // namespace
2135
2136 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2137 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2138 int32_t token, nghttp3_rcbuf *name,
2139 nghttp3_rcbuf *value, uint8_t flags,
2140 void *user_data, void *stream_user_data) {
2141 auto upstream = static_cast<Http3Upstream *>(user_data);
2142 auto downstream = static_cast<Downstream *>(stream_user_data);
2143
2144 if (!downstream || downstream->get_stop_reading()) {
2145 return 0;
2146 }
2147
2148 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2149 /* trailer = */ true) != 0) {
2150 return NGHTTP3_ERR_CALLBACK_FAILURE;
2151 }
2152
2153 return 0;
2154 }
2155 } // namespace
2156
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2157 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2158 int32_t h3token,
2159 nghttp3_rcbuf *name,
2160 nghttp3_rcbuf *value, uint8_t flags,
2161 bool trailer) {
2162 auto namebuf = nghttp3_rcbuf_get_buf(name);
2163 auto valuebuf = nghttp3_rcbuf_get_buf(value);
2164 auto &req = downstream->request();
2165 auto config = get_config();
2166 auto &httpconf = config->http;
2167
2168 if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2169 httpconf.request_header_field_buffer ||
2170 req.fs.num_fields() >= httpconf.max_request_header_fields) {
2171 downstream->set_stop_reading(true);
2172
2173 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2174 return 0;
2175 }
2176
2177 if (LOG_ENABLED(INFO)) {
2178 ULOG(INFO, this) << "Too large or many header field size="
2179 << req.fs.buffer_size() + namebuf.len + valuebuf.len
2180 << ", num=" << req.fs.num_fields() + 1;
2181 }
2182
2183 // just ignore if this is a trailer part.
2184 if (trailer) {
2185 if (shutdown_stream_read(downstream->get_stream_id(),
2186 NGHTTP3_H3_NO_ERROR) != 0) {
2187 return -1;
2188 }
2189
2190 return 0;
2191 }
2192
2193 if (error_reply(downstream, 431) != 0) {
2194 return -1;
2195 }
2196
2197 return 0;
2198 }
2199
2200 auto nameref = StringRef{namebuf.base, namebuf.len};
2201 auto valueref = StringRef{valuebuf.base, valuebuf.len};
2202 auto token = http2::lookup_token(nameref);
2203 auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2204
2205 downstream->add_rcbuf(name);
2206 downstream->add_rcbuf(value);
2207
2208 if (trailer) {
2209 req.fs.add_trailer_token(nameref, valueref, no_index, token);
2210 return 0;
2211 }
2212
2213 req.fs.add_header_token(nameref, valueref, no_index, token);
2214 return 0;
2215 }
2216
2217 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2218 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2219 void *user_data, void *stream_user_data) {
2220 auto upstream = static_cast<Http3Upstream *>(user_data);
2221 auto downstream = static_cast<Downstream *>(stream_user_data);
2222
2223 if (!downstream || downstream->get_stop_reading()) {
2224 return 0;
2225 }
2226
2227 if (upstream->http_end_request_headers(downstream, fin) != 0) {
2228 return NGHTTP3_ERR_CALLBACK_FAILURE;
2229 }
2230
2231 downstream->reset_upstream_rtimer();
2232 downstream->stop_header_timer();
2233
2234 return 0;
2235 }
2236 } // namespace
2237
http_end_request_headers(Downstream * downstream,int fin)2238 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2239 auto lgconf = log_config();
2240 lgconf->update_tstamp(std::chrono::system_clock::now());
2241 auto &req = downstream->request();
2242 req.tstamp = lgconf->tstamp;
2243
2244 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2245 return 0;
2246 }
2247
2248 auto &nva = req.fs.headers();
2249
2250 if (LOG_ENABLED(INFO)) {
2251 std::stringstream ss;
2252 for (auto &nv : nva) {
2253 if (nv.name == "authorization"_sr) {
2254 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2255 continue;
2256 }
2257 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2258 }
2259 ULOG(INFO, this) << "HTTP request headers. stream_id="
2260 << downstream->get_stream_id() << "\n"
2261 << ss.str();
2262 }
2263
2264 auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2265 if (content_length) {
2266 // libnghttp3 guarantees this can be parsed
2267 req.fs.content_length =
2268 util::parse_uint(content_length->value).value_or(-1);
2269 }
2270
2271 // presence of mandatory header fields are guaranteed by libnghttp3.
2272 auto authority = req.fs.header(http2::HD__AUTHORITY);
2273 auto path = req.fs.header(http2::HD__PATH);
2274 auto method = req.fs.header(http2::HD__METHOD);
2275 auto scheme = req.fs.header(http2::HD__SCHEME);
2276
2277 auto method_token = http2::lookup_method_token(method->value);
2278 if (method_token == -1) {
2279 if (error_reply(downstream, 501) != 0) {
2280 return -1;
2281 }
2282 return 0;
2283 }
2284
2285 auto faddr = handler_->get_upstream_addr();
2286
2287 auto config = get_config();
2288
2289 // For HTTP/2 proxy, we require :authority.
2290 if (method_token != HTTP_CONNECT && config->http2_proxy &&
2291 faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2292 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2293 return 0;
2294 }
2295
2296 req.method = method_token;
2297 if (scheme) {
2298 req.scheme = scheme->value;
2299 }
2300
2301 // nghttp2 library guarantees either :authority or host exist
2302 if (!authority) {
2303 req.no_authority = true;
2304 authority = req.fs.header(http2::HD_HOST);
2305 }
2306
2307 if (authority) {
2308 req.authority = authority->value;
2309 }
2310
2311 if (path) {
2312 if (method_token == HTTP_OPTIONS && path->value == "*"_sr) {
2313 // Server-wide OPTIONS request. Path is empty.
2314 } else if (config->http2_proxy &&
2315 faddr->alt_mode == UpstreamAltMode::NONE) {
2316 req.path = path->value;
2317 } else {
2318 req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2319 path->value);
2320 }
2321 }
2322
2323 auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2324 if (connect_proto) {
2325 if (connect_proto->value != "websocket"_sr) {
2326 if (error_reply(downstream, 400) != 0) {
2327 return -1;
2328 }
2329 return 0;
2330 }
2331 req.connect_proto = ConnectProto::WEBSOCKET;
2332 }
2333
2334 if (!fin) {
2335 req.http2_expect_body = true;
2336 } else if (req.fs.content_length == -1) {
2337 req.fs.content_length = 0;
2338 }
2339
2340 downstream->inspect_http2_request();
2341
2342 downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2343
2344 if (config->http.require_http_scheme &&
2345 !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2346 if (error_reply(downstream, 400) != 0) {
2347 return -1;
2348 }
2349 }
2350
2351 #ifdef HAVE_MRUBY
2352 auto worker = handler_->get_worker();
2353 auto mruby_ctx = worker->get_mruby_context();
2354
2355 if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2356 if (error_reply(downstream, 500) != 0) {
2357 return -1;
2358 }
2359 return 0;
2360 }
2361 #endif // HAVE_MRUBY
2362
2363 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2364 return 0;
2365 }
2366
2367 start_downstream(downstream);
2368
2369 return 0;
2370 }
2371
start_downstream(Downstream * downstream)2372 void Http3Upstream::start_downstream(Downstream *downstream) {
2373 if (downstream_queue_.can_activate(downstream->request().authority)) {
2374 initiate_downstream(downstream);
2375 return;
2376 }
2377
2378 downstream_queue_.mark_blocked(downstream);
2379 }
2380
initiate_downstream(Downstream * downstream)2381 void Http3Upstream::initiate_downstream(Downstream *downstream) {
2382 int rv;
2383
2384 #ifdef HAVE_MRUBY
2385 DownstreamConnection *dconn_ptr;
2386 #endif // HAVE_MRUBY
2387
2388 for (;;) {
2389 auto dconn = handler_->get_downstream_connection(rv, downstream);
2390 if (!dconn) {
2391 if (rv == SHRPX_ERR_TLS_REQUIRED) {
2392 assert(0);
2393 abort();
2394 }
2395
2396 rv = error_reply(downstream, 502);
2397 if (rv != 0) {
2398 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2399 }
2400
2401 downstream->set_request_state(DownstreamState::CONNECT_FAIL);
2402 downstream_queue_.mark_failure(downstream);
2403
2404 return;
2405 }
2406
2407 #ifdef HAVE_MRUBY
2408 dconn_ptr = dconn.get();
2409 #endif // HAVE_MRUBY
2410 rv = downstream->attach_downstream_connection(std::move(dconn));
2411 if (rv == 0) {
2412 break;
2413 }
2414 }
2415
2416 #ifdef HAVE_MRUBY
2417 const auto &group = dconn_ptr->get_downstream_addr_group();
2418 if (group) {
2419 const auto &mruby_ctx = group->shared_addr->mruby_ctx;
2420 if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2421 if (error_reply(downstream, 500) != 0) {
2422 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2423 }
2424
2425 downstream_queue_.mark_failure(downstream);
2426
2427 return;
2428 }
2429
2430 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2431 return;
2432 }
2433 }
2434 #endif // HAVE_MRUBY
2435
2436 rv = downstream->push_request_headers();
2437 if (rv != 0) {
2438 if (error_reply(downstream, 502) != 0) {
2439 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2440 }
2441
2442 downstream_queue_.mark_failure(downstream);
2443
2444 return;
2445 }
2446
2447 downstream_queue_.mark_active(downstream);
2448
2449 auto &req = downstream->request();
2450 if (!req.http2_expect_body) {
2451 rv = downstream->end_upload_data();
2452 if (rv != 0) {
2453 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2454 }
2455 }
2456 }
2457
2458 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2459 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2460 size_t datalen, void *user_data, void *stream_user_data) {
2461 auto upstream = static_cast<Http3Upstream *>(user_data);
2462 auto downstream = static_cast<Downstream *>(stream_user_data);
2463
2464 if (upstream->http_recv_data(downstream, {data, datalen}) != 0) {
2465 return NGHTTP3_ERR_CALLBACK_FAILURE;
2466 }
2467
2468 return 0;
2469 }
2470 } // namespace
2471
http_recv_data(Downstream * downstream,std::span<const uint8_t> data)2472 int Http3Upstream::http_recv_data(Downstream *downstream,
2473 std::span<const uint8_t> data) {
2474 downstream->reset_upstream_rtimer();
2475
2476 if (downstream->push_upload_data_chunk(data.data(), data.size()) != 0) {
2477 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2478 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2479 }
2480
2481 consume(downstream->get_stream_id(), data.size());
2482
2483 return 0;
2484 }
2485
2486 return 0;
2487 }
2488
2489 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2490 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2491 void *stream_user_data) {
2492 auto upstream = static_cast<Http3Upstream *>(user_data);
2493 auto downstream = static_cast<Downstream *>(stream_user_data);
2494
2495 if (!downstream || downstream->get_stop_reading()) {
2496 return 0;
2497 }
2498
2499 if (upstream->http_end_stream(downstream) != 0) {
2500 return NGHTTP3_ERR_CALLBACK_FAILURE;
2501 }
2502
2503 return 0;
2504 }
2505 } // namespace
2506
http_end_stream(Downstream * downstream)2507 int Http3Upstream::http_end_stream(Downstream *downstream) {
2508 downstream->disable_upstream_rtimer();
2509
2510 if (downstream->end_upload_data() != 0) {
2511 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2512 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2513 }
2514 }
2515
2516 downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2517
2518 return 0;
2519 }
2520
2521 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2522 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2523 uint64_t app_error_code, void *conn_user_data,
2524 void *stream_user_data) {
2525 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2526 auto downstream = static_cast<Downstream *>(stream_user_data);
2527
2528 if (!downstream) {
2529 return 0;
2530 }
2531
2532 if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2533 return NGHTTP3_ERR_CALLBACK_FAILURE;
2534 }
2535
2536 return 0;
2537 }
2538 } // namespace
2539
http_stream_close(Downstream * downstream,uint64_t app_error_code)2540 int Http3Upstream::http_stream_close(Downstream *downstream,
2541 uint64_t app_error_code) {
2542 auto stream_id = downstream->get_stream_id();
2543
2544 if (LOG_ENABLED(INFO)) {
2545 ULOG(INFO, this) << "Stream stream_id=" << stream_id
2546 << " is being closed with app_error_code="
2547 << app_error_code;
2548
2549 auto body = downstream->get_response_buf();
2550
2551 ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2552 << " not_sent=" << body->rleft_mark();
2553 }
2554
2555 auto &req = downstream->request();
2556
2557 consume(stream_id, req.unconsumed_body_length);
2558
2559 req.unconsumed_body_length = 0;
2560
2561 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2562
2563 if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2564 remove_downstream(downstream);
2565 // downstream was deleted
2566
2567 return 0;
2568 }
2569
2570 if (downstream->can_detach_downstream_connection()) {
2571 // Keep-alive
2572 downstream->detach_downstream_connection();
2573 }
2574
2575 downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2576
2577 // At this point, downstream read may be paused.
2578
2579 // If shrpx_downstream::push_request_headers() failed, the
2580 // error is handled here.
2581 remove_downstream(downstream);
2582 // downstream was deleted
2583
2584 return 0;
2585 }
2586
2587 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2588 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2589 uint64_t app_error_code, void *user_data,
2590 void *stream_user_data) {
2591 auto upstream = static_cast<Http3Upstream *>(user_data);
2592
2593 if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2594 return NGHTTP3_ERR_CALLBACK_FAILURE;
2595 }
2596
2597 return 0;
2598 }
2599 } // namespace
2600
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2601 int Http3Upstream::http_stop_sending(int64_t stream_id,
2602 uint64_t app_error_code) {
2603 auto rv =
2604 ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2605 if (ngtcp2_err_is_fatal(rv)) {
2606 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2607 << ngtcp2_strerror(rv);
2608 return -1;
2609 }
2610
2611 return 0;
2612 }
2613
2614 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2615 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2616 uint64_t app_error_code, void *user_data,
2617 void *stream_user_data) {
2618 auto upstream = static_cast<Http3Upstream *>(user_data);
2619
2620 if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2621 return NGHTTP3_ERR_CALLBACK_FAILURE;
2622 }
2623
2624 return 0;
2625 }
2626 } // namespace
2627
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2628 int Http3Upstream::http_reset_stream(int64_t stream_id,
2629 uint64_t app_error_code) {
2630 auto rv =
2631 ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2632 if (ngtcp2_err_is_fatal(rv)) {
2633 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2634 << ngtcp2_strerror(rv);
2635 return -1;
2636 }
2637
2638 return 0;
2639 }
2640
setup_httpconn()2641 int Http3Upstream::setup_httpconn() {
2642 int rv;
2643
2644 if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2645 return -1;
2646 }
2647
2648 nghttp3_callbacks callbacks{
2649 shrpx::http_acked_stream_data,
2650 shrpx::http_stream_close,
2651 shrpx::http_recv_data,
2652 http_deferred_consume,
2653 shrpx::http_begin_request_headers,
2654 shrpx::http_recv_request_header,
2655 shrpx::http_end_request_headers,
2656 nullptr, // begin_trailers
2657 shrpx::http_recv_request_trailer,
2658 nullptr, // end_trailers
2659 shrpx::http_stop_sending,
2660 shrpx::http_end_stream,
2661 shrpx::http_reset_stream,
2662 };
2663
2664 auto config = get_config();
2665
2666 nghttp3_settings settings;
2667 nghttp3_settings_default(&settings);
2668 settings.qpack_max_dtable_capacity = 4_k;
2669
2670 if (!config->http2_proxy) {
2671 settings.enable_connect_protocol = 1;
2672 }
2673
2674 auto mem = nghttp3_mem_default();
2675
2676 rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2677 if (rv != 0) {
2678 ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2679 return -1;
2680 }
2681
2682 auto params = ngtcp2_conn_get_local_transport_params(conn_);
2683
2684 nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2685 params->initial_max_streams_bidi);
2686
2687 int64_t ctrl_stream_id;
2688
2689 rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2690 if (rv != 0) {
2691 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2692 return -1;
2693 }
2694
2695 rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2696 if (rv != 0) {
2697 ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2698 << nghttp3_strerror(rv);
2699 return -1;
2700 }
2701
2702 int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2703
2704 rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2705 if (rv != 0) {
2706 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2707 return -1;
2708 }
2709
2710 rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2711 if (rv != 0) {
2712 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2713 return -1;
2714 }
2715
2716 rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2717 qpack_dec_stream_id);
2718 if (rv != 0) {
2719 ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2720 << nghttp3_strerror(rv);
2721 return -1;
2722 }
2723
2724 return 0;
2725 }
2726
error_reply(Downstream * downstream,unsigned int status_code)2727 int Http3Upstream::error_reply(Downstream *downstream,
2728 unsigned int status_code) {
2729 int rv;
2730 auto &resp = downstream->response();
2731
2732 auto &balloc = downstream->get_block_allocator();
2733
2734 auto html = http::create_error_html(balloc, status_code);
2735 resp.http_status = status_code;
2736
2737 nghttp3_data_reader data_read, *data_read_ptr = nullptr;
2738
2739 const auto &req = downstream->request();
2740
2741 if (req.method != HTTP_HEAD) {
2742 data_read.read_data = downstream_read_data_callback;
2743 data_read_ptr = &data_read;
2744
2745 auto body = downstream->get_response_buf();
2746
2747 body->append(html);
2748 }
2749
2750 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2751
2752 auto lgconf = log_config();
2753 lgconf->update_tstamp(std::chrono::system_clock::now());
2754
2755 auto response_status = http2::stringify_status(balloc, status_code);
2756 auto content_length = util::make_string_ref_uint(balloc, html.size());
2757 auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2758
2759 auto nva = std::to_array(
2760 {http3::make_field(":status"_sr, response_status),
2761 http3::make_field("content-type"_sr, "text/html; charset=UTF-8"_sr),
2762 http3::make_field("server"_sr, get_config()->http.server_name),
2763 http3::make_field("content-length"_sr, content_length),
2764 http3::make_field("date"_sr, date)});
2765
2766 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2767 nva.data(), nva.size(), data_read_ptr);
2768 if (nghttp3_err_is_fatal(rv)) {
2769 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2770 << nghttp3_strerror(rv);
2771 return -1;
2772 }
2773
2774 downstream->reset_upstream_wtimer();
2775
2776 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2777 0) {
2778 return -1;
2779 }
2780
2781 return 0;
2782 }
2783
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2784 int Http3Upstream::shutdown_stream(Downstream *downstream,
2785 uint64_t app_error_code) {
2786 auto stream_id = downstream->get_stream_id();
2787
2788 if (LOG_ENABLED(INFO)) {
2789 ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2790 << " with app_error_code=" << app_error_code;
2791 }
2792
2793 auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2794 if (rv != 0) {
2795 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2796 << ngtcp2_strerror(rv);
2797 return -1;
2798 }
2799
2800 return 0;
2801 }
2802
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2803 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2804 uint64_t app_error_code) {
2805 auto rv =
2806 ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, NGHTTP3_H3_NO_ERROR);
2807 if (ngtcp2_err_is_fatal(rv)) {
2808 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2809 << ngtcp2_strerror(rv);
2810 return -1;
2811 }
2812
2813 return 0;
2814 }
2815
consume(int64_t stream_id,size_t nconsumed)2816 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2817 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2818 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2819 }
2820
remove_downstream(Downstream * downstream)2821 void Http3Upstream::remove_downstream(Downstream *downstream) {
2822 if (downstream->accesslog_ready()) {
2823 handler_->write_accesslog(downstream);
2824 }
2825
2826 nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2827 nullptr);
2828
2829 auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2830
2831 if (next_downstream) {
2832 initiate_downstream(next_downstream);
2833 }
2834
2835 if (downstream_queue_.get_downstreams() == nullptr) {
2836 // There is no downstream at the moment. Start idle timer now.
2837 handler_->repeat_read_timer();
2838 }
2839 }
2840
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2841 void Http3Upstream::log_response_headers(
2842 Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2843 std::stringstream ss;
2844 for (auto &nv : nva) {
2845 ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2846 << StringRef{nv.value, nv.valuelen} << "\n";
2847 }
2848 ULOG(INFO, this) << "HTTP response headers. stream_id="
2849 << downstream->get_stream_id() << "\n"
2850 << ss.str();
2851 }
2852
check_shutdown()2853 int Http3Upstream::check_shutdown() {
2854 auto worker = handler_->get_worker();
2855
2856 if (!worker->get_graceful_shutdown()) {
2857 return 0;
2858 }
2859
2860 ev_prepare_stop(handler_->get_loop(), &prep_);
2861
2862 return start_graceful_shutdown();
2863 }
2864
start_graceful_shutdown()2865 int Http3Upstream::start_graceful_shutdown() {
2866 int rv;
2867
2868 if (ev_is_active(&shutdown_timer_)) {
2869 return 0;
2870 }
2871
2872 if (!httpconn_) {
2873 return -1;
2874 }
2875
2876 rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2877 if (rv != 0) {
2878 ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2879 << nghttp3_strerror(rv);
2880 return -1;
2881 }
2882
2883 handler_->signal_write();
2884
2885 auto t = ngtcp2_conn_get_pto(conn_);
2886
2887 ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2888 0.);
2889 ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2890
2891 return 0;
2892 }
2893
submit_goaway()2894 int Http3Upstream::submit_goaway() {
2895 int rv;
2896
2897 rv = nghttp3_conn_shutdown(httpconn_);
2898 if (rv != 0) {
2899 ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2900 return -1;
2901 }
2902
2903 handler_->signal_write();
2904
2905 return 0;
2906 }
2907
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2908 int Http3Upstream::open_qlog_file(const StringRef &dir,
2909 const ngtcp2_cid &scid) const {
2910 std::array<char, sizeof("20141115T125824.741+0900")> buf;
2911
2912 auto path = std::string{dir};
2913 path += '/';
2914 path +=
2915 util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2916 path += '-';
2917 path += util::format_hex(std::span{scid.data, scid.datalen});
2918 path += ".sqlog";
2919
2920 int fd;
2921
2922 #ifdef O_CLOEXEC
2923 while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2924 S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2925 errno == EINTR)
2926 ;
2927 #else // !O_CLOEXEC
2928 while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2929 S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2930 errno == EINTR)
2931 ;
2932
2933 if (fd != -1) {
2934 util::make_socket_closeonexec(fd);
2935 }
2936 #endif // !O_CLOEXEC
2937
2938 if (fd == -1) {
2939 auto error = errno;
2940 ULOG(ERROR, this) << "Failed to open qlog file " << path
2941 << ": errno=" << error;
2942 return -1;
2943 }
2944
2945 return fd;
2946 }
2947
get_conn() const2948 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2949
2950 } // namespace shrpx
2951