1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2021 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_http3_upstream.h"
26
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31
32 #include <cstdio>
33
34 #include <ngtcp2/ngtcp2_crypto.h>
35
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 # include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49
50 namespace shrpx {
51
52 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)53 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
54 auto upstream = static_cast<Http3Upstream *>(w->data);
55
56 if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
57 goto fail;
58 }
59
60 return;
61
62 fail:
63 auto handler = upstream->get_client_handler();
64
65 delete handler;
66 }
67 } // namespace
68
69 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)70 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
71 auto upstream = static_cast<Http3Upstream *>(w->data);
72 auto handler = upstream->get_client_handler();
73
74 if (upstream->submit_goaway() != 0) {
75 delete handler;
76 }
77 }
78 } // namespace
79
80 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)81 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
82 auto upstream = static_cast<Http3Upstream *>(w->data);
83 auto handler = upstream->get_client_handler();
84
85 if (upstream->check_shutdown() != 0) {
86 delete handler;
87 }
88 }
89 } // namespace
90
91 namespace {
downstream_queue_size(Worker * worker)92 size_t downstream_queue_size(Worker *worker) {
93 auto &downstreamconf = *worker->get_downstream_config();
94
95 if (get_config()->http2_proxy) {
96 return downstreamconf.connections_per_host;
97 }
98
99 return downstreamconf.connections_per_frontend;
100 }
101 } // namespace
102
103 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)104 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
105 auto conn = static_cast<Connection *>(conn_ref->user_data);
106 auto handler = static_cast<ClientHandler *>(conn->data);
107 auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
108 return upstream->get_conn();
109 }
110 } // namespace
111
Http3Upstream(ClientHandler * handler)112 Http3Upstream::Http3Upstream(ClientHandler *handler)
113 : handler_{handler},
114 qlog_fd_{-1},
115 hashed_scid_{},
116 conn_{nullptr},
117 httpconn_{nullptr},
118 downstream_queue_{downstream_queue_size(handler->get_worker()),
119 !get_config()->http2_proxy},
120 retry_close_{false},
121 tx_{
122 .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123 } {
124 auto conn = handler_->get_connection();
125 conn->conn_ref.get_conn = shrpx::get_conn;
126
127 ev_timer_init(&timer_, timeoutcb, 0., 0.);
128 timer_.data = this;
129
130 ngtcp2_ccerr_default(&last_error_);
131
132 ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
133 shutdown_timer_.data = this;
134
135 ev_prepare_init(&prep_, prepare_cb);
136 prep_.data = this;
137 ev_prepare_start(handler_->get_loop(), &prep_);
138 }
139
~Http3Upstream()140 Http3Upstream::~Http3Upstream() {
141 auto loop = handler_->get_loop();
142
143 ev_prepare_stop(loop, &prep_);
144 ev_timer_stop(loop, &shutdown_timer_);
145 ev_timer_stop(loop, &timer_);
146
147 nghttp3_conn_del(httpconn_);
148
149 ngtcp2_conn_del(conn_);
150
151 if (qlog_fd_ != -1) {
152 close(qlog_fd_);
153 }
154 }
155
156 namespace {
// printf-style log sink installed as ngtcp2's settings.log_printf.
// Formats |fmt| and its arguments into a 4096-byte stack buffer,
// appends a newline and writes the result to stderr with write(2),
// retrying on EINTR.  Messages that do not fit are truncated to 4095
// bytes plus the newline.  |user_data| is unused.
void log_printf(void *user_data, const char *fmt, ...) {
  va_list ap;
  std::array<char, 4096> buf;

  va_start(ap, fmt);
  auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
  va_end(ap);

  if (nwrite < 0) {
    // Formatting failed, so the buffer contents are unspecified.  Emit
    // nothing instead of dumping uninitialised stack bytes to stderr.
    return;
  }

  if (static_cast<size_t>(nwrite) >= buf.size()) {
    // Output was truncated; keep room for the trailing newline.
    nwrite = static_cast<int>(buf.size() - 1);
  }

  // Overwrites the terminating NUL; the buffer is written by length.
  buf[nwrite++] = '\n';

  while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
    ;
}
174 } // namespace
175
176 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)177 void qlog_write(void *user_data, uint32_t flags, const void *data,
178 size_t datalen) {
179 auto upstream = static_cast<Http3Upstream *>(user_data);
180
181 upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
182 }
183 } // namespace
184
qlog_write(const void * data,size_t datalen,bool fin)185 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
186 assert(qlog_fd_ != -1);
187
188 while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
189 ;
190
191 if (fin) {
192 close(qlog_fd_);
193 qlog_fd_ = -1;
194 }
195 }
196
197 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)198 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
199 util::random_bytes(dest, dest + destlen,
200 *static_cast<std::mt19937 *>(rand_ctx->native_handle));
201 }
202 } // namespace
203
204 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)205 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
206 size_t cidlen, void *user_data) {
207 auto upstream = static_cast<Http3Upstream *>(user_data);
208 auto handler = upstream->get_client_handler();
209 auto worker = handler->get_worker();
210 auto conn_handler = worker->get_connection_handler();
211 auto &qkms = conn_handler->get_quic_keying_materials();
212 auto &qkm = qkms->keying_materials.front();
213
214 if (generate_quic_connection_id(*cid, cidlen, worker->get_cid_prefix(),
215 qkm.id, qkm.cid_encryption_key.data()) != 0) {
216 return NGTCP2_ERR_CALLBACK_FAILURE;
217 }
218
219 if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
220 qkm.secret.size()) != 0) {
221 return NGTCP2_ERR_CALLBACK_FAILURE;
222 }
223
224 auto quic_connection_handler = worker->get_quic_connection_handler();
225
226 quic_connection_handler->add_connection_id(*cid, handler);
227
228 return 0;
229 }
230 } // namespace
231
232 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)233 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
234 void *user_data) {
235 auto upstream = static_cast<Http3Upstream *>(user_data);
236 auto handler = upstream->get_client_handler();
237 auto worker = handler->get_worker();
238 auto quic_conn_handler = worker->get_quic_connection_handler();
239
240 quic_conn_handler->remove_connection_id(*cid);
241
242 return 0;
243 }
244 } // namespace
245
http_begin_request_headers(int64_t stream_id)246 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
247 auto downstream =
248 std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
249 nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
250
251 downstream->reset_upstream_rtimer();
252
253 handler_->repeat_read_timer();
254
255 auto &req = downstream->request();
256 req.http_major = 3;
257 req.http_minor = 0;
258
259 add_pending_downstream(std::move(downstream));
260 }
261
add_pending_downstream(std::unique_ptr<Downstream> downstream)262 void Http3Upstream::add_pending_downstream(
263 std::unique_ptr<Downstream> downstream) {
264 downstream_queue_.add_pending(std::move(downstream));
265 }
266
267 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)268 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
269 uint64_t offset, const uint8_t *data, size_t datalen,
270 void *user_data, void *stream_user_data) {
271 auto upstream = static_cast<Http3Upstream *>(user_data);
272
273 if (upstream->recv_stream_data(flags, stream_id, data, datalen) != 0) {
274 return NGTCP2_ERR_CALLBACK_FAILURE;
275 }
276
277 return 0;
278 }
279 } // namespace
280
recv_stream_data(uint32_t flags,int64_t stream_id,const uint8_t * data,size_t datalen)281 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
282 const uint8_t *data, size_t datalen) {
283 assert(httpconn_);
284
285 auto nconsumed = nghttp3_conn_read_stream(
286 httpconn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN);
287 if (nconsumed < 0) {
288 ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
289 << nghttp3_strerror(nconsumed);
290 ngtcp2_ccerr_set_application_error(
291 &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
292 0);
293 return -1;
294 }
295
296 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
297 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
298
299 return 0;
300 }
301
302 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)303 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
304 uint64_t app_error_code, void *user_data,
305 void *stream_user_data) {
306 auto upstream = static_cast<Http3Upstream *>(user_data);
307
308 if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
309 app_error_code = NGHTTP3_H3_NO_ERROR;
310 }
311
312 if (upstream->stream_close(stream_id, app_error_code) != 0) {
313 return NGTCP2_ERR_CALLBACK_FAILURE;
314 }
315
316 return 0;
317 }
318 } // namespace
319
stream_close(int64_t stream_id,uint64_t app_error_code)320 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
321 if (!httpconn_) {
322 return 0;
323 }
324
325 auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
326 switch (rv) {
327 case 0:
328 break;
329 case NGHTTP3_ERR_STREAM_NOT_FOUND:
330 if (ngtcp2_is_bidi_stream(stream_id)) {
331 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
332 }
333 break;
334 default:
335 ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
336 ngtcp2_ccerr_set_application_error(
337 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
338 return -1;
339 }
340
341 return 0;
342 }
343
344 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)345 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
346 uint64_t offset, uint64_t datalen, void *user_data,
347 void *stream_user_data) {
348 auto upstream = static_cast<Http3Upstream *>(user_data);
349
350 if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
351 return NGTCP2_ERR_CALLBACK_FAILURE;
352 }
353
354 return 0;
355 }
356 } // namespace
357
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)358 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
359 uint64_t datalen) {
360 if (!httpconn_) {
361 return 0;
362 }
363
364 auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
365 if (rv != 0) {
366 ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
367 << nghttp3_strerror(rv);
368 return -1;
369 }
370
371 return 0;
372 }
373
374 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)375 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
376 uint64_t max_data, void *user_data,
377 void *stream_user_data) {
378 auto upstream = static_cast<Http3Upstream *>(user_data);
379
380 if (upstream->extend_max_stream_data(stream_id) != 0) {
381 return NGTCP2_ERR_CALLBACK_FAILURE;
382 }
383
384 return 0;
385 }
386 } // namespace
387
extend_max_stream_data(int64_t stream_id)388 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
389 if (!httpconn_) {
390 return 0;
391 }
392
393 auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
394 if (rv != 0) {
395 ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
396 << nghttp3_strerror(rv);
397 return -1;
398 }
399
400 return 0;
401 }
402
403 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)404 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
405 void *user_data) {
406 auto upstream = static_cast<Http3Upstream *>(user_data);
407
408 upstream->extend_max_remote_streams_bidi(max_streams);
409
410 return 0;
411 }
412 } // namespace
413
extend_max_remote_streams_bidi(uint64_t max_streams)414 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
415 nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
416 }
417
418 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)419 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
420 uint64_t app_error_code, void *user_data,
421 void *stream_user_data) {
422 auto upstream = static_cast<Http3Upstream *>(user_data);
423
424 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
425 return NGTCP2_ERR_CALLBACK_FAILURE;
426 }
427
428 return 0;
429 }
430 } // namespace
431
http_shutdown_stream_read(int64_t stream_id)432 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
433 if (!httpconn_) {
434 return 0;
435 }
436
437 auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
438 if (rv != 0) {
439 ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
440 << nghttp3_strerror(rv);
441 return -1;
442 }
443
444 return 0;
445 }
446
447 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)448 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
449 uint64_t app_error_code, void *user_data,
450 void *stream_user_data) {
451 auto upstream = static_cast<Http3Upstream *>(user_data);
452
453 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
454 return NGTCP2_ERR_CALLBACK_FAILURE;
455 }
456
457 return 0;
458 }
459 } // namespace
460
461 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)462 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
463 auto upstream = static_cast<Http3Upstream *>(user_data);
464
465 if (upstream->handshake_completed() != 0) {
466 return NGTCP2_ERR_CALLBACK_FAILURE;
467 }
468
469 return 0;
470 }
471 } // namespace
472
handshake_completed()473 int Http3Upstream::handshake_completed() {
474 handler_->set_alpn_from_conn();
475
476 auto alpn = handler_->get_alpn();
477 if (alpn.empty()) {
478 ULOG(ERROR, this) << "NO ALPN was negotiated";
479 return -1;
480 }
481
482 auto path = ngtcp2_conn_get_path(conn_);
483
484 return send_new_token(&path->remote);
485 }
486
487 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)488 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
489 const ngtcp2_path *old_path,
490 ngtcp2_path_validation_result res, void *user_data) {
491 if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
492 !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
493 return 0;
494 }
495
496 auto upstream = static_cast<Http3Upstream *>(user_data);
497 if (upstream->send_new_token(&path->remote) != 0) {
498 return NGTCP2_ERR_CALLBACK_FAILURE;
499 }
500
501 return 0;
502 }
503 } // namespace
504
send_new_token(const ngtcp2_addr * remote_addr)505 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
506 std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> token;
507 size_t tokenlen;
508
509 auto worker = handler_->get_worker();
510 auto conn_handler = worker->get_connection_handler();
511 auto &qkms = conn_handler->get_quic_keying_materials();
512 auto &qkm = qkms->keying_materials.front();
513
514 if (generate_token(token.data(), tokenlen, remote_addr->addr,
515 remote_addr->addrlen, qkm.secret.data(),
516 qkm.secret.size()) != 0) {
517 return -1;
518 }
519
520 assert(tokenlen == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN);
521
522 token[tokenlen++] = qkm.id;
523
524 auto rv = ngtcp2_conn_submit_new_token(conn_, token.data(), tokenlen);
525 if (rv != 0) {
526 ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
527 << ngtcp2_strerror(rv);
528 return -1;
529 }
530
531 return 0;
532 }
533
534 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)535 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
536 void *user_data) {
537 if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
538 return 0;
539 }
540
541 auto upstream = static_cast<Http3Upstream *>(user_data);
542 if (upstream->setup_httpconn() != 0) {
543 return NGTCP2_ERR_CALLBACK_FAILURE;
544 }
545
546 return 0;
547 }
548 } // namespace
549
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,const uint8_t * token,size_t tokenlen)550 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
551 const Address &local_addr,
552 const ngtcp2_pkt_hd &initial_hd,
553 const ngtcp2_cid *odcid, const uint8_t *token,
554 size_t tokenlen) {
555 int rv;
556
557 auto worker = handler_->get_worker();
558 auto conn_handler = worker->get_connection_handler();
559
560 auto callbacks = ngtcp2_callbacks{
561 nullptr, // client_initial
562 ngtcp2_crypto_recv_client_initial_cb,
563 ngtcp2_crypto_recv_crypto_data_cb,
564 shrpx::handshake_completed,
565 nullptr, // recv_version_negotiation
566 ngtcp2_crypto_encrypt_cb,
567 ngtcp2_crypto_decrypt_cb,
568 ngtcp2_crypto_hp_mask_cb,
569 shrpx::recv_stream_data,
570 shrpx::acked_stream_data_offset,
571 nullptr, // stream_open
572 shrpx::stream_close,
573 nullptr, // recv_stateless_reset
574 nullptr, // recv_retry
575 nullptr, // extend_max_local_streams_bidi
576 nullptr, // extend_max_local_streams_uni
577 rand,
578 get_new_connection_id,
579 remove_connection_id,
580 ngtcp2_crypto_update_key_cb,
581 shrpx::path_validation,
582 nullptr, // select_preferred_addr
583 shrpx::stream_reset,
584 shrpx::extend_max_remote_streams_bidi,
585 nullptr, // extend_max_remote_streams_uni
586 shrpx::extend_max_stream_data,
587 nullptr, // dcid_status
588 nullptr, // handshake_confirmed
589 nullptr, // recv_new_token
590 ngtcp2_crypto_delete_crypto_aead_ctx_cb,
591 ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
592 nullptr, // recv_datagram
593 nullptr, // ack_datagram
594 nullptr, // lost_datagram
595 ngtcp2_crypto_get_path_challenge_data_cb,
596 shrpx::stream_stop_sending,
597 nullptr, // version_negotiation
598 nullptr, // recv_rx_key
599 shrpx::recv_tx_key,
600 };
601
602 auto config = get_config();
603 auto &quicconf = config->quic;
604 auto &http3conf = config->http3;
605
606 auto &qkms = conn_handler->get_quic_keying_materials();
607 auto &qkm = qkms->keying_materials.front();
608
609 ngtcp2_cid scid;
610
611 if (generate_quic_connection_id(scid, SHRPX_QUIC_SCIDLEN,
612 worker->get_cid_prefix(), qkm.id,
613 qkm.cid_encryption_key.data()) != 0) {
614 return -1;
615 }
616
617 ngtcp2_settings settings;
618 ngtcp2_settings_default(&settings);
619 if (quicconf.upstream.debug.log) {
620 settings.log_printf = log_printf;
621 }
622
623 if (!quicconf.upstream.qlog.dir.empty()) {
624 auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
625 if (fd != -1) {
626 qlog_fd_ = fd;
627 settings.qlog_write = shrpx::qlog_write;
628 }
629 }
630
631 settings.initial_ts = quic_timestamp();
632 settings.initial_rtt = static_cast<ngtcp2_tstamp>(
633 quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
634 settings.cc_algo = quicconf.upstream.congestion_controller;
635 settings.max_window = http3conf.upstream.max_connection_window_size;
636 settings.max_stream_window = http3conf.upstream.max_window_size;
637 settings.max_tx_udp_payload_size = SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE;
638 settings.rand_ctx.native_handle = &worker->get_randgen();
639 settings.token = token;
640 settings.tokenlen = tokenlen;
641 settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
642 0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
643
644 ngtcp2_transport_params params;
645 ngtcp2_transport_params_default(¶ms);
646 params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
647 // The minimum number of unidirectional streams required for HTTP/3.
648 params.initial_max_streams_uni = 3;
649 params.initial_max_data = http3conf.upstream.connection_window_size;
650 params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
651 params.initial_max_stream_data_uni = http3conf.upstream.window_size;
652 params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
653 quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
654
655 #ifdef OPENSSL_IS_BORINGSSL
656 if (quicconf.upstream.early_data) {
657 ngtcp2_transport_params early_data_params;
658
659 ngtcp2_transport_params_default(&early_data_params);
660
661 early_data_params.initial_max_stream_data_bidi_local =
662 params.initial_max_stream_data_bidi_local;
663 early_data_params.initial_max_stream_data_bidi_remote =
664 params.initial_max_stream_data_bidi_remote;
665 early_data_params.initial_max_stream_data_uni =
666 params.initial_max_stream_data_uni;
667 early_data_params.initial_max_data = params.initial_max_data;
668 early_data_params.initial_max_streams_bidi =
669 params.initial_max_streams_bidi;
670 early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
671
672 // TODO include HTTP/3 SETTINGS
673
674 std::array<uint8_t, 128> quic_early_data_ctx;
675
676 auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
677 quic_early_data_ctx.data(), quic_early_data_ctx.size(),
678 &early_data_params);
679
680 assert(quic_early_data_ctxlen > 0);
681 assert(static_cast<size_t>(quic_early_data_ctxlen) <=
682 quic_early_data_ctx.size());
683
684 if (SSL_set_quic_early_data_context(handler_->get_ssl(),
685 quic_early_data_ctx.data(),
686 quic_early_data_ctxlen) != 1) {
687 ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
688 return -1;
689 }
690 }
691 #endif // OPENSSL_IS_BORINGSSL
692
693 if (odcid) {
694 params.original_dcid = *odcid;
695 params.retry_scid = initial_hd.dcid;
696 params.retry_scid_present = 1;
697 } else {
698 params.original_dcid = initial_hd.dcid;
699 }
700
701 params.original_dcid_present = 1;
702
703 rv = generate_quic_stateless_reset_token(
704 params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
705 if (rv != 0) {
706 ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
707 return -1;
708 }
709 params.stateless_reset_token_present = 1;
710
711 auto path = ngtcp2_path{
712 {
713 const_cast<sockaddr *>(&local_addr.su.sa),
714 static_cast<socklen_t>(local_addr.len),
715 },
716 {
717 const_cast<sockaddr *>(&remote_addr.su.sa),
718 static_cast<socklen_t>(remote_addr.len),
719 },
720 const_cast<UpstreamAddr *>(faddr),
721 };
722
723 rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
724 initial_hd.version, &callbacks, &settings,
725 ¶ms, nullptr, this);
726 if (rv != 0) {
727 ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
728 return -1;
729 }
730
731 ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
732
733 auto quic_connection_handler = worker->get_quic_connection_handler();
734
735 if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
736 initial_hd.dcid) != 0) {
737 return -1;
738 }
739
740 quic_connection_handler->add_connection_id(hashed_scid_, handler_);
741 quic_connection_handler->add_connection_id(scid, handler_);
742
743 return 0;
744 }
745
on_read()746 int Http3Upstream::on_read() { return 0; }
747
on_write()748 int Http3Upstream::on_write() {
749 int rv;
750
751 if (tx_.send_blocked) {
752 rv = send_blocked_packet();
753 if (rv != 0) {
754 return -1;
755 }
756
757 if (tx_.send_blocked) {
758 return 0;
759 }
760 }
761
762 handler_->get_connection()->wlimit.stopw();
763
764 if (write_streams() != 0) {
765 return -1;
766 }
767
768 if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
769 return -1;
770 }
771
772 reset_timer();
773
774 return 0;
775 }
776
// Produces QUIC packets carrying pending HTTP/3 stream data and sends
// them.
//
// Each loop iteration asks nghttp3 for stream data (when the HTTP/3
// connection exists and connection-level flow control credit remains),
// has ngtcp2 build one packet at |bufpos| inside tx_.data, and reports
// the bytes ngtcp2 accepted back to nghttp3.
//
// With UDP_SEGMENT defined, consecutive packets are accumulated in
// tx_.data and handed to send_packet() in one call together with
// |gso_size|.  Without it every packet is sent on its own and the
// buffer is reused.
//
// When send_packet() reports SHRPX_ERR_SEND_BLOCKED the unsent data is
// recorded with on_send_blocked() and a write event is requested for
// the frontend address.  Other send_packet() results are not inspected
// here.
//
// Returns 0 normally.  On nghttp3/ngtcp2 failure the error is stored in
// last_error_ and the result of handle_error() is returned.
int Http3Upstream::write_streams() {
  std::array<nghttp3_vec, 16> vec;
  auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
#ifdef UDP_SEGMENT
  auto path_max_udp_payload_size =
      ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
#endif // UDP_SEGMENT
  // Upper bound on the number of packets produced by this call.
  auto max_pktcnt = ngtcp2_conn_get_send_quantum(conn_) / max_udp_payload_size;
  ngtcp2_pkt_info pi, prev_pi;
  // Where the next packet is written inside tx_.data.
  uint8_t *bufpos = tx_.data.get();
  // ps/pi describe the packet just written; prev_ps/prev_pi describe
  // the first packet of the batch accumulated so far.
  ngtcp2_path_storage ps, prev_ps;
  size_t pktcnt = 0;
  int rv;
  size_t gso_size = 0;
  auto ts = quic_timestamp();

  ngtcp2_path_storage_zero(&ps);
  ngtcp2_path_storage_zero(&prev_ps);

  for (;;) {
    // stream_id stays -1 and sveccnt stays 0 when nghttp3 is not
    // consulted.
    int64_t stream_id = -1;
    int fin = 0;
    nghttp3_ssize sveccnt = 0;

    if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
      sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
                                           vec.data(), vec.size());
      if (sveccnt < 0) {
        ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
                          << nghttp3_strerror(sveccnt);
        ngtcp2_ccerr_set_application_error(
            &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
            nullptr, 0);
        return handle_error();
      }
    }

    ngtcp2_ssize ndatalen;
    auto v = vec.data();
    auto vcnt = static_cast<size_t>(sveccnt);

    // MORE is always set, so ngtcp2 may answer NGTCP2_ERR_WRITE_MORE to
    // ask for additional stream data for the same packet.
    uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
    if (fin) {
      flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
    }

    auto nwrite = ngtcp2_conn_writev_stream(
        conn_, &ps.path, &pi, bufpos, max_udp_payload_size, &ndatalen, flags,
        stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
    if (nwrite < 0) {
      // The first three codes are handled per stream and the loop
      // simply continues; anything else is fatal.
      switch (nwrite) {
      case NGTCP2_ERR_STREAM_DATA_BLOCKED:
        assert(ndatalen == -1);
        nghttp3_conn_block_stream(httpconn_, stream_id);
        continue;
      case NGTCP2_ERR_STREAM_SHUT_WR:
        assert(ndatalen == -1);
        nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
        continue;
      case NGTCP2_ERR_WRITE_MORE:
        assert(ndatalen >= 0);
        rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
        if (rv != 0) {
          ULOG(ERROR, this)
              << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
          ngtcp2_ccerr_set_application_error(
              &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
              0);
          return handle_error();
        }
        continue;
      }

      assert(ndatalen == -1);

      ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
                        << ngtcp2_strerror(nwrite);

      ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);

      return handle_error();
    } else if (ndatalen >= 0) {
      // Stream data was accepted into the packet: advance nghttp3's
      // write offset for that stream.
      rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
      if (rv != 0) {
        ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
                          << nghttp3_strerror(rv);
        ngtcp2_ccerr_set_application_error(
            &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
            0);
        return handle_error();
      }
    }

    if (nwrite == 0) {
      // No packet was produced this round.  Flush whatever has been
      // accumulated so far (addressed with prev_ps/prev_pi) and stop.
      if (bufpos - tx_.data.get()) {
        auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
        auto data = tx_.data.get();
        auto datalen = bufpos - data;

        rv = send_packet(faddr, prev_ps.path.remote.addr,
                         prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
                         prev_ps.path.local.addrlen, prev_pi, data, datalen,
                         gso_size);
        if (rv == SHRPX_ERR_SEND_BLOCKED) {
          on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
                          prev_pi, data, datalen, gso_size);

          signal_write_upstream_addr(faddr);
        }
      }

      ngtcp2_conn_update_pkt_tx_time(conn_, ts);

      return 0;
    }

    bufpos += nwrite;

#ifdef UDP_SEGMENT
    if (pktcnt == 0) {
      // First packet of the batch: remember its path and packet info,
      // and use its size as the segment size.
      ngtcp2_path_copy(&prev_ps.path, &ps.path);
      prev_pi = pi;
      gso_size = nwrite;
    } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
               prev_pi.ecn != pi.ecn ||
               static_cast<size_t>(nwrite) > gso_size ||
               (gso_size > path_max_udp_payload_size &&
                static_cast<size_t>(nwrite) != gso_size)) {
      // The new packet cannot join the batch (different path or ECN,
      // larger than the segment size, or the size rule involving
      // path_max_udp_payload_size is violated).  Send the batch
      // without it, then send the new packet separately.
      auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
      auto data = tx_.data.get();
      // Everything accumulated before the packet just written.
      auto datalen = bufpos - data - nwrite;

      rv = send_packet(faddr, prev_ps.path.remote.addr,
                       prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
                       prev_ps.path.local.addrlen, prev_pi, data, datalen,
                       gso_size);
      switch (rv) {
      case SHRPX_ERR_SEND_BLOCKED:
        // The batch could not be sent: record both the batch and the
        // trailing packet as blocked.
        on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
                        data, datalen, gso_size);

        on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
                        ps.path.remote, ps.path.local, pi, bufpos - nwrite,
                        nwrite, 0);

        signal_write_upstream_addr(faddr);

        break;
      default: {
        // Batch was not blocked: now send the trailing packet alone
        // (gso_size 0).
        auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
        auto data = bufpos - nwrite;

        rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
                         ps.path.local.addr, ps.path.local.addrlen, pi, data,
                         nwrite, 0);
        if (rv == SHRPX_ERR_SEND_BLOCKED) {
          on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
                          nwrite, 0);

          signal_write_upstream_addr(faddr);
        }
      }
      }

      ngtcp2_conn_update_pkt_tx_time(conn_, ts);

      return 0;
    }

    // Flush when the packet budget is reached or when a packet shorter
    // than the segment size was written.
    if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
      auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
      auto data = tx_.data.get();
      auto datalen = bufpos - data;

      rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
                       ps.path.local.addr, ps.path.local.addrlen, pi, data,
                       datalen, gso_size);
      if (rv == SHRPX_ERR_SEND_BLOCKED) {
        on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
                        gso_size);

        signal_write_upstream_addr(faddr);
      }

      ngtcp2_conn_update_pkt_tx_time(conn_, ts);

      return 0;
    }
#else  // !UDP_SEGMENT
    // No segmentation offload: send each packet as soon as it is built.
    auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
    auto data = tx_.data.get();
    auto datalen = bufpos - data;

    rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
                     ps.path.local.addr, ps.path.local.addrlen, pi, data,
                     datalen, 0);
    if (rv == SHRPX_ERR_SEND_BLOCKED) {
      on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
                      0);

      ngtcp2_conn_update_pkt_tx_time(conn_, ts);

      signal_write_upstream_addr(faddr);

      return 0;
    }

    if (++pktcnt == max_pktcnt) {
      ngtcp2_conn_update_pkt_tx_time(conn_, ts);

      return 0;
    }

    // Reuse the buffer from the start for the next packet.
    bufpos = tx_.data.get();
#endif // !UDP_SEGMENT
  }

  return 0;
}
996
on_timeout(Downstream * downstream)997 int Http3Upstream::on_timeout(Downstream *downstream) { return 0; }
998
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)999 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
1000 unsigned int status_code) {
1001 int rv;
1002
1003 rv = error_reply(downstream, status_code);
1004
1005 if (rv != 0) {
1006 return -1;
1007 }
1008
1009 handler_->signal_write();
1010
1011 return 0;
1012 }
1013
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1014 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1015 Downstream *downstream) {
1016 assert(0);
1017 abort();
1018 }
1019
1020 namespace {
1021 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1022 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1023 // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1024 // client to retry.
1025 switch (downstream_error_code) {
1026 case NGHTTP2_NO_ERROR:
1027 return NGHTTP3_H3_NO_ERROR;
1028 case NGHTTP2_REFUSED_STREAM:
1029 return NGHTTP3_H3_REQUEST_REJECTED;
1030 default:
1031 return NGHTTP3_H3_INTERNAL_ERROR;
1032 }
1033 }
1034 } // namespace
1035
downstream_read(DownstreamConnection * dconn)1036 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1037 auto downstream = dconn->get_downstream();
1038
1039 if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1040 // The downstream stream was reset (canceled). In this case,
1041 // RST_STREAM to the upstream and delete downstream connection
1042 // here. Deleting downstream will be taken place at
1043 // on_stream_close_callback.
1044 shutdown_stream(downstream,
1045 infer_upstream_shutdown_stream_error_code(
1046 downstream->get_response_rst_stream_error_code()));
1047 downstream->pop_downstream_connection();
1048 // dconn was deleted
1049 dconn = nullptr;
1050 } else if (downstream->get_response_state() ==
1051 DownstreamState::MSG_BAD_HEADER) {
1052 if (error_reply(downstream, 502) != 0) {
1053 return -1;
1054 }
1055 downstream->pop_downstream_connection();
1056 // dconn was deleted
1057 dconn = nullptr;
1058 } else {
1059 auto rv = downstream->on_read();
1060 if (rv == SHRPX_ERR_EOF) {
1061 if (downstream->get_request_header_sent()) {
1062 return downstream_eof(dconn);
1063 }
1064 return SHRPX_ERR_RETRY;
1065 }
1066 if (rv == SHRPX_ERR_DCONN_CANCELED) {
1067 downstream->pop_downstream_connection();
1068 handler_->signal_write();
1069 return 0;
1070 }
1071 if (rv != 0) {
1072 if (rv != SHRPX_ERR_NETWORK) {
1073 if (LOG_ENABLED(INFO)) {
1074 DCLOG(INFO, dconn) << "HTTP parser failure";
1075 }
1076 }
1077 return downstream_error(dconn, Downstream::EVENT_ERROR);
1078 }
1079
1080 if (downstream->can_detach_downstream_connection()) {
1081 // Keep-alive
1082 downstream->detach_downstream_connection();
1083 }
1084 }
1085
1086 handler_->signal_write();
1087
1088 // At this point, downstream may be deleted.
1089
1090 return 0;
1091 }
1092
downstream_write(DownstreamConnection * dconn)1093 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1094 int rv;
1095 rv = dconn->on_write();
1096 if (rv == SHRPX_ERR_NETWORK) {
1097 return downstream_error(dconn, Downstream::EVENT_ERROR);
1098 }
1099 if (rv != 0) {
1100 return rv;
1101 }
1102 return 0;
1103 }
1104
downstream_eof(DownstreamConnection * dconn)1105 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1106 auto downstream = dconn->get_downstream();
1107
1108 if (LOG_ENABLED(INFO)) {
1109 DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1110 }
1111
1112 // Delete downstream connection. If we don't delete it here, it will
1113 // be pooled in on_stream_close_callback.
1114 downstream->pop_downstream_connection();
1115 // dconn was deleted
1116 dconn = nullptr;
1117 // downstream will be deleted in on_stream_close_callback.
1118 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1119 // Server may indicate the end of the request by EOF
1120 if (LOG_ENABLED(INFO)) {
1121 ULOG(INFO, this) << "Downstream body was ended by EOF";
1122 }
1123 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1124
1125 // For tunneled connection, MSG_COMPLETE signals
1126 // downstream_read_data_callback to send RST_STREAM after pending
1127 // response body is sent. This is needed to ensure that RST_STREAM
1128 // is sent after all pending data are sent.
1129 if (on_downstream_body_complete(downstream) != 0) {
1130 return -1;
1131 }
1132 } else if (downstream->get_response_state() !=
1133 DownstreamState::MSG_COMPLETE) {
1134 // If stream was not closed, then we set MSG_COMPLETE and let
1135 // on_stream_close_callback delete downstream.
1136 if (error_reply(downstream, 502) != 0) {
1137 return -1;
1138 }
1139 }
1140 handler_->signal_write();
1141 // At this point, downstream may be deleted.
1142 return 0;
1143 }
1144
downstream_error(DownstreamConnection * dconn,int events)1145 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1146 auto downstream = dconn->get_downstream();
1147
1148 if (LOG_ENABLED(INFO)) {
1149 if (events & Downstream::EVENT_ERROR) {
1150 DCLOG(INFO, dconn) << "Downstream network/general error";
1151 } else {
1152 DCLOG(INFO, dconn) << "Timeout";
1153 }
1154 if (downstream->get_upgraded()) {
1155 DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1156 }
1157 }
1158
1159 // Delete downstream connection. If we don't delete it here, it will
1160 // be pooled in on_stream_close_callback.
1161 downstream->pop_downstream_connection();
1162 // dconn was deleted
1163 dconn = nullptr;
1164
1165 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1166 // For SSL tunneling, we issue RST_STREAM. For other types of
1167 // stream, we don't have to do anything since response was
1168 // complete.
1169 if (downstream->get_upgraded()) {
1170 shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1171 }
1172 } else {
1173 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1174 if (downstream->get_upgraded()) {
1175 if (on_downstream_body_complete(downstream) != 0) {
1176 return -1;
1177 }
1178 } else {
1179 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1180 }
1181 } else {
1182 unsigned int status;
1183 if (events & Downstream::EVENT_TIMEOUT) {
1184 if (downstream->get_request_header_sent()) {
1185 status = 504;
1186 } else {
1187 status = 408;
1188 }
1189 } else {
1190 status = 502;
1191 }
1192 if (error_reply(downstream, status) != 0) {
1193 return -1;
1194 }
1195 }
1196 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1197 }
1198 handler_->signal_write();
1199 // At this point, downstream may be deleted.
1200 return 0;
1201 }
1202
get_client_handler() const1203 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1204
1205 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1206 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1207 int64_t stream_id, nghttp3_vec *vec,
1208 size_t veccnt, uint32_t *pflags,
1209 void *conn_user_data,
1210 void *stream_user_data) {
1211 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1212 auto downstream = static_cast<Downstream *>(stream_user_data);
1213
1214 assert(downstream);
1215
1216 auto body = downstream->get_response_buf();
1217
1218 assert(body);
1219
1220 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1221 body->rleft_mark() == 0) {
1222 downstream->disable_upstream_wtimer();
1223 return NGHTTP3_ERR_WOULDBLOCK;
1224 }
1225
1226 downstream->reset_upstream_wtimer();
1227
1228 veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1229
1230 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1231 body->rleft_mark() == 0) {
1232 *pflags |= NGHTTP3_DATA_FLAG_EOF;
1233 }
1234
1235 assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1236
1237 downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1238
1239 if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1240 upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1241 return NGHTTP3_ERR_CALLBACK_FAILURE;
1242 }
1243
1244 return veccnt;
1245 }
1246 } // namespace
1247
on_downstream_header_complete(Downstream * downstream)1248 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1249 int rv;
1250
1251 const auto &req = downstream->request();
1252 auto &resp = downstream->response();
1253
1254 auto &balloc = downstream->get_block_allocator();
1255
1256 if (LOG_ENABLED(INFO)) {
1257 if (downstream->get_non_final_response()) {
1258 DLOG(INFO, downstream) << "HTTP non-final response header";
1259 } else {
1260 DLOG(INFO, downstream) << "HTTP response header completed";
1261 }
1262 }
1263
1264 auto config = get_config();
1265 auto &httpconf = config->http;
1266
1267 if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1268 downstream->rewrite_location_response_header(req.scheme);
1269 }
1270
1271 #ifdef HAVE_MRUBY
1272 if (!downstream->get_non_final_response()) {
1273 auto dconn = downstream->get_downstream_connection();
1274 const auto &group = dconn->get_downstream_addr_group();
1275 if (group) {
1276 const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1277
1278 if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1279 if (error_reply(downstream, 500) != 0) {
1280 return -1;
1281 }
1282 // Returning -1 will signal deletion of dconn.
1283 return -1;
1284 }
1285
1286 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1287 return -1;
1288 }
1289 }
1290
1291 auto worker = handler_->get_worker();
1292 auto mruby_ctx = worker->get_mruby_context();
1293
1294 if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1295 if (error_reply(downstream, 500) != 0) {
1296 return -1;
1297 }
1298 // Returning -1 will signal deletion of dconn.
1299 return -1;
1300 }
1301
1302 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1303 return -1;
1304 }
1305 }
1306 #endif // HAVE_MRUBY
1307
1308 auto nva = std::vector<nghttp3_nv>();
1309 // 4 means :status and possible server, via, and set-cookie (for
1310 // affinity cookie) header field.
1311 nva.reserve(resp.fs.headers().size() + 4 +
1312 httpconf.add_response_headers.size());
1313
1314 if (downstream->get_non_final_response()) {
1315 auto response_status = http2::stringify_status(balloc, resp.http_status);
1316
1317 nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1318
1319 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1320 http2::HDOP_STRIP_ALL);
1321
1322 if (LOG_ENABLED(INFO)) {
1323 log_response_headers(downstream, nva);
1324 }
1325
1326 rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1327 nva.data(), nva.size());
1328
1329 resp.fs.clear_headers();
1330
1331 if (rv != 0) {
1332 ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1333 return -1;
1334 }
1335
1336 return 0;
1337 }
1338
1339 auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1340 StringRef response_status;
1341
1342 if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1343 response_status = http2::stringify_status(balloc, 200);
1344 striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1345 } else {
1346 response_status = http2::stringify_status(balloc, resp.http_status);
1347 }
1348
1349 nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1350
1351 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1352
1353 if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1354 nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
1355 } else {
1356 auto server = resp.fs.header(http2::HD_SERVER);
1357 if (server) {
1358 nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value));
1359 }
1360 }
1361
1362 if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1363 auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1364 if (affinity_cookie) {
1365 auto dconn = downstream->get_downstream_connection();
1366 assert(dconn);
1367 auto &group = dconn->get_downstream_addr_group();
1368 auto &shared_addr = group->shared_addr;
1369 auto &cookieconf = shared_addr->affinity.cookie;
1370 auto secure =
1371 http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1372 auto cookie_str = http::create_affinity_cookie(
1373 balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1374 nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str));
1375 }
1376 }
1377
1378 auto via = resp.fs.header(http2::HD_VIA);
1379 if (httpconf.no_via) {
1380 if (via) {
1381 nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value));
1382 }
1383 } else {
1384 // we don't create more than 16 bytes in
1385 // http::create_via_header_value.
1386 size_t len = 16;
1387 if (via) {
1388 len += via->value.size() + 2;
1389 }
1390
1391 auto iov = make_byte_ref(balloc, len + 1);
1392 auto p = iov.base;
1393 if (via) {
1394 p = std::copy(std::begin(via->value), std::end(via->value), p);
1395 p = util::copy_lit(p, ", ");
1396 }
1397 p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1398 *p = '\0';
1399
1400 nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
1401 }
1402
1403 for (auto &p : httpconf.add_response_headers) {
1404 nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1405 }
1406
1407 if (LOG_ENABLED(INFO)) {
1408 log_response_headers(downstream, nva);
1409 }
1410
1411 nghttp3_data_reader data_read;
1412 data_read.read_data = downstream_read_data_callback;
1413
1414 nghttp3_data_reader *data_readptr;
1415
1416 if (downstream->expect_response_body() ||
1417 downstream->expect_response_trailer()) {
1418 data_readptr = &data_read;
1419 } else {
1420 data_readptr = nullptr;
1421 }
1422
1423 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1424 nva.data(), nva.size(), data_readptr);
1425 if (rv != 0) {
1426 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1427 return -1;
1428 }
1429
1430 if (data_readptr) {
1431 downstream->reset_upstream_wtimer();
1432 } else if (shutdown_stream_read(downstream->get_stream_id(),
1433 NGHTTP3_H3_NO_ERROR) != 0) {
1434 return -1;
1435 }
1436
1437 return 0;
1438 }
1439
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1440 int Http3Upstream::on_downstream_body(Downstream *downstream,
1441 const uint8_t *data, size_t len,
1442 bool flush) {
1443 auto body = downstream->get_response_buf();
1444 body->append(data, len);
1445
1446 if (flush) {
1447 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1448
1449 downstream->ensure_upstream_wtimer();
1450 }
1451
1452 return 0;
1453 }
1454
on_downstream_body_complete(Downstream * downstream)1455 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1456 if (LOG_ENABLED(INFO)) {
1457 DLOG(INFO, downstream) << "HTTP response completed";
1458 }
1459
1460 auto &resp = downstream->response();
1461
1462 if (!downstream->validate_response_recv_body_length()) {
1463 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1464 resp.connection_close = true;
1465 return 0;
1466 }
1467
1468 if (!downstream->get_upgraded()) {
1469 const auto &trailers = resp.fs.trailers();
1470 if (!trailers.empty()) {
1471 std::vector<nghttp3_nv> nva;
1472 nva.reserve(trailers.size());
1473 http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1474 if (!nva.empty()) {
1475 auto rv = nghttp3_conn_submit_trailers(
1476 httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1477 if (rv != 0) {
1478 ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1479 << nghttp3_strerror(rv);
1480 return -1;
1481 }
1482 }
1483 }
1484 }
1485
1486 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1487 downstream->ensure_upstream_wtimer();
1488
1489 return 0;
1490 }
1491
on_handler_delete()1492 void Http3Upstream::on_handler_delete() {
1493 for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1494 if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1495 d->accesslog_ready()) {
1496 handler_->write_accesslog(d);
1497 }
1498 }
1499
1500 auto worker = handler_->get_worker();
1501 auto quic_conn_handler = worker->get_quic_connection_handler();
1502
1503 std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1504 ngtcp2_conn_get_scid(conn_, scids.data());
1505 scids.back() = hashed_scid_;
1506
1507 for (auto &cid : scids) {
1508 quic_conn_handler->remove_connection_id(cid);
1509 }
1510
1511 if (retry_close_ || last_error_.type == NGTCP2_CCERR_TYPE_IDLE_CLOSE) {
1512 return;
1513 }
1514
1515 // If this is not idle close, send CONNECTION_CLOSE.
1516 if (!ngtcp2_conn_in_closing_period(conn_) &&
1517 !ngtcp2_conn_in_draining_period(conn_)) {
1518 ngtcp2_path_storage ps;
1519 ngtcp2_pkt_info pi;
1520 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1521
1522 ngtcp2_path_storage_zero(&ps);
1523
1524 ngtcp2_ccerr ccerr;
1525 ngtcp2_ccerr_default(&ccerr);
1526
1527 if (worker->get_graceful_shutdown() &&
1528 !ngtcp2_conn_get_handshake_completed(conn_)) {
1529 ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1530 }
1531
1532 auto nwrite = ngtcp2_conn_write_connection_close(
1533 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1534 quic_timestamp());
1535 if (nwrite < 0) {
1536 if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1537 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1538 << ngtcp2_strerror(nwrite);
1539 }
1540
1541 return;
1542 }
1543
1544 conn_close_.resize(nwrite);
1545
1546 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1547 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1548 ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1549 }
1550
1551 auto d =
1552 static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1553
1554 if (LOG_ENABLED(INFO)) {
1555 ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1556 << conn_close_.size() << " bytes sentinel packet";
1557 }
1558
1559 auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1560 std::move(conn_close_), d);
1561
1562 quic_conn_handler->add_close_wait(cw.get());
1563
1564 cw.release();
1565 }
1566
on_downstream_reset(Downstream * downstream,bool no_retry)1567 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1568 int rv;
1569
1570 if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1571 // This is error condition when we failed push_request_headers()
1572 // in initiate_downstream(). Otherwise, we have
1573 // DispatchState::ACTIVE state, or we did not set
1574 // DownstreamConnection.
1575 downstream->pop_downstream_connection();
1576 handler_->signal_write();
1577
1578 return 0;
1579 }
1580
1581 if (!downstream->request_submission_ready()) {
1582 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1583 // We have got all response body already. Send it off.
1584 downstream->pop_downstream_connection();
1585 return 0;
1586 }
1587 // pushed stream is handled here
1588 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1589 downstream->pop_downstream_connection();
1590
1591 handler_->signal_write();
1592
1593 return 0;
1594 }
1595
1596 downstream->pop_downstream_connection();
1597
1598 downstream->add_retry();
1599
1600 std::unique_ptr<DownstreamConnection> dconn;
1601
1602 rv = 0;
1603
1604 if (no_retry || downstream->no_more_retry()) {
1605 goto fail;
1606 }
1607
1608 // downstream connection is clean; we can retry with new
1609 // downstream connection.
1610
1611 for (;;) {
1612 auto dconn = handler_->get_downstream_connection(rv, downstream);
1613 if (!dconn) {
1614 goto fail;
1615 }
1616
1617 rv = downstream->attach_downstream_connection(std::move(dconn));
1618 if (rv == 0) {
1619 break;
1620 }
1621 }
1622
1623 rv = downstream->push_request_headers();
1624 if (rv != 0) {
1625 goto fail;
1626 }
1627
1628 return 0;
1629
1630 fail:
1631 if (rv == SHRPX_ERR_TLS_REQUIRED) {
1632 assert(0);
1633 abort();
1634 }
1635
1636 rv = on_downstream_abort_request(downstream, 502);
1637 if (rv != 0) {
1638 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1639 }
1640 downstream->pop_downstream_connection();
1641
1642 handler_->signal_write();
1643
1644 return 0;
1645 }
1646
pause_read(IOCtrlReason reason)1647 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1648
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1649 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1650 size_t consumed) {
1651 consume(downstream->get_stream_id(), consumed);
1652
1653 auto &req = downstream->request();
1654
1655 req.consume(consumed);
1656
1657 handler_->signal_write();
1658
1659 return 0;
1660 }
1661
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1662 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1663 size_t bodylen) {
1664 int rv;
1665
1666 nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1667
1668 if (bodylen) {
1669 data_read.read_data = downstream_read_data_callback;
1670 data_read_ptr = &data_read;
1671 }
1672
1673 const auto &resp = downstream->response();
1674 auto config = get_config();
1675 auto &httpconf = config->http;
1676
1677 auto &balloc = downstream->get_block_allocator();
1678
1679 const auto &headers = resp.fs.headers();
1680 auto nva = std::vector<nghttp3_nv>();
1681 // 2 for :status and server
1682 nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1683
1684 auto response_status = http2::stringify_status(balloc, resp.http_status);
1685
1686 nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1687
1688 for (auto &kv : headers) {
1689 if (kv.name.empty() || kv.name[0] == ':') {
1690 continue;
1691 }
1692 switch (kv.token) {
1693 case http2::HD_CONNECTION:
1694 case http2::HD_KEEP_ALIVE:
1695 case http2::HD_PROXY_CONNECTION:
1696 case http2::HD_TE:
1697 case http2::HD_TRANSFER_ENCODING:
1698 case http2::HD_UPGRADE:
1699 continue;
1700 }
1701 nva.push_back(http3::make_nv_nocopy(kv.name, kv.value, kv.no_index));
1702 }
1703
1704 if (!resp.fs.header(http2::HD_SERVER)) {
1705 nva.push_back(http3::make_nv_ls_nocopy("server", config->http.server_name));
1706 }
1707
1708 for (auto &p : httpconf.add_response_headers) {
1709 nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1710 }
1711
1712 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1713 nva.data(), nva.size(), data_read_ptr);
1714 if (nghttp3_err_is_fatal(rv)) {
1715 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1716 << nghttp3_strerror(rv);
1717 return -1;
1718 }
1719
1720 auto buf = downstream->get_response_buf();
1721
1722 buf->append(body, bodylen);
1723
1724 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1725
1726 if (data_read_ptr) {
1727 downstream->reset_upstream_wtimer();
1728 }
1729
1730 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1731 0) {
1732 return -1;
1733 }
1734
1735 return 0;
1736 }
1737
initiate_push(Downstream * downstream,const StringRef & uri)1738 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1739 return 0;
1740 }
1741
response_riovec(struct iovec * iov,int iovcnt) const1742 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1743 return 0;
1744 }
1745
response_drain(size_t n)1746 void Http3Upstream::response_drain(size_t n) {}
1747
response_empty() const1748 bool Http3Upstream::response_empty() const { return false; }
1749
1750 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1751 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1752 int32_t promised_stream_id) {
1753 return nullptr;
1754 }
1755
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1756 int Http3Upstream::on_downstream_push_promise_complete(
1757 Downstream *downstream, Downstream *promised_downstream) {
1758 return 0;
1759 }
1760
push_enabled() const1761 bool Http3Upstream::push_enabled() const { return false; }
1762
cancel_premature_downstream(Downstream * promised_downstream)1763 void Http3Upstream::cancel_premature_downstream(
1764 Downstream *promised_downstream) {}
1765
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen)1766 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1767 const Address &remote_addr,
1768 const Address &local_addr, const ngtcp2_pkt_info &pi,
1769 const uint8_t *data, size_t datalen) {
1770 int rv;
1771
1772 auto path = ngtcp2_path{
1773 {
1774 const_cast<sockaddr *>(&local_addr.su.sa),
1775 static_cast<socklen_t>(local_addr.len),
1776 },
1777 {
1778 const_cast<sockaddr *>(&remote_addr.su.sa),
1779 static_cast<socklen_t>(remote_addr.len),
1780 },
1781 const_cast<UpstreamAddr *>(faddr),
1782 };
1783
1784 rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data, datalen, quic_timestamp());
1785 if (rv != 0) {
1786 switch (rv) {
1787 case NGTCP2_ERR_DRAINING:
1788 return -1;
1789 case NGTCP2_ERR_RETRY: {
1790 auto worker = handler_->get_worker();
1791 auto quic_conn_handler = worker->get_quic_connection_handler();
1792
1793 if (worker->get_graceful_shutdown()) {
1794 ngtcp2_ccerr_set_transport_error(&last_error_,
1795 NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1796
1797 return handle_error();
1798 }
1799
1800 ngtcp2_version_cid vc;
1801
1802 rv =
1803 ngtcp2_pkt_decode_version_cid(&vc, data, datalen, SHRPX_QUIC_SCIDLEN);
1804 if (rv != 0) {
1805 return -1;
1806 }
1807
1808 retry_close_ = true;
1809
1810 quic_conn_handler->send_retry(handler_->get_upstream_addr(), vc.version,
1811 vc.dcid, vc.dcidlen, vc.scid, vc.scidlen,
1812 remote_addr, local_addr, datalen * 3);
1813
1814 return -1;
1815 }
1816 case NGTCP2_ERR_CRYPTO:
1817 if (!last_error_.error_code) {
1818 ngtcp2_ccerr_set_tls_alert(
1819 &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1820 }
1821 break;
1822 case NGTCP2_ERR_DROP_CONN:
1823 return -1;
1824 default:
1825 if (!last_error_.error_code) {
1826 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1827 }
1828 }
1829
1830 ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1831
1832 return handle_error();
1833 }
1834
1835 return 0;
1836 }
1837
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1838 int Http3Upstream::send_packet(const UpstreamAddr *faddr,
1839 const sockaddr *remote_sa, size_t remote_salen,
1840 const sockaddr *local_sa, size_t local_salen,
1841 const ngtcp2_pkt_info &pi, const uint8_t *data,
1842 size_t datalen, size_t gso_size) {
1843 auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1844 local_salen, pi, data, datalen, gso_size);
1845 switch (rv) {
1846 case 0:
1847 return 0;
1848 // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1849 // large.
1850 case -EINVAL:
1851 case -EMSGSIZE:
1852 // Let the packet lost.
1853 break;
1854 case -EAGAIN:
1855 #if EAGAIN != EWOULDBLOCK
1856 case -EWOULDBLOCK:
1857 #endif // EAGAIN != EWOULDBLOCK
1858 return SHRPX_ERR_SEND_BLOCKED;
1859 default:
1860 break;
1861 }
1862
1863 return -1;
1864 }
1865
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1866 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1867 const ngtcp2_addr &remote_addr,
1868 const ngtcp2_addr &local_addr,
1869 const ngtcp2_pkt_info &pi,
1870 const uint8_t *data, size_t datalen,
1871 size_t gso_size) {
1872 assert(tx_.num_blocked || !tx_.send_blocked);
1873 assert(tx_.num_blocked < 2);
1874
1875 tx_.send_blocked = true;
1876
1877 auto &p = tx_.blocked[tx_.num_blocked++];
1878
1879 memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1880 memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1881
1882 p.local_addr.len = local_addr.addrlen;
1883 p.remote_addr.len = remote_addr.addrlen;
1884 p.faddr = faddr;
1885 p.pi = pi;
1886 p.data = data;
1887 p.datalen = datalen;
1888 p.gso_size = gso_size;
1889 }
1890
send_blocked_packet()1891 int Http3Upstream::send_blocked_packet() {
1892 int rv;
1893
1894 assert(tx_.send_blocked);
1895
1896 for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1897 auto &p = tx_.blocked[tx_.num_blocked_sent];
1898
1899 rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len,
1900 &p.local_addr.su.sa, p.local_addr.len, p.pi, p.data,
1901 p.datalen, p.gso_size);
1902 if (rv == SHRPX_ERR_SEND_BLOCKED) {
1903 signal_write_upstream_addr(p.faddr);
1904
1905 return 0;
1906 }
1907 }
1908
1909 tx_.send_blocked = false;
1910 tx_.num_blocked = 0;
1911 tx_.num_blocked_sent = 0;
1912
1913 return 0;
1914 }
1915
signal_write_upstream_addr(const UpstreamAddr * faddr)1916 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1917 auto conn = handler_->get_connection();
1918
1919 if (faddr->fd != conn->wev.fd) {
1920 if (ev_is_active(&conn->wev)) {
1921 ev_io_stop(handler_->get_loop(), &conn->wev);
1922 }
1923
1924 ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1925 }
1926
1927 conn->wlimit.startw();
1928 }
1929
handle_error()1930 int Http3Upstream::handle_error() {
1931 if (ngtcp2_conn_in_closing_period(conn_) ||
1932 ngtcp2_conn_in_draining_period(conn_)) {
1933 return -1;
1934 }
1935
1936 ngtcp2_path_storage ps;
1937 ngtcp2_pkt_info pi;
1938
1939 ngtcp2_path_storage_zero(&ps);
1940
1941 auto ts = quic_timestamp();
1942
1943 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1944
1945 auto nwrite = ngtcp2_conn_write_connection_close(
1946 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
1947 &last_error_, ts);
1948 if (nwrite < 0) {
1949 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1950 << ngtcp2_strerror(nwrite);
1951 return -1;
1952 }
1953
1954 conn_close_.resize(nwrite);
1955
1956 if (nwrite == 0) {
1957 return -1;
1958 }
1959
1960 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1961 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1962 ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1963
1964 return -1;
1965 }
1966
handle_expiry()1967 int Http3Upstream::handle_expiry() {
1968 int rv;
1969
1970 auto ts = quic_timestamp();
1971
1972 rv = ngtcp2_conn_handle_expiry(conn_, ts);
1973 if (rv != 0) {
1974 if (rv == NGTCP2_ERR_IDLE_CLOSE) {
1975 ULOG(INFO, this) << "Idle connection timeout";
1976 } else {
1977 ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
1978 }
1979 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1980 return handle_error();
1981 }
1982
1983 return 0;
1984 }
1985
reset_timer()1986 void Http3Upstream::reset_timer() {
1987 auto ts = quic_timestamp();
1988 auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
1989 auto loop = handler_->get_loop();
1990
1991 if (expiry_ts <= ts) {
1992 ev_feed_event(loop, &timer_, EV_TIMER);
1993 return;
1994 }
1995
1996 timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
1997
1998 ev_timer_again(loop, &timer_);
1999 }
2000
2001 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2002 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2003 size_t nconsumed, void *user_data,
2004 void *stream_user_data) {
2005 auto upstream = static_cast<Http3Upstream *>(user_data);
2006
2007 upstream->consume(stream_id, nconsumed);
2008
2009 return 0;
2010 }
2011 } // namespace
2012
2013 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2014 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2015 uint64_t datalen, void *user_data,
2016 void *stream_user_data) {
2017 auto upstream = static_cast<Http3Upstream *>(user_data);
2018 auto downstream = static_cast<Downstream *>(stream_user_data);
2019
2020 assert(downstream);
2021
2022 if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2023 return NGHTTP3_ERR_CALLBACK_FAILURE;
2024 }
2025
2026 return 0;
2027 }
2028 } // namespace
2029
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2030 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2031 uint64_t datalen) {
2032 if (LOG_ENABLED(INFO)) {
2033 ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2034 << datalen << " bytes acknowledged";
2035 }
2036
2037 auto body = downstream->get_response_buf();
2038 auto drained = body->drain_mark(datalen);
2039 (void)drained;
2040
2041 assert(datalen == drained);
2042
2043 if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2044 return -1;
2045 }
2046
2047 return 0;
2048 }
2049
2050 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2051 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2052 void *user_data, void *stream_user_data) {
2053 if (!ngtcp2_is_bidi_stream(stream_id)) {
2054 return 0;
2055 }
2056
2057 auto upstream = static_cast<Http3Upstream *>(user_data);
2058 upstream->http_begin_request_headers(stream_id);
2059
2060 return 0;
2061 }
2062 } // namespace
2063
2064 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2065 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2066 int32_t token, nghttp3_rcbuf *name,
2067 nghttp3_rcbuf *value, uint8_t flags,
2068 void *user_data, void *stream_user_data) {
2069 auto upstream = static_cast<Http3Upstream *>(user_data);
2070 auto downstream = static_cast<Downstream *>(stream_user_data);
2071
2072 if (!downstream || downstream->get_stop_reading()) {
2073 return 0;
2074 }
2075
2076 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2077 /* trailer = */ false) != 0) {
2078 return NGHTTP3_ERR_CALLBACK_FAILURE;
2079 }
2080
2081 return 0;
2082 }
2083 } // namespace
2084
2085 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2086 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2087 int32_t token, nghttp3_rcbuf *name,
2088 nghttp3_rcbuf *value, uint8_t flags,
2089 void *user_data, void *stream_user_data) {
2090 auto upstream = static_cast<Http3Upstream *>(user_data);
2091 auto downstream = static_cast<Downstream *>(stream_user_data);
2092
2093 if (!downstream || downstream->get_stop_reading()) {
2094 return 0;
2095 }
2096
2097 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2098 /* trailer = */ true) != 0) {
2099 return NGHTTP3_ERR_CALLBACK_FAILURE;
2100 }
2101
2102 return 0;
2103 }
2104 } // namespace
2105
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2106 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2107 int32_t h3token,
2108 nghttp3_rcbuf *name,
2109 nghttp3_rcbuf *value, uint8_t flags,
2110 bool trailer) {
2111 auto namebuf = nghttp3_rcbuf_get_buf(name);
2112 auto valuebuf = nghttp3_rcbuf_get_buf(value);
2113 auto &req = downstream->request();
2114 auto config = get_config();
2115 auto &httpconf = config->http;
2116
2117 if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2118 httpconf.request_header_field_buffer ||
2119 req.fs.num_fields() >= httpconf.max_request_header_fields) {
2120 downstream->set_stop_reading(true);
2121
2122 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2123 return 0;
2124 }
2125
2126 if (LOG_ENABLED(INFO)) {
2127 ULOG(INFO, this) << "Too large or many header field size="
2128 << req.fs.buffer_size() + namebuf.len + valuebuf.len
2129 << ", num=" << req.fs.num_fields() + 1;
2130 }
2131
2132 // just ignore if this is a trailer part.
2133 if (trailer) {
2134 return 0;
2135 }
2136
2137 if (error_reply(downstream, 431) != 0) {
2138 return -1;
2139 }
2140
2141 return 0;
2142 }
2143
2144 auto token = http2::lookup_token(namebuf.base, namebuf.len);
2145 auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2146
2147 downstream->add_rcbuf(name);
2148 downstream->add_rcbuf(value);
2149
2150 if (trailer) {
2151 req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
2152 StringRef{valuebuf.base, valuebuf.len}, no_index,
2153 token);
2154 return 0;
2155 }
2156
2157 req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
2158 StringRef{valuebuf.base, valuebuf.len}, no_index,
2159 token);
2160 return 0;
2161 }
2162
2163 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2164 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2165 void *user_data, void *stream_user_data) {
2166 auto upstream = static_cast<Http3Upstream *>(user_data);
2167 auto handler = upstream->get_client_handler();
2168 auto downstream = static_cast<Downstream *>(stream_user_data);
2169
2170 if (!downstream || downstream->get_stop_reading()) {
2171 return 0;
2172 }
2173
2174 if (upstream->http_end_request_headers(downstream, fin) != 0) {
2175 return NGHTTP3_ERR_CALLBACK_FAILURE;
2176 }
2177
2178 downstream->reset_upstream_rtimer();
2179 handler->stop_read_timer();
2180
2181 return 0;
2182 }
2183 } // namespace
2184
http_end_request_headers(Downstream * downstream,int fin)2185 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2186 auto lgconf = log_config();
2187 lgconf->update_tstamp(std::chrono::system_clock::now());
2188 auto &req = downstream->request();
2189 req.tstamp = lgconf->tstamp;
2190
2191 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2192 return 0;
2193 }
2194
2195 auto &nva = req.fs.headers();
2196
2197 if (LOG_ENABLED(INFO)) {
2198 std::stringstream ss;
2199 for (auto &nv : nva) {
2200 if (nv.name == "authorization") {
2201 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2202 continue;
2203 }
2204 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2205 }
2206 ULOG(INFO, this) << "HTTP request headers. stream_id="
2207 << downstream->get_stream_id() << "\n"
2208 << ss.str();
2209 }
2210
2211 auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2212 if (content_length) {
2213 // libnghttp3 guarantees this can be parsed
2214 req.fs.content_length = util::parse_uint(content_length->value);
2215 }
2216
2217 // presence of mandatory header fields are guaranteed by libnghttp3.
2218 auto authority = req.fs.header(http2::HD__AUTHORITY);
2219 auto path = req.fs.header(http2::HD__PATH);
2220 auto method = req.fs.header(http2::HD__METHOD);
2221 auto scheme = req.fs.header(http2::HD__SCHEME);
2222
2223 auto method_token = http2::lookup_method_token(method->value);
2224 if (method_token == -1) {
2225 if (error_reply(downstream, 501) != 0) {
2226 return -1;
2227 }
2228 return 0;
2229 }
2230
2231 auto faddr = handler_->get_upstream_addr();
2232
2233 auto config = get_config();
2234
2235 // For HTTP/2 proxy, we require :authority.
2236 if (method_token != HTTP_CONNECT && config->http2_proxy &&
2237 faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2238 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2239 return 0;
2240 }
2241
2242 req.method = method_token;
2243 if (scheme) {
2244 req.scheme = scheme->value;
2245 }
2246
2247 // nghttp2 library guarantees either :authority or host exist
2248 if (!authority) {
2249 req.no_authority = true;
2250 authority = req.fs.header(http2::HD_HOST);
2251 }
2252
2253 if (authority) {
2254 req.authority = authority->value;
2255 }
2256
2257 if (path) {
2258 if (method_token == HTTP_OPTIONS &&
2259 path->value == StringRef::from_lit("*")) {
2260 // Server-wide OPTIONS request. Path is empty.
2261 } else if (config->http2_proxy &&
2262 faddr->alt_mode == UpstreamAltMode::NONE) {
2263 req.path = path->value;
2264 } else {
2265 req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2266 path->value);
2267 }
2268 }
2269
2270 auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2271 if (connect_proto) {
2272 if (connect_proto->value != "websocket") {
2273 if (error_reply(downstream, 400) != 0) {
2274 return -1;
2275 }
2276 return 0;
2277 }
2278 req.connect_proto = ConnectProto::WEBSOCKET;
2279 }
2280
2281 if (!fin) {
2282 req.http2_expect_body = true;
2283 } else if (req.fs.content_length == -1) {
2284 req.fs.content_length = 0;
2285 }
2286
2287 downstream->inspect_http2_request();
2288
2289 downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2290
2291 if (config->http.require_http_scheme &&
2292 !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2293 if (error_reply(downstream, 400) != 0) {
2294 return -1;
2295 }
2296 }
2297
2298 #ifdef HAVE_MRUBY
2299 auto worker = handler_->get_worker();
2300 auto mruby_ctx = worker->get_mruby_context();
2301
2302 if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2303 if (error_reply(downstream, 500) != 0) {
2304 return -1;
2305 }
2306 return 0;
2307 }
2308 #endif // HAVE_MRUBY
2309
2310 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2311 return 0;
2312 }
2313
2314 start_downstream(downstream);
2315
2316 return 0;
2317 }
2318
start_downstream(Downstream * downstream)2319 void Http3Upstream::start_downstream(Downstream *downstream) {
2320 if (downstream_queue_.can_activate(downstream->request().authority)) {
2321 initiate_downstream(downstream);
2322 return;
2323 }
2324
2325 downstream_queue_.mark_blocked(downstream);
2326 }
2327
// Obtains a backend connection for |downstream|, attaches it, and
// pushes the request headers to the backend.  Any failure along the
// way is answered on the stream with an error reply (502, or 500 when
// the per-backend mruby hook fails); if that reply cannot be
// submitted the stream is shut down with NGHTTP3_H3_INTERNAL_ERROR.
void Http3Upstream::initiate_downstream(Downstream *downstream) {
  int rv;

#ifdef HAVE_MRUBY
  // Raw pointer to the attached connection, kept because ownership of
  // dconn is moved into downstream inside the loop below.
  DownstreamConnection *dconn_ptr;
#endif // HAVE_MRUBY

  // Keep asking for a backend connection until one can be attached or
  // none is available.
  for (;;) {
    auto dconn = handler_->get_downstream_connection(rv, downstream);
    if (!dconn) {
      // This error code is not expected on this code path.
      if (rv == SHRPX_ERR_TLS_REQUIRED) {
        assert(0);
        abort();
      }

      rv = error_reply(downstream, 502);
      if (rv != 0) {
        shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
      }

      // http_stream_close() checks for CONNECT_FAIL and removes the
      // downstream without trying to detach a backend connection.
      downstream->set_request_state(DownstreamState::CONNECT_FAIL);
      downstream_queue_.mark_failure(downstream);

      return;
    }

#ifdef HAVE_MRUBY
    dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
    rv = downstream->attach_downstream_connection(std::move(dconn));
    if (rv == 0) {
      break;
    }
  }

#ifdef HAVE_MRUBY
  // Run the request hook of the backend address group, if any.
  const auto &group = dconn_ptr->get_downstream_addr_group();
  if (group) {
    const auto &mruby_ctx = group->shared_addr->mruby_ctx;
    if (mruby_ctx->run_on_request_proc(downstream) != 0) {
      if (error_reply(downstream, 500) != 0) {
        shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
      }

      downstream_queue_.mark_failure(downstream);

      return;
    }

    // The hook may have completed the response by itself.
    if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
      return;
    }
  }
#endif // HAVE_MRUBY

  rv = downstream->push_request_headers();
  if (rv != 0) {

    if (error_reply(downstream, 502) != 0) {
      shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
    }

    downstream_queue_.mark_failure(downstream);

    return;
  }

  downstream_queue_.mark_active(downstream);

  // When no request body is expected, the upload side is finished
  // immediately.
  auto &req = downstream->request();
  if (!req.http2_expect_body) {
    rv = downstream->end_upload_data();
    if (rv != 0) {
      shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
    }
  }
}
2405
2406 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2407 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2408 size_t datalen, void *user_data, void *stream_user_data) {
2409 auto upstream = static_cast<Http3Upstream *>(user_data);
2410 auto downstream = static_cast<Downstream *>(stream_user_data);
2411
2412 if (upstream->http_recv_data(downstream, data, datalen) != 0) {
2413 return NGHTTP3_ERR_CALLBACK_FAILURE;
2414 }
2415
2416 return 0;
2417 }
2418 } // namespace
2419
http_recv_data(Downstream * downstream,const uint8_t * data,size_t datalen)2420 int Http3Upstream::http_recv_data(Downstream *downstream, const uint8_t *data,
2421 size_t datalen) {
2422 downstream->reset_upstream_rtimer();
2423
2424 if (downstream->push_upload_data_chunk(data, datalen) != 0) {
2425 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2426 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2427 }
2428
2429 consume(downstream->get_stream_id(), datalen);
2430
2431 return 0;
2432 }
2433
2434 return 0;
2435 }
2436
2437 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2438 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2439 void *stream_user_data) {
2440 auto upstream = static_cast<Http3Upstream *>(user_data);
2441 auto downstream = static_cast<Downstream *>(stream_user_data);
2442
2443 if (!downstream || downstream->get_stop_reading()) {
2444 return 0;
2445 }
2446
2447 if (upstream->http_end_stream(downstream) != 0) {
2448 return NGHTTP3_ERR_CALLBACK_FAILURE;
2449 }
2450
2451 return 0;
2452 }
2453 } // namespace
2454
http_end_stream(Downstream * downstream)2455 int Http3Upstream::http_end_stream(Downstream *downstream) {
2456 downstream->disable_upstream_rtimer();
2457
2458 if (downstream->end_upload_data() != 0) {
2459 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2460 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2461 }
2462 }
2463
2464 downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2465
2466 return 0;
2467 }
2468
2469 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2470 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2471 uint64_t app_error_code, void *conn_user_data,
2472 void *stream_user_data) {
2473 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2474 auto downstream = static_cast<Downstream *>(stream_user_data);
2475
2476 if (!downstream) {
2477 return 0;
2478 }
2479
2480 if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2481 return NGHTTP3_ERR_CALLBACK_FAILURE;
2482 }
2483
2484 return 0;
2485 }
2486 } // namespace
2487
http_stream_close(Downstream * downstream,uint64_t app_error_code)2488 int Http3Upstream::http_stream_close(Downstream *downstream,
2489 uint64_t app_error_code) {
2490 auto stream_id = downstream->get_stream_id();
2491
2492 if (LOG_ENABLED(INFO)) {
2493 ULOG(INFO, this) << "Stream stream_id=" << stream_id
2494 << " is being closed with app_error_code="
2495 << app_error_code;
2496
2497 auto body = downstream->get_response_buf();
2498
2499 ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2500 << " not_sent=" << body->rleft_mark();
2501 }
2502
2503 auto &req = downstream->request();
2504
2505 consume(stream_id, req.unconsumed_body_length);
2506
2507 req.unconsumed_body_length = 0;
2508
2509 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2510
2511 if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2512 remove_downstream(downstream);
2513 // downstream was deleted
2514
2515 return 0;
2516 }
2517
2518 if (downstream->can_detach_downstream_connection()) {
2519 // Keep-alive
2520 downstream->detach_downstream_connection();
2521 }
2522
2523 downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2524
2525 // At this point, downstream read may be paused.
2526
2527 // If shrpx_downstream::push_request_headers() failed, the
2528 // error is handled here.
2529 remove_downstream(downstream);
2530 // downstream was deleted
2531
2532 return 0;
2533 }
2534
2535 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2536 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2537 uint64_t app_error_code, void *user_data,
2538 void *stream_user_data) {
2539 auto upstream = static_cast<Http3Upstream *>(user_data);
2540
2541 if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2542 return NGHTTP3_ERR_CALLBACK_FAILURE;
2543 }
2544
2545 return 0;
2546 }
2547 } // namespace
2548
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2549 int Http3Upstream::http_stop_sending(int64_t stream_id,
2550 uint64_t app_error_code) {
2551 auto rv =
2552 ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2553 if (ngtcp2_err_is_fatal(rv)) {
2554 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2555 << ngtcp2_strerror(rv);
2556 return -1;
2557 }
2558
2559 return 0;
2560 }
2561
2562 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2563 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2564 uint64_t app_error_code, void *user_data,
2565 void *stream_user_data) {
2566 auto upstream = static_cast<Http3Upstream *>(user_data);
2567
2568 if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2569 return NGHTTP3_ERR_CALLBACK_FAILURE;
2570 }
2571
2572 return 0;
2573 }
2574 } // namespace
2575
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2576 int Http3Upstream::http_reset_stream(int64_t stream_id,
2577 uint64_t app_error_code) {
2578 auto rv =
2579 ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2580 if (ngtcp2_err_is_fatal(rv)) {
2581 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2582 << ngtcp2_strerror(rv);
2583 return -1;
2584 }
2585
2586 return 0;
2587 }
2588
// Creates the server-side nghttp3 connection (httpconn_), then opens
// and binds the three unidirectional streams it uses: the control
// stream and the QPACK encoder and decoder streams.  Returns 0 on
// success, or -1 if fewer than three unidirectional streams are
// available or any ngtcp2/nghttp3 call fails.
int Http3Upstream::setup_httpconn() {
  int rv;

  // Three unidirectional streams are opened below.
  if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
    return -1;
  }

  // Positional initialization: the order of the entries must follow
  // the member order of nghttp3_callbacks.
  nghttp3_callbacks callbacks{
      shrpx::http_acked_stream_data,
      shrpx::http_stream_close,
      shrpx::http_recv_data,
      http_deferred_consume,
      shrpx::http_begin_request_headers,
      shrpx::http_recv_request_header,
      shrpx::http_end_request_headers,
      nullptr, // begin_trailers
      shrpx::http_recv_request_trailer,
      nullptr, // end_trailers
      shrpx::http_stop_sending,
      shrpx::http_end_stream,
      shrpx::http_reset_stream,
  };

  auto config = get_config();

  nghttp3_settings settings;
  nghttp3_settings_default(&settings);
  settings.qpack_max_dtable_capacity = 4_k;

  // The connect protocol setting is enabled only when not running as
  // an HTTP/2 proxy.
  if (!config->http2_proxy) {
    settings.enable_connect_protocol = 1;
  }

  auto mem = nghttp3_mem_default();

  // |this| becomes the user_data passed to the callbacks above.
  rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
  if (rv != 0) {
    ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
    return -1;
  }

  auto params = ngtcp2_conn_get_local_transport_params(conn_);

  // Tell nghttp3 the bidirectional stream limit taken from the local
  // QUIC transport parameters.
  nghttp3_conn_set_max_client_streams_bidi(httpconn_,
                                           params->initial_max_streams_bidi);

  int64_t ctrl_stream_id;

  rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
  if (rv != 0) {
    ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
    return -1;
  }

  rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
  if (rv != 0) {
    ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
                      << nghttp3_strerror(rv);
    return -1;
  }

  int64_t qpack_enc_stream_id, qpack_dec_stream_id;

  rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
  if (rv != 0) {
    ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
    return -1;
  }

  rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
  if (rv != 0) {
    ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
    return -1;
  }

  rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
                                       qpack_dec_stream_id);
  if (rv != 0) {
    ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
                      << nghttp3_strerror(rv);
    return -1;
  }

  return 0;
}
2674
error_reply(Downstream * downstream,unsigned int status_code)2675 int Http3Upstream::error_reply(Downstream *downstream,
2676 unsigned int status_code) {
2677 int rv;
2678 auto &resp = downstream->response();
2679
2680 auto &balloc = downstream->get_block_allocator();
2681
2682 auto html = http::create_error_html(balloc, status_code);
2683 resp.http_status = status_code;
2684 auto body = downstream->get_response_buf();
2685 body->append(html);
2686 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2687
2688 nghttp3_data_reader data_read;
2689 data_read.read_data = downstream_read_data_callback;
2690
2691 auto lgconf = log_config();
2692 lgconf->update_tstamp(std::chrono::system_clock::now());
2693
2694 auto response_status = http2::stringify_status(balloc, status_code);
2695 auto content_length = util::make_string_ref_uint(balloc, html.size());
2696 auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2697
2698 auto nva = std::array<nghttp3_nv, 5>{
2699 {http3::make_nv_ls_nocopy(":status", response_status),
2700 http3::make_nv_ll("content-type", "text/html; charset=UTF-8"),
2701 http3::make_nv_ls_nocopy("server", get_config()->http.server_name),
2702 http3::make_nv_ls_nocopy("content-length", content_length),
2703 http3::make_nv_ls_nocopy("date", date)}};
2704
2705 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2706 nva.data(), nva.size(), &data_read);
2707 if (nghttp3_err_is_fatal(rv)) {
2708 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2709 << nghttp3_strerror(rv);
2710 return -1;
2711 }
2712
2713 downstream->reset_upstream_wtimer();
2714
2715 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2716 0) {
2717 return -1;
2718 }
2719
2720 return 0;
2721 }
2722
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2723 int Http3Upstream::shutdown_stream(Downstream *downstream,
2724 uint64_t app_error_code) {
2725 auto stream_id = downstream->get_stream_id();
2726
2727 if (LOG_ENABLED(INFO)) {
2728 ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2729 << " with app_error_code=" << app_error_code;
2730 }
2731
2732 auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2733 if (rv != 0) {
2734 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2735 << ngtcp2_strerror(rv);
2736 return -1;
2737 }
2738
2739 return 0;
2740 }
2741
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2742 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2743 uint64_t app_error_code) {
2744 auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
2745 NGHTTP3_H3_NO_ERROR);
2746 if (ngtcp2_err_is_fatal(rv)) {
2747 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2748 << ngtcp2_strerror(rv);
2749 return -1;
2750 }
2751
2752 return 0;
2753 }
2754
consume(int64_t stream_id,size_t nconsumed)2755 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2756 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2757 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2758 }
2759
remove_downstream(Downstream * downstream)2760 void Http3Upstream::remove_downstream(Downstream *downstream) {
2761 if (downstream->accesslog_ready()) {
2762 handler_->write_accesslog(downstream);
2763 }
2764
2765 nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2766 nullptr);
2767
2768 auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2769
2770 if (next_downstream) {
2771 initiate_downstream(next_downstream);
2772 }
2773
2774 if (downstream_queue_.get_downstreams() == nullptr) {
2775 // There is no downstream at the moment. Start idle timer now.
2776 handler_->repeat_read_timer();
2777 }
2778 }
2779
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2780 void Http3Upstream::log_response_headers(
2781 Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2782 std::stringstream ss;
2783 for (auto &nv : nva) {
2784 ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2785 << StringRef{nv.value, nv.valuelen} << "\n";
2786 }
2787 ULOG(INFO, this) << "HTTP response headers. stream_id="
2788 << downstream->get_stream_id() << "\n"
2789 << ss.str();
2790 }
2791
check_shutdown()2792 int Http3Upstream::check_shutdown() {
2793 auto worker = handler_->get_worker();
2794
2795 if (!worker->get_graceful_shutdown()) {
2796 return 0;
2797 }
2798
2799 ev_prepare_stop(handler_->get_loop(), &prep_);
2800
2801 return start_graceful_shutdown();
2802 }
2803
start_graceful_shutdown()2804 int Http3Upstream::start_graceful_shutdown() {
2805 int rv;
2806
2807 if (ev_is_active(&shutdown_timer_)) {
2808 return 0;
2809 }
2810
2811 if (!httpconn_) {
2812 return -1;
2813 }
2814
2815 rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2816 if (rv != 0) {
2817 ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2818 << nghttp3_strerror(rv);
2819 return -1;
2820 }
2821
2822 handler_->signal_write();
2823
2824 auto t = ngtcp2_conn_get_pto(conn_);
2825
2826 ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2827 0.);
2828 ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2829
2830 return 0;
2831 }
2832
submit_goaway()2833 int Http3Upstream::submit_goaway() {
2834 int rv;
2835
2836 rv = nghttp3_conn_shutdown(httpconn_);
2837 if (rv != 0) {
2838 ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2839 return -1;
2840 }
2841
2842 handler_->signal_write();
2843
2844 return 0;
2845 }
2846
// Creates (or truncates) a qlog file inside |dir| and opens it for
// writing.  The file name is the current time in ISO 8601 basic
// format, a '-', the hex encoding of |scid|, and the ".sqlog" suffix.
// Returns the file descriptor, or -1 if the file cannot be opened.
int Http3Upstream::open_qlog_file(const StringRef &dir,
                                  const ngtcp2_cid &scid) const {
  // Sized after a sample timestamp, including the terminating NUL.
  std::array<char, sizeof("20141115T125824.741+0900")> buf;

  auto path = dir.str();
  path += '/';
  path +=
      util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
  path += '-';
  path += util::format_hex(scid.data, scid.datalen);
  path += ".sqlog";

  int fd;

  // Retry open() while it is interrupted by a signal (EINTR).  The
  // file is created readable/writable by the owner and readable by
  // the group.
#ifdef O_CLOEXEC
  while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
                    S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
         errno == EINTR)
    ;
#else  // !O_CLOEXEC
  while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
                    S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
         errno == EINTR)
    ;

  // Without O_CLOEXEC the close-on-exec flag is set in a separate
  // step after a successful open.
  if (fd != -1) {
    util::make_socket_closeonexec(fd);
  }
#endif // !O_CLOEXEC

  if (fd == -1) {
    // Save errno before logging.
    auto error = errno;
    ULOG(ERROR, this) << "Failed to open qlog file " << path
                      << ": errno=" << error;
    return -1;
  }

  return fd;
}
2886
// Returns the ngtcp2 connection object held by this upstream.
ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2888
2889 } // namespace shrpx
2890