1 /*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2021 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25 #include "shrpx_http3_upstream.h"
26
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <netinet/udp.h>
31
32 #include <cstdio>
33
34 #include <ngtcp2/ngtcp2_crypto.h>
35
36 #include "shrpx_client_handler.h"
37 #include "shrpx_downstream.h"
38 #include "shrpx_downstream_connection.h"
39 #include "shrpx_log.h"
40 #include "shrpx_quic.h"
41 #include "shrpx_worker.h"
42 #include "shrpx_http.h"
43 #include "shrpx_connection_handler.h"
44 #ifdef HAVE_MRUBY
45 # include "shrpx_mruby.h"
46 #endif // HAVE_MRUBY
47 #include "http3.h"
48 #include "util.h"
49
50 namespace shrpx {
51
52 namespace {
timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)53 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
54 auto upstream = static_cast<Http3Upstream *>(w->data);
55
56 if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
57 goto fail;
58 }
59
60 return;
61
62 fail:
63 auto handler = upstream->get_client_handler();
64
65 delete handler;
66 }
67 } // namespace
68
69 namespace {
shutdown_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)70 void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
71 auto upstream = static_cast<Http3Upstream *>(w->data);
72 auto handler = upstream->get_client_handler();
73
74 if (upstream->submit_goaway() != 0) {
75 delete handler;
76 }
77 }
78 } // namespace
79
80 namespace {
prepare_cb(struct ev_loop * loop,ev_prepare * w,int revent)81 void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
82 auto upstream = static_cast<Http3Upstream *>(w->data);
83 auto handler = upstream->get_client_handler();
84
85 if (upstream->check_shutdown() != 0) {
86 delete handler;
87 }
88 }
89 } // namespace
90
91 namespace {
downstream_queue_size(Worker * worker)92 size_t downstream_queue_size(Worker *worker) {
93 auto &downstreamconf = *worker->get_downstream_config();
94
95 if (get_config()->http2_proxy) {
96 return downstreamconf.connections_per_host;
97 }
98
99 return downstreamconf.connections_per_frontend;
100 }
101 } // namespace
102
103 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)104 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
105 auto conn = static_cast<Connection *>(conn_ref->user_data);
106 auto handler = static_cast<ClientHandler *>(conn->data);
107 auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
108 return upstream->get_conn();
109 }
110 } // namespace
111
Http3Upstream(ClientHandler * handler)112 Http3Upstream::Http3Upstream(ClientHandler *handler)
113 : handler_{handler},
114 qlog_fd_{-1},
115 hashed_scid_{},
116 conn_{nullptr},
117 httpconn_{nullptr},
118 downstream_queue_{downstream_queue_size(handler->get_worker()),
119 !get_config()->http2_proxy},
120 retry_close_{false},
121 tx_{
122 .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
123 } {
124 auto conn = handler_->get_connection();
125 conn->conn_ref.get_conn = shrpx::get_conn;
126
127 ev_timer_init(&timer_, timeoutcb, 0., 0.);
128 timer_.data = this;
129
130 ngtcp2_ccerr_default(&last_error_);
131
132 ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
133 shutdown_timer_.data = this;
134
135 ev_prepare_init(&prep_, prepare_cb);
136 prep_.data = this;
137 ev_prepare_start(handler_->get_loop(), &prep_);
138 }
139
~Http3Upstream()140 Http3Upstream::~Http3Upstream() {
141 auto loop = handler_->get_loop();
142
143 ev_prepare_stop(loop, &prep_);
144 ev_timer_stop(loop, &shutdown_timer_);
145 ev_timer_stop(loop, &timer_);
146
147 nghttp3_conn_del(httpconn_);
148
149 ngtcp2_conn_del(conn_);
150
151 if (qlog_fd_ != -1) {
152 close(qlog_fd_);
153 }
154 }
155
namespace {
// printf-style log sink installed as ngtcp2 settings.log_printf.
// Formats the message into a fixed 4096 byte buffer, appends a newline,
// and writes it to stderr with write(2), retrying on EINTR.
//
// Messages that do not fit are truncated to buf.size() - 1 characters
// plus the newline.  If formatting itself fails (vsnprintf returns a
// negative value) the message is dropped: the buffer content is
// indeterminate in that case and must not be written out.
void log_printf(void *user_data, const char *fmt, ...) {
  std::array<char, 4096> buf;
  va_list ap;

  va_start(ap, fmt);
  const auto rv = vsnprintf(buf.data(), buf.size(), fmt, ap);
  va_end(ap);

  if (rv < 0) {
    return;
  }

  auto len = static_cast<size_t>(rv);
  if (len >= buf.size()) {
    // Truncated output; reuse the slot of the terminating NUL for the
    // newline.
    len = buf.size() - 1;
  }

  buf[len++] = '\n';

  while (write(fileno(stderr), buf.data(), len) == -1 && errno == EINTR)
    ;
}
} // namespace
175
176 namespace {
qlog_write(void * user_data,uint32_t flags,const void * data,size_t datalen)177 void qlog_write(void *user_data, uint32_t flags, const void *data,
178 size_t datalen) {
179 auto upstream = static_cast<Http3Upstream *>(user_data);
180
181 upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
182 }
183 } // namespace
184
qlog_write(const void * data,size_t datalen,bool fin)185 void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
186 assert(qlog_fd_ != -1);
187
188 while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
189 ;
190
191 if (fin) {
192 close(qlog_fd_);
193 qlog_fd_ = -1;
194 }
195 }
196
197 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)198 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
199 util::random_bytes(dest, dest + destlen,
200 *static_cast<std::mt19937 *>(rand_ctx->native_handle));
201 }
202 } // namespace
203
204 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)205 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
206 size_t cidlen, void *user_data) {
207 auto upstream = static_cast<Http3Upstream *>(user_data);
208 auto handler = upstream->get_client_handler();
209 auto worker = handler->get_worker();
210 auto conn_handler = worker->get_connection_handler();
211 auto &qkms = conn_handler->get_quic_keying_materials();
212 auto &qkm = qkms->keying_materials.front();
213
214 if (generate_quic_connection_id(*cid, cidlen, worker->get_cid_prefix(),
215 qkm.id, qkm.cid_encryption_key.data()) != 0) {
216 return NGTCP2_ERR_CALLBACK_FAILURE;
217 }
218
219 if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
220 qkm.secret.size()) != 0) {
221 return NGTCP2_ERR_CALLBACK_FAILURE;
222 }
223
224 auto quic_connection_handler = worker->get_quic_connection_handler();
225
226 quic_connection_handler->add_connection_id(*cid, handler);
227
228 return 0;
229 }
230 } // namespace
231
232 namespace {
remove_connection_id(ngtcp2_conn * conn,const ngtcp2_cid * cid,void * user_data)233 int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
234 void *user_data) {
235 auto upstream = static_cast<Http3Upstream *>(user_data);
236 auto handler = upstream->get_client_handler();
237 auto worker = handler->get_worker();
238 auto quic_conn_handler = worker->get_quic_connection_handler();
239
240 quic_conn_handler->remove_connection_id(*cid);
241
242 return 0;
243 }
244 } // namespace
245
http_begin_request_headers(int64_t stream_id)246 void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
247 auto downstream =
248 std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
249 nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
250
251 downstream->reset_upstream_rtimer();
252
253 handler_->repeat_read_timer();
254
255 auto &req = downstream->request();
256 req.http_major = 3;
257 req.http_minor = 0;
258
259 add_pending_downstream(std::move(downstream));
260 }
261
add_pending_downstream(std::unique_ptr<Downstream> downstream)262 void Http3Upstream::add_pending_downstream(
263 std::unique_ptr<Downstream> downstream) {
264 downstream_queue_.add_pending(std::move(downstream));
265 }
266
267 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)268 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
269 uint64_t offset, const uint8_t *data, size_t datalen,
270 void *user_data, void *stream_user_data) {
271 auto upstream = static_cast<Http3Upstream *>(user_data);
272
273 if (upstream->recv_stream_data(flags, stream_id, data, datalen) != 0) {
274 return NGTCP2_ERR_CALLBACK_FAILURE;
275 }
276
277 return 0;
278 }
279 } // namespace
280
recv_stream_data(uint32_t flags,int64_t stream_id,const uint8_t * data,size_t datalen)281 int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
282 const uint8_t *data, size_t datalen) {
283 assert(httpconn_);
284
285 auto nconsumed = nghttp3_conn_read_stream(
286 httpconn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN);
287 if (nconsumed < 0) {
288 ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
289 << nghttp3_strerror(nconsumed);
290 ngtcp2_ccerr_set_application_error(
291 &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
292 0);
293 return -1;
294 }
295
296 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
297 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
298
299 return 0;
300 }
301
302 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)303 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
304 uint64_t app_error_code, void *user_data,
305 void *stream_user_data) {
306 auto upstream = static_cast<Http3Upstream *>(user_data);
307
308 if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
309 app_error_code = NGHTTP3_H3_NO_ERROR;
310 }
311
312 if (upstream->stream_close(stream_id, app_error_code) != 0) {
313 return NGTCP2_ERR_CALLBACK_FAILURE;
314 }
315
316 return 0;
317 }
318 } // namespace
319
stream_close(int64_t stream_id,uint64_t app_error_code)320 int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
321 if (!httpconn_) {
322 return 0;
323 }
324
325 auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
326 switch (rv) {
327 case 0:
328 break;
329 case NGHTTP3_ERR_STREAM_NOT_FOUND:
330 if (ngtcp2_is_bidi_stream(stream_id)) {
331 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
332 }
333 break;
334 default:
335 ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
336 ngtcp2_ccerr_set_application_error(
337 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
338 return -1;
339 }
340
341 return 0;
342 }
343
344 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)345 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
346 uint64_t offset, uint64_t datalen, void *user_data,
347 void *stream_user_data) {
348 auto upstream = static_cast<Http3Upstream *>(user_data);
349
350 if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
351 return NGTCP2_ERR_CALLBACK_FAILURE;
352 }
353
354 return 0;
355 }
356 } // namespace
357
acked_stream_data_offset(int64_t stream_id,uint64_t datalen)358 int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
359 uint64_t datalen) {
360 if (!httpconn_) {
361 return 0;
362 }
363
364 auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
365 if (rv != 0) {
366 ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
367 << nghttp3_strerror(rv);
368 return -1;
369 }
370
371 return 0;
372 }
373
374 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)375 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
376 uint64_t max_data, void *user_data,
377 void *stream_user_data) {
378 auto upstream = static_cast<Http3Upstream *>(user_data);
379
380 if (upstream->extend_max_stream_data(stream_id) != 0) {
381 return NGTCP2_ERR_CALLBACK_FAILURE;
382 }
383
384 return 0;
385 }
386 } // namespace
387
extend_max_stream_data(int64_t stream_id)388 int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
389 if (!httpconn_) {
390 return 0;
391 }
392
393 auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
394 if (rv != 0) {
395 ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
396 << nghttp3_strerror(rv);
397 return -1;
398 }
399
400 return 0;
401 }
402
403 namespace {
extend_max_remote_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)404 int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
405 void *user_data) {
406 auto upstream = static_cast<Http3Upstream *>(user_data);
407
408 upstream->extend_max_remote_streams_bidi(max_streams);
409
410 return 0;
411 }
412 } // namespace
413
extend_max_remote_streams_bidi(uint64_t max_streams)414 void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
415 nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
416 }
417
418 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)419 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
420 uint64_t app_error_code, void *user_data,
421 void *stream_user_data) {
422 auto upstream = static_cast<Http3Upstream *>(user_data);
423
424 if (upstream->stream_reset(stream_id) != 0) {
425 return NGTCP2_ERR_CALLBACK_FAILURE;
426 }
427
428 return 0;
429 }
430 } // namespace
431
stream_reset(int64_t stream_id)432 int Http3Upstream::stream_reset(int64_t stream_id) {
433 if (http_shutdown_stream_read(stream_id) != 0) {
434 return -1;
435 }
436
437 if (ngtcp2_is_bidi_stream(stream_id)) {
438 auto rv = ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id,
439 NGHTTP3_H3_NO_ERROR);
440 if (rv != 0) {
441 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
442 << ngtcp2_strerror(rv);
443 return -1;
444 }
445 }
446
447 return 0;
448 }
449
http_shutdown_stream_read(int64_t stream_id)450 int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
451 if (!httpconn_) {
452 return 0;
453 }
454
455 auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
456 if (rv != 0) {
457 ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
458 << nghttp3_strerror(rv);
459 return -1;
460 }
461
462 return 0;
463 }
464
465 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)466 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
467 uint64_t app_error_code, void *user_data,
468 void *stream_user_data) {
469 auto upstream = static_cast<Http3Upstream *>(user_data);
470
471 if (upstream->http_shutdown_stream_read(stream_id) != 0) {
472 return NGTCP2_ERR_CALLBACK_FAILURE;
473 }
474
475 return 0;
476 }
477 } // namespace
478
479 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)480 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
481 auto upstream = static_cast<Http3Upstream *>(user_data);
482
483 if (upstream->handshake_completed() != 0) {
484 return NGTCP2_ERR_CALLBACK_FAILURE;
485 }
486
487 return 0;
488 }
489 } // namespace
490
handshake_completed()491 int Http3Upstream::handshake_completed() {
492 handler_->set_alpn_from_conn();
493
494 auto alpn = handler_->get_alpn();
495 if (alpn.empty()) {
496 ULOG(ERROR, this) << "NO ALPN was negotiated";
497 return -1;
498 }
499
500 auto path = ngtcp2_conn_get_path(conn_);
501
502 return send_new_token(&path->remote);
503 }
504
505 namespace {
path_validation(ngtcp2_conn * conn,uint32_t flags,const ngtcp2_path * path,const ngtcp2_path * old_path,ngtcp2_path_validation_result res,void * user_data)506 int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
507 const ngtcp2_path *old_path,
508 ngtcp2_path_validation_result res, void *user_data) {
509 if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
510 !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
511 return 0;
512 }
513
514 auto upstream = static_cast<Http3Upstream *>(user_data);
515 if (upstream->send_new_token(&path->remote) != 0) {
516 return NGTCP2_ERR_CALLBACK_FAILURE;
517 }
518
519 return 0;
520 }
521 } // namespace
522
send_new_token(const ngtcp2_addr * remote_addr)523 int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
524 std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> token;
525 size_t tokenlen;
526
527 auto worker = handler_->get_worker();
528 auto conn_handler = worker->get_connection_handler();
529 auto &qkms = conn_handler->get_quic_keying_materials();
530 auto &qkm = qkms->keying_materials.front();
531
532 if (generate_token(token.data(), tokenlen, remote_addr->addr,
533 remote_addr->addrlen, qkm.secret.data(),
534 qkm.secret.size()) != 0) {
535 return -1;
536 }
537
538 assert(tokenlen == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN);
539
540 token[tokenlen++] = qkm.id;
541
542 auto rv = ngtcp2_conn_submit_new_token(conn_, token.data(), tokenlen);
543 if (rv != 0) {
544 ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
545 << ngtcp2_strerror(rv);
546 return -1;
547 }
548
549 return 0;
550 }
551
552 namespace {
recv_tx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)553 int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
554 void *user_data) {
555 if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
556 return 0;
557 }
558
559 auto upstream = static_cast<Http3Upstream *>(user_data);
560 if (upstream->setup_httpconn() != 0) {
561 return NGTCP2_ERR_CALLBACK_FAILURE;
562 }
563
564 return 0;
565 }
566 } // namespace
567
init(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_hd & initial_hd,const ngtcp2_cid * odcid,const uint8_t * token,size_t tokenlen,ngtcp2_token_type token_type)568 int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
569 const Address &local_addr,
570 const ngtcp2_pkt_hd &initial_hd,
571 const ngtcp2_cid *odcid, const uint8_t *token,
572 size_t tokenlen, ngtcp2_token_type token_type) {
573 int rv;
574
575 auto worker = handler_->get_worker();
576 auto conn_handler = worker->get_connection_handler();
577
578 auto callbacks = ngtcp2_callbacks{
579 nullptr, // client_initial
580 ngtcp2_crypto_recv_client_initial_cb,
581 ngtcp2_crypto_recv_crypto_data_cb,
582 shrpx::handshake_completed,
583 nullptr, // recv_version_negotiation
584 ngtcp2_crypto_encrypt_cb,
585 ngtcp2_crypto_decrypt_cb,
586 ngtcp2_crypto_hp_mask_cb,
587 shrpx::recv_stream_data,
588 shrpx::acked_stream_data_offset,
589 nullptr, // stream_open
590 shrpx::stream_close,
591 nullptr, // recv_stateless_reset
592 nullptr, // recv_retry
593 nullptr, // extend_max_local_streams_bidi
594 nullptr, // extend_max_local_streams_uni
595 rand,
596 get_new_connection_id,
597 remove_connection_id,
598 ngtcp2_crypto_update_key_cb,
599 shrpx::path_validation,
600 nullptr, // select_preferred_addr
601 shrpx::stream_reset,
602 shrpx::extend_max_remote_streams_bidi,
603 nullptr, // extend_max_remote_streams_uni
604 shrpx::extend_max_stream_data,
605 nullptr, // dcid_status
606 nullptr, // handshake_confirmed
607 nullptr, // recv_new_token
608 ngtcp2_crypto_delete_crypto_aead_ctx_cb,
609 ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
610 nullptr, // recv_datagram
611 nullptr, // ack_datagram
612 nullptr, // lost_datagram
613 ngtcp2_crypto_get_path_challenge_data_cb,
614 shrpx::stream_stop_sending,
615 nullptr, // version_negotiation
616 nullptr, // recv_rx_key
617 shrpx::recv_tx_key,
618 };
619
620 auto config = get_config();
621 auto &quicconf = config->quic;
622 auto &http3conf = config->http3;
623
624 auto &qkms = conn_handler->get_quic_keying_materials();
625 auto &qkm = qkms->keying_materials.front();
626
627 ngtcp2_cid scid;
628
629 if (generate_quic_connection_id(scid, SHRPX_QUIC_SCIDLEN,
630 worker->get_cid_prefix(), qkm.id,
631 qkm.cid_encryption_key.data()) != 0) {
632 return -1;
633 }
634
635 ngtcp2_settings settings;
636 ngtcp2_settings_default(&settings);
637 if (quicconf.upstream.debug.log) {
638 settings.log_printf = log_printf;
639 }
640
641 if (!quicconf.upstream.qlog.dir.empty()) {
642 auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
643 if (fd != -1) {
644 qlog_fd_ = fd;
645 settings.qlog_write = shrpx::qlog_write;
646 }
647 }
648
649 settings.initial_ts = quic_timestamp();
650 settings.initial_rtt = static_cast<ngtcp2_tstamp>(
651 quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
652 settings.cc_algo = quicconf.upstream.congestion_controller;
653 settings.max_window = http3conf.upstream.max_connection_window_size;
654 settings.max_stream_window = http3conf.upstream.max_window_size;
655 settings.max_tx_udp_payload_size = SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE;
656 settings.rand_ctx.native_handle = &worker->get_randgen();
657 settings.token = token;
658 settings.tokenlen = tokenlen;
659 settings.token_type = token_type;
660 settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
661 0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
662
663 ngtcp2_transport_params params;
664 ngtcp2_transport_params_default(¶ms);
665 params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
666 // The minimum number of unidirectional streams required for HTTP/3.
667 params.initial_max_streams_uni = 3;
668 params.initial_max_data = http3conf.upstream.connection_window_size;
669 params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
670 params.initial_max_stream_data_uni = http3conf.upstream.window_size;
671 params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
672 quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
673
674 #ifdef OPENSSL_IS_BORINGSSL
675 if (quicconf.upstream.early_data) {
676 ngtcp2_transport_params early_data_params;
677
678 ngtcp2_transport_params_default(&early_data_params);
679
680 early_data_params.initial_max_stream_data_bidi_local =
681 params.initial_max_stream_data_bidi_local;
682 early_data_params.initial_max_stream_data_bidi_remote =
683 params.initial_max_stream_data_bidi_remote;
684 early_data_params.initial_max_stream_data_uni =
685 params.initial_max_stream_data_uni;
686 early_data_params.initial_max_data = params.initial_max_data;
687 early_data_params.initial_max_streams_bidi =
688 params.initial_max_streams_bidi;
689 early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
690
691 // TODO include HTTP/3 SETTINGS
692
693 std::array<uint8_t, 128> quic_early_data_ctx;
694
695 auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
696 quic_early_data_ctx.data(), quic_early_data_ctx.size(),
697 &early_data_params);
698
699 assert(quic_early_data_ctxlen > 0);
700 assert(static_cast<size_t>(quic_early_data_ctxlen) <=
701 quic_early_data_ctx.size());
702
703 if (SSL_set_quic_early_data_context(handler_->get_ssl(),
704 quic_early_data_ctx.data(),
705 quic_early_data_ctxlen) != 1) {
706 ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
707 return -1;
708 }
709 }
710 #endif // OPENSSL_IS_BORINGSSL
711
712 if (odcid) {
713 params.original_dcid = *odcid;
714 params.retry_scid = initial_hd.dcid;
715 params.retry_scid_present = 1;
716 } else {
717 params.original_dcid = initial_hd.dcid;
718 }
719
720 params.original_dcid_present = 1;
721
722 rv = generate_quic_stateless_reset_token(
723 params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
724 if (rv != 0) {
725 ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
726 return -1;
727 }
728 params.stateless_reset_token_present = 1;
729
730 auto path = ngtcp2_path{
731 {
732 const_cast<sockaddr *>(&local_addr.su.sa),
733 static_cast<socklen_t>(local_addr.len),
734 },
735 {
736 const_cast<sockaddr *>(&remote_addr.su.sa),
737 static_cast<socklen_t>(remote_addr.len),
738 },
739 const_cast<UpstreamAddr *>(faddr),
740 };
741
742 rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
743 initial_hd.version, &callbacks, &settings,
744 ¶ms, nullptr, this);
745 if (rv != 0) {
746 ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
747 return -1;
748 }
749
750 ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
751
752 auto quic_connection_handler = worker->get_quic_connection_handler();
753
754 if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
755 initial_hd.dcid) != 0) {
756 return -1;
757 }
758
759 quic_connection_handler->add_connection_id(hashed_scid_, handler_);
760 quic_connection_handler->add_connection_id(scid, handler_);
761
762 return 0;
763 }
764
on_read()765 int Http3Upstream::on_read() { return 0; }
766
on_write()767 int Http3Upstream::on_write() {
768 int rv;
769
770 if (tx_.send_blocked) {
771 rv = send_blocked_packet();
772 if (rv != 0) {
773 return -1;
774 }
775
776 if (tx_.send_blocked) {
777 return 0;
778 }
779 }
780
781 handler_->get_connection()->wlimit.stopw();
782
783 if (write_streams() != 0) {
784 return -1;
785 }
786
787 if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
788 return -1;
789 }
790
791 reset_timer();
792
793 return 0;
794 }
795
// Turns pending HTTP/3 stream data into QUIC packets and sends them.
//
// Each loop iteration asks nghttp3 for stream data (only when httpconn_
// exists and ngtcp2_conn_get_max_data_left() is non-zero), hands it to
// ngtcp2_conn_writev_stream() to build one packet at bufpos inside
// tx_.data, and then either sends that packet immediately or, when
// UDP_SEGMENT is defined, accumulates packets so that several of them
// are passed to a single send_packet() call together with gso_size.
//
// Returns 0 on success, including when sending is blocked (the unsent
// data is recorded through on_send_blocked()).  On an nghttp3 or ngtcp2
// error the error is stored in last_error_ and the result of
// handle_error() is returned.
write_streams()796 int Http3Upstream::write_streams() {
797 std::array<nghttp3_vec, 16> vec;
798 auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
799 #ifdef UDP_SEGMENT
800 auto path_max_udp_payload_size =
801 ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
802 #endif // UDP_SEGMENT
// Packet budget for this call: send quantum divided by the maximum UDP
// payload size.
803 auto max_pktcnt = ngtcp2_conn_get_send_quantum(conn_) / max_udp_payload_size;
804 ngtcp2_pkt_info pi, prev_pi;
805 uint8_t *bufpos = tx_.data.get();
806 ngtcp2_path_storage ps, prev_ps;
807 size_t pktcnt = 0;
808 int rv;
809 size_t gso_size = 0;
810 auto ts = quic_timestamp();
811
812 ngtcp2_path_storage_zero(&ps);
813 ngtcp2_path_storage_zero(&prev_ps);
814
815 for (;;) {
// stream_id stays -1 and sveccnt stays 0 when nghttp3 is not consulted
// or has nothing to offer; ngtcp2 is still called with these values.
816 int64_t stream_id = -1;
817 int fin = 0;
818 nghttp3_ssize sveccnt = 0;
819
820 if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
821 sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
822 vec.data(), vec.size());
823 if (sveccnt < 0) {
824 ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
825 << nghttp3_strerror(sveccnt);
826 ngtcp2_ccerr_set_application_error(
827 &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
828 nullptr, 0);
829 return handle_error();
830 }
831 }
832
833 ngtcp2_ssize ndatalen;
834 auto v = vec.data();
835 auto vcnt = static_cast<size_t>(sveccnt);
836
// NGTCP2_WRITE_STREAM_FLAG_MORE is always requested, so a call may
// return NGTCP2_ERR_WRITE_MORE (handled below) instead of a finished
// packet.
837 uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
838 if (fin) {
839 flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
840 }
841
842 auto nwrite = ngtcp2_conn_writev_stream(
843 conn_, &ps.path, &pi, bufpos, max_udp_payload_size, &ndatalen, flags,
844 stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
845 if (nwrite < 0) {
// The three cases below are not fatal: each updates nghttp3's view of
// the stream and restarts the loop.  Any other negative value falls
// through to the error handling after the switch.
846 switch (nwrite) {
847 case NGTCP2_ERR_STREAM_DATA_BLOCKED:
848 assert(ndatalen == -1);
849 nghttp3_conn_block_stream(httpconn_, stream_id);
850 continue;
851 case NGTCP2_ERR_STREAM_SHUT_WR:
852 assert(ndatalen == -1);
853 nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
854 continue;
855 case NGTCP2_ERR_WRITE_MORE:
// ndatalen bytes of stream data were accepted; report them to nghttp3
// and go around again to add more to the same packet.
856 assert(ndatalen >= 0);
857 rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
858 if (rv != 0) {
859 ULOG(ERROR, this)
860 << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
861 ngtcp2_ccerr_set_application_error(
862 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
863 0);
864 return handle_error();
865 }
866 continue;
867 }
868
869 assert(ndatalen == -1);
870
871 ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
872 << ngtcp2_strerror(nwrite);
873
874 ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
875
876 return handle_error();
877 } else if (ndatalen >= 0) {
878 rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
879 if (rv != 0) {
880 ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
881 << nghttp3_strerror(rv);
882 ngtcp2_ccerr_set_application_error(
883 &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
884 0);
885 return handle_error();
886 }
887 }
888
// No packet was produced this time.  Flush whatever has accumulated in
// the buffer, using the path, packet info and gso_size saved from the
// first buffered packet, then finish.  Without UDP_SEGMENT bufpos is
// reset after every send, so there is nothing buffered here.
889 if (nwrite == 0) {
890 if (bufpos - tx_.data.get()) {
891 auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
892 auto data = tx_.data.get();
893 auto datalen = bufpos - data;
894
895 rv = send_packet(faddr, prev_ps.path.remote.addr,
896 prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
897 prev_ps.path.local.addrlen, prev_pi, data, datalen,
898 gso_size);
899 if (rv == SHRPX_ERR_SEND_BLOCKED) {
900 on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
901 prev_pi, data, datalen, gso_size);
902
903 signal_write_upstream_addr(faddr);
904 }
905 }
906
907 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
908
909 return 0;
910 }
911
912 bufpos += nwrite;
913
914 #ifdef UDP_SEGMENT
// First packet of a batch: remember its path and packet info, and use
// its length as the segment size for the batch.
915 if (pktcnt == 0) {
916 ngtcp2_path_copy(&prev_ps.path, &ps.path);
917 prev_pi = pi;
918 gso_size = nwrite;
// The new packet cannot join the batch: it uses a different path or ECN
// value, is larger than the segment size, or the segment size exceeds
// the path's maximum UDP payload size and the new packet has a
// different length.  Send the batch without it, then send the new
// packet on its own (gso_size 0).
919 } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
920 prev_pi.ecn != pi.ecn ||
921 static_cast<size_t>(nwrite) > gso_size ||
922 (gso_size > path_max_udp_payload_size &&
923 static_cast<size_t>(nwrite) != gso_size)) {
924 auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
925 auto data = tx_.data.get();
926 auto datalen = bufpos - data - nwrite;
927
928 rv = send_packet(faddr, prev_ps.path.remote.addr,
929 prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
930 prev_ps.path.local.addrlen, prev_pi, data, datalen,
931 gso_size);
932 switch (rv) {
933 case SHRPX_ERR_SEND_BLOCKED:
// Both the batch and the trailing packet are handed to
// on_send_blocked(), in that order.
934 on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
935 data, datalen, gso_size);
936
937 on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
938 ps.path.remote, ps.path.local, pi, bufpos - nwrite,
939 nwrite, 0);
940
941 signal_write_upstream_addr(faddr);
942
943 break;
944 default: {
945 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
946 auto data = bufpos - nwrite;
947
948 rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
949 ps.path.local.addr, ps.path.local.addrlen, pi, data,
950 nwrite, 0);
951 if (rv == SHRPX_ERR_SEND_BLOCKED) {
952 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
953 nwrite, 0);
954
955 signal_write_upstream_addr(faddr);
956 }
957 }
958 }
959
960 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
961
962 return 0;
963 }
964
// Send the batch when the packet budget is used up or when the packet
// just written is shorter than the segment size.
965 if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
966 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
967 auto data = tx_.data.get();
968 auto datalen = bufpos - data;
969
970 rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
971 ps.path.local.addr, ps.path.local.addrlen, pi, data,
972 datalen, gso_size);
973 if (rv == SHRPX_ERR_SEND_BLOCKED) {
974 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
975 gso_size);
976
977 signal_write_upstream_addr(faddr);
978 }
979
980 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
981
982 return 0;
983 }
984 #else // !UDP_SEGMENT
// Without UDP_SEGMENT every packet is sent on its own as soon as it is
// built.
985 auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
986 auto data = tx_.data.get();
987 auto datalen = bufpos - data;
988
989 rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
990 ps.path.local.addr, ps.path.local.addrlen, pi, data,
991 datalen, 0);
992 if (rv == SHRPX_ERR_SEND_BLOCKED) {
993 on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
994 0);
995
996 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
997
998 signal_write_upstream_addr(faddr);
999
1000 return 0;
1001 }
1002
1003 if (++pktcnt == max_pktcnt) {
1004 ngtcp2_conn_update_pkt_tx_time(conn_, ts);
1005
1006 return 0;
1007 }
1008
// Reuse the start of the buffer for the next packet.
1009 bufpos = tx_.data.get();
1010 #endif // !UDP_SEGMENT
1011 }
1012
// Not reached: the loop above only exits through return statements.
1013 return 0;
1014 }
1015
on_timeout(Downstream * downstream)1016 int Http3Upstream::on_timeout(Downstream *downstream) { return 0; }
1017
on_downstream_abort_request(Downstream * downstream,unsigned int status_code)1018 int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
1019 unsigned int status_code) {
1020 int rv;
1021
1022 rv = error_reply(downstream, status_code);
1023
1024 if (rv != 0) {
1025 return -1;
1026 }
1027
1028 handler_->signal_write();
1029
1030 return 0;
1031 }
1032
on_downstream_abort_request_with_https_redirect(Downstream * downstream)1033 int Http3Upstream::on_downstream_abort_request_with_https_redirect(
1034 Downstream *downstream) {
1035 assert(0);
1036 abort();
1037 }
1038
1039 namespace {
1040 uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code)1041 infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
1042 // NGHTTP2_REFUSED_STREAM is important because it tells upstream
1043 // client to retry.
1044 switch (downstream_error_code) {
1045 case NGHTTP2_NO_ERROR:
1046 return NGHTTP3_H3_NO_ERROR;
1047 case NGHTTP2_REFUSED_STREAM:
1048 return NGHTTP3_H3_REQUEST_REJECTED;
1049 default:
1050 return NGHTTP3_H3_INTERNAL_ERROR;
1051 }
1052 }
1053 } // namespace
1054
downstream_read(DownstreamConnection * dconn)1055 int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
1056 auto downstream = dconn->get_downstream();
1057
1058 if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
1059 // The downstream stream was reset (canceled). In this case,
1060 // RST_STREAM to the upstream and delete downstream connection
1061 // here. Deleting downstream will be taken place at
1062 // on_stream_close_callback.
1063 shutdown_stream(downstream,
1064 infer_upstream_shutdown_stream_error_code(
1065 downstream->get_response_rst_stream_error_code()));
1066 downstream->pop_downstream_connection();
1067 // dconn was deleted
1068 dconn = nullptr;
1069 } else if (downstream->get_response_state() ==
1070 DownstreamState::MSG_BAD_HEADER) {
1071 if (error_reply(downstream, 502) != 0) {
1072 return -1;
1073 }
1074 downstream->pop_downstream_connection();
1075 // dconn was deleted
1076 dconn = nullptr;
1077 } else {
1078 auto rv = downstream->on_read();
1079 if (rv == SHRPX_ERR_EOF) {
1080 if (downstream->get_request_header_sent()) {
1081 return downstream_eof(dconn);
1082 }
1083 return SHRPX_ERR_RETRY;
1084 }
1085 if (rv == SHRPX_ERR_DCONN_CANCELED) {
1086 downstream->pop_downstream_connection();
1087 handler_->signal_write();
1088 return 0;
1089 }
1090 if (rv != 0) {
1091 if (rv != SHRPX_ERR_NETWORK) {
1092 if (LOG_ENABLED(INFO)) {
1093 DCLOG(INFO, dconn) << "HTTP parser failure";
1094 }
1095 }
1096 return downstream_error(dconn, Downstream::EVENT_ERROR);
1097 }
1098
1099 if (downstream->can_detach_downstream_connection()) {
1100 // Keep-alive
1101 downstream->detach_downstream_connection();
1102 }
1103 }
1104
1105 handler_->signal_write();
1106
1107 // At this point, downstream may be deleted.
1108
1109 return 0;
1110 }
1111
downstream_write(DownstreamConnection * dconn)1112 int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
1113 int rv;
1114 rv = dconn->on_write();
1115 if (rv == SHRPX_ERR_NETWORK) {
1116 return downstream_error(dconn, Downstream::EVENT_ERROR);
1117 }
1118 if (rv != 0) {
1119 return rv;
1120 }
1121 return 0;
1122 }
1123
downstream_eof(DownstreamConnection * dconn)1124 int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
1125 auto downstream = dconn->get_downstream();
1126
1127 if (LOG_ENABLED(INFO)) {
1128 DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
1129 }
1130
1131 // Delete downstream connection. If we don't delete it here, it will
1132 // be pooled in on_stream_close_callback.
1133 downstream->pop_downstream_connection();
1134 // dconn was deleted
1135 dconn = nullptr;
1136 // downstream will be deleted in on_stream_close_callback.
1137 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1138 // Server may indicate the end of the request by EOF
1139 if (LOG_ENABLED(INFO)) {
1140 ULOG(INFO, this) << "Downstream body was ended by EOF";
1141 }
1142 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1143
1144 // For tunneled connection, MSG_COMPLETE signals
1145 // downstream_read_data_callback to send RST_STREAM after pending
1146 // response body is sent. This is needed to ensure that RST_STREAM
1147 // is sent after all pending data are sent.
1148 if (on_downstream_body_complete(downstream) != 0) {
1149 return -1;
1150 }
1151 } else if (downstream->get_response_state() !=
1152 DownstreamState::MSG_COMPLETE) {
1153 // If stream was not closed, then we set MSG_COMPLETE and let
1154 // on_stream_close_callback delete downstream.
1155 if (error_reply(downstream, 502) != 0) {
1156 return -1;
1157 }
1158 }
1159 handler_->signal_write();
1160 // At this point, downstream may be deleted.
1161 return 0;
1162 }
1163
downstream_error(DownstreamConnection * dconn,int events)1164 int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
1165 auto downstream = dconn->get_downstream();
1166
1167 if (LOG_ENABLED(INFO)) {
1168 if (events & Downstream::EVENT_ERROR) {
1169 DCLOG(INFO, dconn) << "Downstream network/general error";
1170 } else {
1171 DCLOG(INFO, dconn) << "Timeout";
1172 }
1173 if (downstream->get_upgraded()) {
1174 DCLOG(INFO, dconn) << "Note: this is tunnel connection";
1175 }
1176 }
1177
1178 // Delete downstream connection. If we don't delete it here, it will
1179 // be pooled in on_stream_close_callback.
1180 downstream->pop_downstream_connection();
1181 // dconn was deleted
1182 dconn = nullptr;
1183
1184 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1185 // For SSL tunneling, we issue RST_STREAM. For other types of
1186 // stream, we don't have to do anything since response was
1187 // complete.
1188 if (downstream->get_upgraded()) {
1189 shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
1190 }
1191 } else {
1192 if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
1193 if (downstream->get_upgraded()) {
1194 if (on_downstream_body_complete(downstream) != 0) {
1195 return -1;
1196 }
1197 } else {
1198 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1199 }
1200 } else {
1201 unsigned int status;
1202 if (events & Downstream::EVENT_TIMEOUT) {
1203 if (downstream->get_request_header_sent()) {
1204 status = 504;
1205 } else {
1206 status = 408;
1207 }
1208 } else {
1209 status = 502;
1210 }
1211 if (error_reply(downstream, status) != 0) {
1212 return -1;
1213 }
1214 }
1215 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1216 }
1217 handler_->signal_write();
1218 // At this point, downstream may be deleted.
1219 return 0;
1220 }
1221
get_client_handler() const1222 ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
1223
1224 namespace {
downstream_read_data_callback(nghttp3_conn * conn,int64_t stream_id,nghttp3_vec * vec,size_t veccnt,uint32_t * pflags,void * conn_user_data,void * stream_user_data)1225 nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
1226 int64_t stream_id, nghttp3_vec *vec,
1227 size_t veccnt, uint32_t *pflags,
1228 void *conn_user_data,
1229 void *stream_user_data) {
1230 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
1231 auto downstream = static_cast<Downstream *>(stream_user_data);
1232
1233 assert(downstream);
1234
1235 auto body = downstream->get_response_buf();
1236
1237 assert(body);
1238
1239 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
1240 body->rleft_mark() == 0) {
1241 downstream->disable_upstream_wtimer();
1242 return NGHTTP3_ERR_WOULDBLOCK;
1243 }
1244
1245 downstream->reset_upstream_wtimer();
1246
1247 veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
1248
1249 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
1250 body->rleft_mark() == 0) {
1251 *pflags |= NGHTTP3_DATA_FLAG_EOF;
1252 }
1253
1254 assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
1255
1256 downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
1257
1258 if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
1259 upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
1260 return NGHTTP3_ERR_CALLBACK_FAILURE;
1261 }
1262
1263 return veccnt;
1264 }
1265 } // namespace
1266
on_downstream_header_complete(Downstream * downstream)1267 int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
1268 int rv;
1269
1270 const auto &req = downstream->request();
1271 auto &resp = downstream->response();
1272
1273 auto &balloc = downstream->get_block_allocator();
1274
1275 if (LOG_ENABLED(INFO)) {
1276 if (downstream->get_non_final_response()) {
1277 DLOG(INFO, downstream) << "HTTP non-final response header";
1278 } else {
1279 DLOG(INFO, downstream) << "HTTP response header completed";
1280 }
1281 }
1282
1283 auto config = get_config();
1284 auto &httpconf = config->http;
1285
1286 if (!config->http2_proxy && !httpconf.no_location_rewrite) {
1287 downstream->rewrite_location_response_header(req.scheme);
1288 }
1289
1290 #ifdef HAVE_MRUBY
1291 if (!downstream->get_non_final_response()) {
1292 auto dconn = downstream->get_downstream_connection();
1293 const auto &group = dconn->get_downstream_addr_group();
1294 if (group) {
1295 const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
1296
1297 if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
1298 if (error_reply(downstream, 500) != 0) {
1299 return -1;
1300 }
1301 // Returning -1 will signal deletion of dconn.
1302 return -1;
1303 }
1304
1305 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1306 return -1;
1307 }
1308 }
1309
1310 auto worker = handler_->get_worker();
1311 auto mruby_ctx = worker->get_mruby_context();
1312
1313 if (mruby_ctx->run_on_response_proc(downstream) != 0) {
1314 if (error_reply(downstream, 500) != 0) {
1315 return -1;
1316 }
1317 // Returning -1 will signal deletion of dconn.
1318 return -1;
1319 }
1320
1321 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1322 return -1;
1323 }
1324 }
1325 #endif // HAVE_MRUBY
1326
1327 auto nva = std::vector<nghttp3_nv>();
1328 // 4 means :status and possible server, via, and set-cookie (for
1329 // affinity cookie) header field.
1330 nva.reserve(resp.fs.headers().size() + 4 +
1331 httpconf.add_response_headers.size());
1332
1333 if (downstream->get_non_final_response()) {
1334 auto response_status = http2::stringify_status(balloc, resp.http_status);
1335
1336 nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1337
1338 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
1339 http2::HDOP_STRIP_ALL);
1340
1341 if (LOG_ENABLED(INFO)) {
1342 log_response_headers(downstream, nva);
1343 }
1344
1345 rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
1346 nva.data(), nva.size());
1347
1348 resp.fs.clear_headers();
1349
1350 if (rv != 0) {
1351 ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
1352 return -1;
1353 }
1354
1355 return 0;
1356 }
1357
1358 auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
1359 StringRef response_status;
1360
1361 if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
1362 response_status = http2::stringify_status(balloc, 200);
1363 striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
1364 } else {
1365 response_status = http2::stringify_status(balloc, resp.http_status);
1366 }
1367
1368 nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1369
1370 http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
1371
1372 if (!config->http2_proxy && !httpconf.no_server_rewrite) {
1373 nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
1374 } else {
1375 auto server = resp.fs.header(http2::HD_SERVER);
1376 if (server) {
1377 nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value));
1378 }
1379 }
1380
1381 if (!req.regular_connect_method() || !downstream->get_upgraded()) {
1382 auto affinity_cookie = downstream->get_affinity_cookie_to_send();
1383 if (affinity_cookie) {
1384 auto dconn = downstream->get_downstream_connection();
1385 assert(dconn);
1386 auto &group = dconn->get_downstream_addr_group();
1387 auto &shared_addr = group->shared_addr;
1388 auto &cookieconf = shared_addr->affinity.cookie;
1389 auto secure =
1390 http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
1391 auto cookie_str = http::create_affinity_cookie(
1392 balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
1393 nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str));
1394 }
1395 }
1396
1397 auto via = resp.fs.header(http2::HD_VIA);
1398 if (httpconf.no_via) {
1399 if (via) {
1400 nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value));
1401 }
1402 } else {
1403 // we don't create more than 16 bytes in
1404 // http::create_via_header_value.
1405 size_t len = 16;
1406 if (via) {
1407 len += via->value.size() + 2;
1408 }
1409
1410 auto iov = make_byte_ref(balloc, len + 1);
1411 auto p = iov.base;
1412 if (via) {
1413 p = std::copy(std::begin(via->value), std::end(via->value), p);
1414 p = util::copy_lit(p, ", ");
1415 }
1416 p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
1417 *p = '\0';
1418
1419 nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
1420 }
1421
1422 for (auto &p : httpconf.add_response_headers) {
1423 nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1424 }
1425
1426 if (LOG_ENABLED(INFO)) {
1427 log_response_headers(downstream, nva);
1428 }
1429
1430 nghttp3_data_reader data_read;
1431 data_read.read_data = downstream_read_data_callback;
1432
1433 nghttp3_data_reader *data_readptr;
1434
1435 if (downstream->expect_response_body() ||
1436 downstream->expect_response_trailer()) {
1437 data_readptr = &data_read;
1438 } else {
1439 data_readptr = nullptr;
1440 }
1441
1442 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1443 nva.data(), nva.size(), data_readptr);
1444 if (rv != 0) {
1445 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
1446 return -1;
1447 }
1448
1449 if (data_readptr) {
1450 downstream->reset_upstream_wtimer();
1451 } else if (shutdown_stream_read(downstream->get_stream_id(),
1452 NGHTTP3_H3_NO_ERROR) != 0) {
1453 return -1;
1454 }
1455
1456 return 0;
1457 }
1458
on_downstream_body(Downstream * downstream,const uint8_t * data,size_t len,bool flush)1459 int Http3Upstream::on_downstream_body(Downstream *downstream,
1460 const uint8_t *data, size_t len,
1461 bool flush) {
1462 auto body = downstream->get_response_buf();
1463 body->append(data, len);
1464
1465 if (flush) {
1466 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1467
1468 downstream->ensure_upstream_wtimer();
1469 }
1470
1471 return 0;
1472 }
1473
on_downstream_body_complete(Downstream * downstream)1474 int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
1475 if (LOG_ENABLED(INFO)) {
1476 DLOG(INFO, downstream) << "HTTP response completed";
1477 }
1478
1479 auto &resp = downstream->response();
1480
1481 if (!downstream->validate_response_recv_body_length()) {
1482 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
1483 resp.connection_close = true;
1484 return 0;
1485 }
1486
1487 if (!downstream->get_upgraded()) {
1488 const auto &trailers = resp.fs.trailers();
1489 if (!trailers.empty()) {
1490 std::vector<nghttp3_nv> nva;
1491 nva.reserve(trailers.size());
1492 http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
1493 if (!nva.empty()) {
1494 auto rv = nghttp3_conn_submit_trailers(
1495 httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
1496 if (rv != 0) {
1497 ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
1498 << nghttp3_strerror(rv);
1499 return -1;
1500 }
1501 }
1502 }
1503 }
1504
1505 nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
1506 downstream->ensure_upstream_wtimer();
1507
1508 return 0;
1509 }
1510
on_handler_delete()1511 void Http3Upstream::on_handler_delete() {
1512 for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
1513 if (d->get_dispatch_state() == DispatchState::ACTIVE &&
1514 d->accesslog_ready()) {
1515 handler_->write_accesslog(d);
1516 }
1517 }
1518
1519 auto worker = handler_->get_worker();
1520 auto quic_conn_handler = worker->get_quic_connection_handler();
1521
1522 std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
1523 ngtcp2_conn_get_scid(conn_, scids.data());
1524 scids.back() = hashed_scid_;
1525
1526 for (auto &cid : scids) {
1527 quic_conn_handler->remove_connection_id(cid);
1528 }
1529
1530 if (retry_close_ || last_error_.type == NGTCP2_CCERR_TYPE_IDLE_CLOSE) {
1531 return;
1532 }
1533
1534 // If this is not idle close, send CONNECTION_CLOSE.
1535 if (!ngtcp2_conn_in_closing_period(conn_) &&
1536 !ngtcp2_conn_in_draining_period(conn_)) {
1537 ngtcp2_path_storage ps;
1538 ngtcp2_pkt_info pi;
1539 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1540
1541 ngtcp2_path_storage_zero(&ps);
1542
1543 ngtcp2_ccerr ccerr;
1544 ngtcp2_ccerr_default(&ccerr);
1545
1546 if (worker->get_graceful_shutdown() &&
1547 !ngtcp2_conn_get_handshake_completed(conn_)) {
1548 ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
1549 }
1550
1551 auto nwrite = ngtcp2_conn_write_connection_close(
1552 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
1553 quic_timestamp());
1554 if (nwrite < 0) {
1555 if (nwrite != NGTCP2_ERR_INVALID_STATE) {
1556 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1557 << ngtcp2_strerror(nwrite);
1558 }
1559
1560 return;
1561 }
1562
1563 conn_close_.resize(nwrite);
1564
1565 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1566 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1567 ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1568 }
1569
1570 auto d =
1571 static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
1572
1573 if (LOG_ENABLED(INFO)) {
1574 ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
1575 << conn_close_.size() << " bytes sentinel packet";
1576 }
1577
1578 auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
1579 std::move(conn_close_), d);
1580
1581 quic_conn_handler->add_close_wait(cw.release());
1582 }
1583
on_downstream_reset(Downstream * downstream,bool no_retry)1584 int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
1585 int rv;
1586
1587 if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
1588 // This is error condition when we failed push_request_headers()
1589 // in initiate_downstream(). Otherwise, we have
1590 // DispatchState::ACTIVE state, or we did not set
1591 // DownstreamConnection.
1592 downstream->pop_downstream_connection();
1593 handler_->signal_write();
1594
1595 return 0;
1596 }
1597
1598 if (!downstream->request_submission_ready()) {
1599 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
1600 // We have got all response body already. Send it off.
1601 downstream->pop_downstream_connection();
1602 return 0;
1603 }
1604 // pushed stream is handled here
1605 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1606 downstream->pop_downstream_connection();
1607
1608 handler_->signal_write();
1609
1610 return 0;
1611 }
1612
1613 downstream->pop_downstream_connection();
1614
1615 downstream->add_retry();
1616
1617 std::unique_ptr<DownstreamConnection> dconn;
1618
1619 rv = 0;
1620
1621 if (no_retry || downstream->no_more_retry()) {
1622 goto fail;
1623 }
1624
1625 // downstream connection is clean; we can retry with new
1626 // downstream connection.
1627
1628 for (;;) {
1629 auto dconn = handler_->get_downstream_connection(rv, downstream);
1630 if (!dconn) {
1631 goto fail;
1632 }
1633
1634 rv = downstream->attach_downstream_connection(std::move(dconn));
1635 if (rv == 0) {
1636 break;
1637 }
1638 }
1639
1640 rv = downstream->push_request_headers();
1641 if (rv != 0) {
1642 goto fail;
1643 }
1644
1645 return 0;
1646
1647 fail:
1648 if (rv == SHRPX_ERR_TLS_REQUIRED) {
1649 assert(0);
1650 abort();
1651 }
1652
1653 rv = on_downstream_abort_request(downstream, 502);
1654 if (rv != 0) {
1655 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
1656 }
1657 downstream->pop_downstream_connection();
1658
1659 handler_->signal_write();
1660
1661 return 0;
1662 }
1663
pause_read(IOCtrlReason reason)1664 void Http3Upstream::pause_read(IOCtrlReason reason) {}
1665
resume_read(IOCtrlReason reason,Downstream * downstream,size_t consumed)1666 int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
1667 size_t consumed) {
1668 consume(downstream->get_stream_id(), consumed);
1669
1670 auto &req = downstream->request();
1671
1672 req.consume(consumed);
1673
1674 handler_->signal_write();
1675
1676 return 0;
1677 }
1678
send_reply(Downstream * downstream,const uint8_t * body,size_t bodylen)1679 int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
1680 size_t bodylen) {
1681 int rv;
1682
1683 nghttp3_data_reader data_read, *data_read_ptr = nullptr;
1684
1685 if (bodylen) {
1686 data_read.read_data = downstream_read_data_callback;
1687 data_read_ptr = &data_read;
1688 }
1689
1690 const auto &resp = downstream->response();
1691 auto config = get_config();
1692 auto &httpconf = config->http;
1693
1694 auto &balloc = downstream->get_block_allocator();
1695
1696 const auto &headers = resp.fs.headers();
1697 auto nva = std::vector<nghttp3_nv>();
1698 // 2 for :status and server
1699 nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
1700
1701 auto response_status = http2::stringify_status(balloc, resp.http_status);
1702
1703 nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
1704
1705 for (auto &kv : headers) {
1706 if (kv.name.empty() || kv.name[0] == ':') {
1707 continue;
1708 }
1709 switch (kv.token) {
1710 case http2::HD_CONNECTION:
1711 case http2::HD_KEEP_ALIVE:
1712 case http2::HD_PROXY_CONNECTION:
1713 case http2::HD_TE:
1714 case http2::HD_TRANSFER_ENCODING:
1715 case http2::HD_UPGRADE:
1716 continue;
1717 }
1718 nva.push_back(http3::make_nv_nocopy(kv.name, kv.value, kv.no_index));
1719 }
1720
1721 if (!resp.fs.header(http2::HD_SERVER)) {
1722 nva.push_back(http3::make_nv_ls_nocopy("server", config->http.server_name));
1723 }
1724
1725 for (auto &p : httpconf.add_response_headers) {
1726 nva.push_back(http3::make_nv_nocopy(p.name, p.value));
1727 }
1728
1729 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
1730 nva.data(), nva.size(), data_read_ptr);
1731 if (nghttp3_err_is_fatal(rv)) {
1732 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
1733 << nghttp3_strerror(rv);
1734 return -1;
1735 }
1736
1737 auto buf = downstream->get_response_buf();
1738
1739 buf->append(body, bodylen);
1740
1741 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
1742
1743 if (data_read_ptr) {
1744 downstream->reset_upstream_wtimer();
1745 }
1746
1747 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
1748 0) {
1749 return -1;
1750 }
1751
1752 return 0;
1753 }
1754
initiate_push(Downstream * downstream,const StringRef & uri)1755 int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
1756 return 0;
1757 }
1758
response_riovec(struct iovec * iov,int iovcnt) const1759 int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
1760 return 0;
1761 }
1762
response_drain(size_t n)1763 void Http3Upstream::response_drain(size_t n) {}
1764
response_empty() const1765 bool Http3Upstream::response_empty() const { return false; }
1766
1767 Downstream *
on_downstream_push_promise(Downstream * downstream,int32_t promised_stream_id)1768 Http3Upstream::on_downstream_push_promise(Downstream *downstream,
1769 int32_t promised_stream_id) {
1770 return nullptr;
1771 }
1772
on_downstream_push_promise_complete(Downstream * downstream,Downstream * promised_downstream)1773 int Http3Upstream::on_downstream_push_promise_complete(
1774 Downstream *downstream, Downstream *promised_downstream) {
1775 return 0;
1776 }
1777
push_enabled() const1778 bool Http3Upstream::push_enabled() const { return false; }
1779
cancel_premature_downstream(Downstream * promised_downstream)1780 void Http3Upstream::cancel_premature_downstream(
1781 Downstream *promised_downstream) {}
1782
on_read(const UpstreamAddr * faddr,const Address & remote_addr,const Address & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen)1783 int Http3Upstream::on_read(const UpstreamAddr *faddr,
1784 const Address &remote_addr,
1785 const Address &local_addr, const ngtcp2_pkt_info &pi,
1786 const uint8_t *data, size_t datalen) {
1787 int rv;
1788
1789 auto path = ngtcp2_path{
1790 {
1791 const_cast<sockaddr *>(&local_addr.su.sa),
1792 static_cast<socklen_t>(local_addr.len),
1793 },
1794 {
1795 const_cast<sockaddr *>(&remote_addr.su.sa),
1796 static_cast<socklen_t>(remote_addr.len),
1797 },
1798 const_cast<UpstreamAddr *>(faddr),
1799 };
1800
1801 rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data, datalen, quic_timestamp());
1802 if (rv != 0) {
1803 switch (rv) {
1804 case NGTCP2_ERR_DRAINING:
1805 return -1;
1806 case NGTCP2_ERR_RETRY: {
1807 auto worker = handler_->get_worker();
1808 auto quic_conn_handler = worker->get_quic_connection_handler();
1809
1810 if (worker->get_graceful_shutdown()) {
1811 ngtcp2_ccerr_set_transport_error(&last_error_,
1812 NGTCP2_CONNECTION_REFUSED, nullptr, 0);
1813
1814 return handle_error();
1815 }
1816
1817 ngtcp2_version_cid vc;
1818
1819 rv =
1820 ngtcp2_pkt_decode_version_cid(&vc, data, datalen, SHRPX_QUIC_SCIDLEN);
1821 if (rv != 0) {
1822 return -1;
1823 }
1824
1825 retry_close_ = true;
1826
1827 quic_conn_handler->send_retry(handler_->get_upstream_addr(), vc.version,
1828 vc.dcid, vc.dcidlen, vc.scid, vc.scidlen,
1829 remote_addr, local_addr, datalen * 3);
1830
1831 return -1;
1832 }
1833 case NGTCP2_ERR_CRYPTO:
1834 if (!last_error_.error_code) {
1835 ngtcp2_ccerr_set_tls_alert(
1836 &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
1837 }
1838 break;
1839 case NGTCP2_ERR_DROP_CONN:
1840 return -1;
1841 default:
1842 if (!last_error_.error_code) {
1843 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1844 }
1845 }
1846
1847 ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
1848
1849 return handle_error();
1850 }
1851
1852 return 0;
1853 }
1854
send_packet(const UpstreamAddr * faddr,const sockaddr * remote_sa,size_t remote_salen,const sockaddr * local_sa,size_t local_salen,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1855 int Http3Upstream::send_packet(const UpstreamAddr *faddr,
1856 const sockaddr *remote_sa, size_t remote_salen,
1857 const sockaddr *local_sa, size_t local_salen,
1858 const ngtcp2_pkt_info &pi, const uint8_t *data,
1859 size_t datalen, size_t gso_size) {
1860 auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
1861 local_salen, pi, data, datalen, gso_size);
1862 switch (rv) {
1863 case 0:
1864 return 0;
1865 // With GSO, sendmsg may fail with EINVAL if UDP payload is too
1866 // large.
1867 case -EINVAL:
1868 case -EMSGSIZE:
1869 // Let the packet lost.
1870 break;
1871 case -EAGAIN:
1872 #if EAGAIN != EWOULDBLOCK
1873 case -EWOULDBLOCK:
1874 #endif // EAGAIN != EWOULDBLOCK
1875 return SHRPX_ERR_SEND_BLOCKED;
1876 default:
1877 break;
1878 }
1879
1880 return -1;
1881 }
1882
on_send_blocked(const UpstreamAddr * faddr,const ngtcp2_addr & remote_addr,const ngtcp2_addr & local_addr,const ngtcp2_pkt_info & pi,const uint8_t * data,size_t datalen,size_t gso_size)1883 void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
1884 const ngtcp2_addr &remote_addr,
1885 const ngtcp2_addr &local_addr,
1886 const ngtcp2_pkt_info &pi,
1887 const uint8_t *data, size_t datalen,
1888 size_t gso_size) {
1889 assert(tx_.num_blocked || !tx_.send_blocked);
1890 assert(tx_.num_blocked < 2);
1891
1892 tx_.send_blocked = true;
1893
1894 auto &p = tx_.blocked[tx_.num_blocked++];
1895
1896 memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
1897 memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
1898
1899 p.local_addr.len = local_addr.addrlen;
1900 p.remote_addr.len = remote_addr.addrlen;
1901 p.faddr = faddr;
1902 p.pi = pi;
1903 p.data = data;
1904 p.datalen = datalen;
1905 p.gso_size = gso_size;
1906 }
1907
send_blocked_packet()1908 int Http3Upstream::send_blocked_packet() {
1909 int rv;
1910
1911 assert(tx_.send_blocked);
1912
1913 for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
1914 auto &p = tx_.blocked[tx_.num_blocked_sent];
1915
1916 rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len,
1917 &p.local_addr.su.sa, p.local_addr.len, p.pi, p.data,
1918 p.datalen, p.gso_size);
1919 if (rv == SHRPX_ERR_SEND_BLOCKED) {
1920 signal_write_upstream_addr(p.faddr);
1921
1922 return 0;
1923 }
1924 }
1925
1926 tx_.send_blocked = false;
1927 tx_.num_blocked = 0;
1928 tx_.num_blocked_sent = 0;
1929
1930 return 0;
1931 }
1932
signal_write_upstream_addr(const UpstreamAddr * faddr)1933 void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
1934 auto conn = handler_->get_connection();
1935
1936 if (faddr->fd != conn->wev.fd) {
1937 if (ev_is_active(&conn->wev)) {
1938 ev_io_stop(handler_->get_loop(), &conn->wev);
1939 }
1940
1941 ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
1942 }
1943
1944 conn->wlimit.startw();
1945 }
1946
handle_error()1947 int Http3Upstream::handle_error() {
1948 if (ngtcp2_conn_in_closing_period(conn_) ||
1949 ngtcp2_conn_in_draining_period(conn_)) {
1950 return -1;
1951 }
1952
1953 ngtcp2_path_storage ps;
1954 ngtcp2_pkt_info pi;
1955
1956 ngtcp2_path_storage_zero(&ps);
1957
1958 auto ts = quic_timestamp();
1959
1960 conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
1961
1962 auto nwrite = ngtcp2_conn_write_connection_close(
1963 conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
1964 &last_error_, ts);
1965 if (nwrite < 0) {
1966 ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
1967 << ngtcp2_strerror(nwrite);
1968 return -1;
1969 }
1970
1971 conn_close_.resize(nwrite);
1972
1973 if (nwrite == 0) {
1974 return -1;
1975 }
1976
1977 send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
1978 ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
1979 ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
1980
1981 return -1;
1982 }
1983
handle_expiry()1984 int Http3Upstream::handle_expiry() {
1985 int rv;
1986
1987 auto ts = quic_timestamp();
1988
1989 rv = ngtcp2_conn_handle_expiry(conn_, ts);
1990 if (rv != 0) {
1991 if (rv == NGTCP2_ERR_IDLE_CLOSE) {
1992 ULOG(INFO, this) << "Idle connection timeout";
1993 } else {
1994 ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
1995 }
1996 ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
1997 return handle_error();
1998 }
1999
2000 return 0;
2001 }
2002
reset_timer()2003 void Http3Upstream::reset_timer() {
2004 auto ts = quic_timestamp();
2005 auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
2006 auto loop = handler_->get_loop();
2007
2008 if (expiry_ts <= ts) {
2009 ev_feed_event(loop, &timer_, EV_TIMER);
2010 return;
2011 }
2012
2013 timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
2014
2015 ev_timer_again(loop, &timer_);
2016 }
2017
2018 namespace {
http_deferred_consume(nghttp3_conn * conn,int64_t stream_id,size_t nconsumed,void * user_data,void * stream_user_data)2019 int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
2020 size_t nconsumed, void *user_data,
2021 void *stream_user_data) {
2022 auto upstream = static_cast<Http3Upstream *>(user_data);
2023
2024 upstream->consume(stream_id, nconsumed);
2025
2026 return 0;
2027 }
2028 } // namespace
2029
2030 namespace {
http_acked_stream_data(nghttp3_conn * conn,int64_t stream_id,uint64_t datalen,void * user_data,void * stream_user_data)2031 int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
2032 uint64_t datalen, void *user_data,
2033 void *stream_user_data) {
2034 auto upstream = static_cast<Http3Upstream *>(user_data);
2035 auto downstream = static_cast<Downstream *>(stream_user_data);
2036
2037 assert(downstream);
2038
2039 if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
2040 return NGHTTP3_ERR_CALLBACK_FAILURE;
2041 }
2042
2043 return 0;
2044 }
2045 } // namespace
2046
http_acked_stream_data(Downstream * downstream,uint64_t datalen)2047 int Http3Upstream::http_acked_stream_data(Downstream *downstream,
2048 uint64_t datalen) {
2049 if (LOG_ENABLED(INFO)) {
2050 ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
2051 << datalen << " bytes acknowledged";
2052 }
2053
2054 auto body = downstream->get_response_buf();
2055 auto drained = body->drain_mark(datalen);
2056 (void)drained;
2057
2058 assert(datalen == drained);
2059
2060 if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
2061 return -1;
2062 }
2063
2064 return 0;
2065 }
2066
2067 namespace {
http_begin_request_headers(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2068 int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
2069 void *user_data, void *stream_user_data) {
2070 if (!ngtcp2_is_bidi_stream(stream_id)) {
2071 return 0;
2072 }
2073
2074 auto upstream = static_cast<Http3Upstream *>(user_data);
2075 upstream->http_begin_request_headers(stream_id);
2076
2077 return 0;
2078 }
2079 } // namespace
2080
2081 namespace {
http_recv_request_header(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2082 int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
2083 int32_t token, nghttp3_rcbuf *name,
2084 nghttp3_rcbuf *value, uint8_t flags,
2085 void *user_data, void *stream_user_data) {
2086 auto upstream = static_cast<Http3Upstream *>(user_data);
2087 auto downstream = static_cast<Downstream *>(stream_user_data);
2088
2089 if (!downstream || downstream->get_stop_reading()) {
2090 return 0;
2091 }
2092
2093 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2094 /* trailer = */ false) != 0) {
2095 return NGHTTP3_ERR_CALLBACK_FAILURE;
2096 }
2097
2098 return 0;
2099 }
2100 } // namespace
2101
2102 namespace {
http_recv_request_trailer(nghttp3_conn * conn,int64_t stream_id,int32_t token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,void * user_data,void * stream_user_data)2103 int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
2104 int32_t token, nghttp3_rcbuf *name,
2105 nghttp3_rcbuf *value, uint8_t flags,
2106 void *user_data, void *stream_user_data) {
2107 auto upstream = static_cast<Http3Upstream *>(user_data);
2108 auto downstream = static_cast<Downstream *>(stream_user_data);
2109
2110 if (!downstream || downstream->get_stop_reading()) {
2111 return 0;
2112 }
2113
2114 if (upstream->http_recv_request_header(downstream, token, name, value, flags,
2115 /* trailer = */ true) != 0) {
2116 return NGHTTP3_ERR_CALLBACK_FAILURE;
2117 }
2118
2119 return 0;
2120 }
2121 } // namespace
2122
http_recv_request_header(Downstream * downstream,int32_t h3token,nghttp3_rcbuf * name,nghttp3_rcbuf * value,uint8_t flags,bool trailer)2123 int Http3Upstream::http_recv_request_header(Downstream *downstream,
2124 int32_t h3token,
2125 nghttp3_rcbuf *name,
2126 nghttp3_rcbuf *value, uint8_t flags,
2127 bool trailer) {
2128 auto namebuf = nghttp3_rcbuf_get_buf(name);
2129 auto valuebuf = nghttp3_rcbuf_get_buf(value);
2130 auto &req = downstream->request();
2131 auto config = get_config();
2132 auto &httpconf = config->http;
2133
2134 if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
2135 httpconf.request_header_field_buffer ||
2136 req.fs.num_fields() >= httpconf.max_request_header_fields) {
2137 downstream->set_stop_reading(true);
2138
2139 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2140 return 0;
2141 }
2142
2143 if (LOG_ENABLED(INFO)) {
2144 ULOG(INFO, this) << "Too large or many header field size="
2145 << req.fs.buffer_size() + namebuf.len + valuebuf.len
2146 << ", num=" << req.fs.num_fields() + 1;
2147 }
2148
2149 // just ignore if this is a trailer part.
2150 if (trailer) {
2151 return 0;
2152 }
2153
2154 if (error_reply(downstream, 431) != 0) {
2155 return -1;
2156 }
2157
2158 return 0;
2159 }
2160
2161 auto token = http2::lookup_token(namebuf.base, namebuf.len);
2162 auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
2163
2164 downstream->add_rcbuf(name);
2165 downstream->add_rcbuf(value);
2166
2167 if (trailer) {
2168 req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
2169 StringRef{valuebuf.base, valuebuf.len}, no_index,
2170 token);
2171 return 0;
2172 }
2173
2174 req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
2175 StringRef{valuebuf.base, valuebuf.len}, no_index,
2176 token);
2177 return 0;
2178 }
2179
2180 namespace {
http_end_request_headers(nghttp3_conn * conn,int64_t stream_id,int fin,void * user_data,void * stream_user_data)2181 int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
2182 void *user_data, void *stream_user_data) {
2183 auto upstream = static_cast<Http3Upstream *>(user_data);
2184 auto handler = upstream->get_client_handler();
2185 auto downstream = static_cast<Downstream *>(stream_user_data);
2186
2187 if (!downstream || downstream->get_stop_reading()) {
2188 return 0;
2189 }
2190
2191 if (upstream->http_end_request_headers(downstream, fin) != 0) {
2192 return NGHTTP3_ERR_CALLBACK_FAILURE;
2193 }
2194
2195 downstream->reset_upstream_rtimer();
2196 handler->stop_read_timer();
2197
2198 return 0;
2199 }
2200 } // namespace
2201
http_end_request_headers(Downstream * downstream,int fin)2202 int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
2203 auto lgconf = log_config();
2204 lgconf->update_tstamp(std::chrono::system_clock::now());
2205 auto &req = downstream->request();
2206 req.tstamp = lgconf->tstamp;
2207
2208 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2209 return 0;
2210 }
2211
2212 auto &nva = req.fs.headers();
2213
2214 if (LOG_ENABLED(INFO)) {
2215 std::stringstream ss;
2216 for (auto &nv : nva) {
2217 if (nv.name == "authorization") {
2218 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
2219 continue;
2220 }
2221 ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
2222 }
2223 ULOG(INFO, this) << "HTTP request headers. stream_id="
2224 << downstream->get_stream_id() << "\n"
2225 << ss.str();
2226 }
2227
2228 auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
2229 if (content_length) {
2230 // libnghttp3 guarantees this can be parsed
2231 req.fs.content_length = util::parse_uint(content_length->value);
2232 }
2233
2234 // presence of mandatory header fields are guaranteed by libnghttp3.
2235 auto authority = req.fs.header(http2::HD__AUTHORITY);
2236 auto path = req.fs.header(http2::HD__PATH);
2237 auto method = req.fs.header(http2::HD__METHOD);
2238 auto scheme = req.fs.header(http2::HD__SCHEME);
2239
2240 auto method_token = http2::lookup_method_token(method->value);
2241 if (method_token == -1) {
2242 if (error_reply(downstream, 501) != 0) {
2243 return -1;
2244 }
2245 return 0;
2246 }
2247
2248 auto faddr = handler_->get_upstream_addr();
2249
2250 auto config = get_config();
2251
2252 // For HTTP/2 proxy, we require :authority.
2253 if (method_token != HTTP_CONNECT && config->http2_proxy &&
2254 faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
2255 shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
2256 return 0;
2257 }
2258
2259 req.method = method_token;
2260 if (scheme) {
2261 req.scheme = scheme->value;
2262 }
2263
2264 // nghttp2 library guarantees either :authority or host exist
2265 if (!authority) {
2266 req.no_authority = true;
2267 authority = req.fs.header(http2::HD_HOST);
2268 }
2269
2270 if (authority) {
2271 req.authority = authority->value;
2272 }
2273
2274 if (path) {
2275 if (method_token == HTTP_OPTIONS &&
2276 path->value == StringRef::from_lit("*")) {
2277 // Server-wide OPTIONS request. Path is empty.
2278 } else if (config->http2_proxy &&
2279 faddr->alt_mode == UpstreamAltMode::NONE) {
2280 req.path = path->value;
2281 } else {
2282 req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
2283 path->value);
2284 }
2285 }
2286
2287 auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
2288 if (connect_proto) {
2289 if (connect_proto->value != "websocket") {
2290 if (error_reply(downstream, 400) != 0) {
2291 return -1;
2292 }
2293 return 0;
2294 }
2295 req.connect_proto = ConnectProto::WEBSOCKET;
2296 }
2297
2298 if (!fin) {
2299 req.http2_expect_body = true;
2300 } else if (req.fs.content_length == -1) {
2301 req.fs.content_length = 0;
2302 }
2303
2304 downstream->inspect_http2_request();
2305
2306 downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
2307
2308 if (config->http.require_http_scheme &&
2309 !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
2310 if (error_reply(downstream, 400) != 0) {
2311 return -1;
2312 }
2313 }
2314
2315 #ifdef HAVE_MRUBY
2316 auto worker = handler_->get_worker();
2317 auto mruby_ctx = worker->get_mruby_context();
2318
2319 if (mruby_ctx->run_on_request_proc(downstream) != 0) {
2320 if (error_reply(downstream, 500) != 0) {
2321 return -1;
2322 }
2323 return 0;
2324 }
2325 #endif // HAVE_MRUBY
2326
2327 if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
2328 return 0;
2329 }
2330
2331 start_downstream(downstream);
2332
2333 return 0;
2334 }
2335
start_downstream(Downstream * downstream)2336 void Http3Upstream::start_downstream(Downstream *downstream) {
2337 if (downstream_queue_.can_activate(downstream->request().authority)) {
2338 initiate_downstream(downstream);
2339 return;
2340 }
2341
2342 downstream_queue_.mark_blocked(downstream);
2343 }
2344
// Connects |downstream| to a backend and pushes its request header
// fields.
//
// A downstream connection is requested from the client handler and
// attached to |downstream|; this is repeated until attaching succeeds.
// If no connection can be obtained, a 502 error response is sent (the
// stream is shut down if that fails), the request state becomes
// CONNECT_FAIL and the queue entry is marked as failed.
//
// With HAVE_MRUBY, the request hook of the selected backend group is
// run before the header fields are pushed.  On success the request is
// marked active in the downstream queue, and the upload is ended
// immediately when no request body is expected.
void Http3Upstream::initiate_downstream(Downstream *downstream) {
  int rv;

#ifdef HAVE_MRUBY
  // Raw pointer to the attached connection, kept because ownership of
  // dconn is moved into downstream inside the loop.
  DownstreamConnection *dconn_ptr;
#endif // HAVE_MRUBY

  for (;;) {
    auto dconn = handler_->get_downstream_connection(rv, downstream);
    if (!dconn) {
      // SHRPX_ERR_TLS_REQUIRED is treated as impossible here.
      if (rv == SHRPX_ERR_TLS_REQUIRED) {
        assert(0);
        abort();
      }

      rv = error_reply(downstream, 502);
      if (rv != 0) {
        shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
      }

      downstream->set_request_state(DownstreamState::CONNECT_FAIL);
      downstream_queue_.mark_failure(downstream);

      return;
    }

#ifdef HAVE_MRUBY
    dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
    rv = downstream->attach_downstream_connection(std::move(dconn));
    if (rv == 0) {
      break;
    }
    // Attaching failed; ask for another connection.
  }

#ifdef HAVE_MRUBY
  const auto &group = dconn_ptr->get_downstream_addr_group();
  if (group) {
    const auto &mruby_ctx = group->shared_addr->mruby_ctx;
    if (mruby_ctx->run_on_request_proc(downstream) != 0) {
      if (error_reply(downstream, 500) != 0) {
        shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
      }

      downstream_queue_.mark_failure(downstream);

      return;
    }

    // Nothing to forward if a response is already complete after the
    // hook ran.
    if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
      return;
    }
  }
#endif // HAVE_MRUBY

  rv = downstream->push_request_headers();
  if (rv != 0) {

    if (error_reply(downstream, 502) != 0) {
      shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
    }

    downstream_queue_.mark_failure(downstream);

    return;
  }

  downstream_queue_.mark_active(downstream);

  auto &req = downstream->request();
  if (!req.http2_expect_body) {
    // No request body is expected, so finish the upload right away.
    rv = downstream->end_upload_data();
    if (rv != 0) {
      shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
    }
  }
}
2422
2423 namespace {
http_recv_data(nghttp3_conn * conn,int64_t stream_id,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)2424 int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
2425 size_t datalen, void *user_data, void *stream_user_data) {
2426 auto upstream = static_cast<Http3Upstream *>(user_data);
2427 auto downstream = static_cast<Downstream *>(stream_user_data);
2428
2429 if (upstream->http_recv_data(downstream, data, datalen) != 0) {
2430 return NGHTTP3_ERR_CALLBACK_FAILURE;
2431 }
2432
2433 return 0;
2434 }
2435 } // namespace
2436
http_recv_data(Downstream * downstream,const uint8_t * data,size_t datalen)2437 int Http3Upstream::http_recv_data(Downstream *downstream, const uint8_t *data,
2438 size_t datalen) {
2439 downstream->reset_upstream_rtimer();
2440
2441 if (downstream->push_upload_data_chunk(data, datalen) != 0) {
2442 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2443 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2444 }
2445
2446 consume(downstream->get_stream_id(), datalen);
2447
2448 return 0;
2449 }
2450
2451 return 0;
2452 }
2453
2454 namespace {
http_end_stream(nghttp3_conn * conn,int64_t stream_id,void * user_data,void * stream_user_data)2455 int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
2456 void *stream_user_data) {
2457 auto upstream = static_cast<Http3Upstream *>(user_data);
2458 auto downstream = static_cast<Downstream *>(stream_user_data);
2459
2460 if (!downstream || downstream->get_stop_reading()) {
2461 return 0;
2462 }
2463
2464 if (upstream->http_end_stream(downstream) != 0) {
2465 return NGHTTP3_ERR_CALLBACK_FAILURE;
2466 }
2467
2468 return 0;
2469 }
2470 } // namespace
2471
http_end_stream(Downstream * downstream)2472 int Http3Upstream::http_end_stream(Downstream *downstream) {
2473 downstream->disable_upstream_rtimer();
2474
2475 if (downstream->end_upload_data() != 0) {
2476 if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
2477 shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
2478 }
2479 }
2480
2481 downstream->set_request_state(DownstreamState::MSG_COMPLETE);
2482
2483 return 0;
2484 }
2485
2486 namespace {
http_stream_close(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * conn_user_data,void * stream_user_data)2487 int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
2488 uint64_t app_error_code, void *conn_user_data,
2489 void *stream_user_data) {
2490 auto upstream = static_cast<Http3Upstream *>(conn_user_data);
2491 auto downstream = static_cast<Downstream *>(stream_user_data);
2492
2493 if (!downstream) {
2494 return 0;
2495 }
2496
2497 if (upstream->http_stream_close(downstream, app_error_code) != 0) {
2498 return NGHTTP3_ERR_CALLBACK_FAILURE;
2499 }
2500
2501 return 0;
2502 }
2503 } // namespace
2504
http_stream_close(Downstream * downstream,uint64_t app_error_code)2505 int Http3Upstream::http_stream_close(Downstream *downstream,
2506 uint64_t app_error_code) {
2507 auto stream_id = downstream->get_stream_id();
2508
2509 if (LOG_ENABLED(INFO)) {
2510 ULOG(INFO, this) << "Stream stream_id=" << stream_id
2511 << " is being closed with app_error_code="
2512 << app_error_code;
2513
2514 auto body = downstream->get_response_buf();
2515
2516 ULOG(INFO, this) << "response unacked_left=" << body->rleft()
2517 << " not_sent=" << body->rleft_mark();
2518 }
2519
2520 auto &req = downstream->request();
2521
2522 consume(stream_id, req.unconsumed_body_length);
2523
2524 req.unconsumed_body_length = 0;
2525
2526 ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
2527
2528 if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
2529 remove_downstream(downstream);
2530 // downstream was deleted
2531
2532 return 0;
2533 }
2534
2535 if (downstream->can_detach_downstream_connection()) {
2536 // Keep-alive
2537 downstream->detach_downstream_connection();
2538 }
2539
2540 downstream->set_request_state(DownstreamState::STREAM_CLOSED);
2541
2542 // At this point, downstream read may be paused.
2543
2544 // If shrpx_downstream::push_request_headers() failed, the
2545 // error is handled here.
2546 remove_downstream(downstream);
2547 // downstream was deleted
2548
2549 return 0;
2550 }
2551
2552 namespace {
http_stop_sending(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2553 int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
2554 uint64_t app_error_code, void *user_data,
2555 void *stream_user_data) {
2556 auto upstream = static_cast<Http3Upstream *>(user_data);
2557
2558 if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
2559 return NGHTTP3_ERR_CALLBACK_FAILURE;
2560 }
2561
2562 return 0;
2563 }
2564 } // namespace
2565
http_stop_sending(int64_t stream_id,uint64_t app_error_code)2566 int Http3Upstream::http_stop_sending(int64_t stream_id,
2567 uint64_t app_error_code) {
2568 auto rv =
2569 ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
2570 if (ngtcp2_err_is_fatal(rv)) {
2571 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
2572 << ngtcp2_strerror(rv);
2573 return -1;
2574 }
2575
2576 return 0;
2577 }
2578
2579 namespace {
http_reset_stream(nghttp3_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)2580 int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
2581 uint64_t app_error_code, void *user_data,
2582 void *stream_user_data) {
2583 auto upstream = static_cast<Http3Upstream *>(user_data);
2584
2585 if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
2586 return NGHTTP3_ERR_CALLBACK_FAILURE;
2587 }
2588
2589 return 0;
2590 }
2591 } // namespace
2592
http_reset_stream(int64_t stream_id,uint64_t app_error_code)2593 int Http3Upstream::http_reset_stream(int64_t stream_id,
2594 uint64_t app_error_code) {
2595 auto rv =
2596 ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
2597 if (ngtcp2_err_is_fatal(rv)) {
2598 ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
2599 << ngtcp2_strerror(rv);
2600 return -1;
2601 }
2602
2603 return 0;
2604 }
2605
setup_httpconn()2606 int Http3Upstream::setup_httpconn() {
2607 int rv;
2608
2609 if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
2610 return -1;
2611 }
2612
2613 nghttp3_callbacks callbacks{
2614 shrpx::http_acked_stream_data,
2615 shrpx::http_stream_close,
2616 shrpx::http_recv_data,
2617 http_deferred_consume,
2618 shrpx::http_begin_request_headers,
2619 shrpx::http_recv_request_header,
2620 shrpx::http_end_request_headers,
2621 nullptr, // begin_trailers
2622 shrpx::http_recv_request_trailer,
2623 nullptr, // end_trailers
2624 shrpx::http_stop_sending,
2625 shrpx::http_end_stream,
2626 shrpx::http_reset_stream,
2627 };
2628
2629 auto config = get_config();
2630
2631 nghttp3_settings settings;
2632 nghttp3_settings_default(&settings);
2633 settings.qpack_max_dtable_capacity = 4_k;
2634
2635 if (!config->http2_proxy) {
2636 settings.enable_connect_protocol = 1;
2637 }
2638
2639 auto mem = nghttp3_mem_default();
2640
2641 rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
2642 if (rv != 0) {
2643 ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
2644 return -1;
2645 }
2646
2647 auto params = ngtcp2_conn_get_local_transport_params(conn_);
2648
2649 nghttp3_conn_set_max_client_streams_bidi(httpconn_,
2650 params->initial_max_streams_bidi);
2651
2652 int64_t ctrl_stream_id;
2653
2654 rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
2655 if (rv != 0) {
2656 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2657 return -1;
2658 }
2659
2660 rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
2661 if (rv != 0) {
2662 ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
2663 << nghttp3_strerror(rv);
2664 return -1;
2665 }
2666
2667 int64_t qpack_enc_stream_id, qpack_dec_stream_id;
2668
2669 rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
2670 if (rv != 0) {
2671 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2672 return -1;
2673 }
2674
2675 rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
2676 if (rv != 0) {
2677 ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
2678 return -1;
2679 }
2680
2681 rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
2682 qpack_dec_stream_id);
2683 if (rv != 0) {
2684 ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
2685 << nghttp3_strerror(rv);
2686 return -1;
2687 }
2688
2689 return 0;
2690 }
2691
error_reply(Downstream * downstream,unsigned int status_code)2692 int Http3Upstream::error_reply(Downstream *downstream,
2693 unsigned int status_code) {
2694 int rv;
2695 auto &resp = downstream->response();
2696
2697 auto &balloc = downstream->get_block_allocator();
2698
2699 auto html = http::create_error_html(balloc, status_code);
2700 resp.http_status = status_code;
2701 auto body = downstream->get_response_buf();
2702 body->append(html);
2703 downstream->set_response_state(DownstreamState::MSG_COMPLETE);
2704
2705 nghttp3_data_reader data_read;
2706 data_read.read_data = downstream_read_data_callback;
2707
2708 auto lgconf = log_config();
2709 lgconf->update_tstamp(std::chrono::system_clock::now());
2710
2711 auto response_status = http2::stringify_status(balloc, status_code);
2712 auto content_length = util::make_string_ref_uint(balloc, html.size());
2713 auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
2714
2715 auto nva = std::array<nghttp3_nv, 5>{
2716 {http3::make_nv_ls_nocopy(":status", response_status),
2717 http3::make_nv_ll("content-type", "text/html; charset=UTF-8"),
2718 http3::make_nv_ls_nocopy("server", get_config()->http.server_name),
2719 http3::make_nv_ls_nocopy("content-length", content_length),
2720 http3::make_nv_ls_nocopy("date", date)}};
2721
2722 rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
2723 nva.data(), nva.size(), &data_read);
2724 if (nghttp3_err_is_fatal(rv)) {
2725 ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
2726 << nghttp3_strerror(rv);
2727 return -1;
2728 }
2729
2730 downstream->reset_upstream_wtimer();
2731
2732 if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
2733 0) {
2734 return -1;
2735 }
2736
2737 return 0;
2738 }
2739
shutdown_stream(Downstream * downstream,uint64_t app_error_code)2740 int Http3Upstream::shutdown_stream(Downstream *downstream,
2741 uint64_t app_error_code) {
2742 auto stream_id = downstream->get_stream_id();
2743
2744 if (LOG_ENABLED(INFO)) {
2745 ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
2746 << " with app_error_code=" << app_error_code;
2747 }
2748
2749 auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
2750 if (rv != 0) {
2751 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
2752 << ngtcp2_strerror(rv);
2753 return -1;
2754 }
2755
2756 return 0;
2757 }
2758
shutdown_stream_read(int64_t stream_id,uint64_t app_error_code)2759 int Http3Upstream::shutdown_stream_read(int64_t stream_id,
2760 uint64_t app_error_code) {
2761 auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
2762 NGHTTP3_H3_NO_ERROR);
2763 if (ngtcp2_err_is_fatal(rv)) {
2764 ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
2765 << ngtcp2_strerror(rv);
2766 return -1;
2767 }
2768
2769 return 0;
2770 }
2771
consume(int64_t stream_id,size_t nconsumed)2772 void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
2773 ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
2774 ngtcp2_conn_extend_max_offset(conn_, nconsumed);
2775 }
2776
remove_downstream(Downstream * downstream)2777 void Http3Upstream::remove_downstream(Downstream *downstream) {
2778 if (downstream->accesslog_ready()) {
2779 handler_->write_accesslog(downstream);
2780 }
2781
2782 nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
2783 nullptr);
2784
2785 auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
2786
2787 if (next_downstream) {
2788 initiate_downstream(next_downstream);
2789 }
2790
2791 if (downstream_queue_.get_downstreams() == nullptr) {
2792 // There is no downstream at the moment. Start idle timer now.
2793 handler_->repeat_read_timer();
2794 }
2795 }
2796
log_response_headers(Downstream * downstream,const std::vector<nghttp3_nv> & nva) const2797 void Http3Upstream::log_response_headers(
2798 Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
2799 std::stringstream ss;
2800 for (auto &nv : nva) {
2801 ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
2802 << StringRef{nv.value, nv.valuelen} << "\n";
2803 }
2804 ULOG(INFO, this) << "HTTP response headers. stream_id="
2805 << downstream->get_stream_id() << "\n"
2806 << ss.str();
2807 }
2808
check_shutdown()2809 int Http3Upstream::check_shutdown() {
2810 auto worker = handler_->get_worker();
2811
2812 if (!worker->get_graceful_shutdown()) {
2813 return 0;
2814 }
2815
2816 ev_prepare_stop(handler_->get_loop(), &prep_);
2817
2818 return start_graceful_shutdown();
2819 }
2820
start_graceful_shutdown()2821 int Http3Upstream::start_graceful_shutdown() {
2822 int rv;
2823
2824 if (ev_is_active(&shutdown_timer_)) {
2825 return 0;
2826 }
2827
2828 if (!httpconn_) {
2829 return -1;
2830 }
2831
2832 rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
2833 if (rv != 0) {
2834 ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
2835 << nghttp3_strerror(rv);
2836 return -1;
2837 }
2838
2839 handler_->signal_write();
2840
2841 auto t = ngtcp2_conn_get_pto(conn_);
2842
2843 ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
2844 0.);
2845 ev_timer_start(handler_->get_loop(), &shutdown_timer_);
2846
2847 return 0;
2848 }
2849
submit_goaway()2850 int Http3Upstream::submit_goaway() {
2851 int rv;
2852
2853 rv = nghttp3_conn_shutdown(httpconn_);
2854 if (rv != 0) {
2855 ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
2856 return -1;
2857 }
2858
2859 handler_->signal_write();
2860
2861 return 0;
2862 }
2863
open_qlog_file(const StringRef & dir,const ngtcp2_cid & scid) const2864 int Http3Upstream::open_qlog_file(const StringRef &dir,
2865 const ngtcp2_cid &scid) const {
2866 std::array<char, sizeof("20141115T125824.741+0900")> buf;
2867
2868 auto path = dir.str();
2869 path += '/';
2870 path +=
2871 util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
2872 path += '-';
2873 path += util::format_hex(scid.data, scid.datalen);
2874 path += ".sqlog";
2875
2876 int fd;
2877
2878 #ifdef O_CLOEXEC
2879 while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
2880 S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2881 errno == EINTR)
2882 ;
2883 #else // !O_CLOEXEC
2884 while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
2885 S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
2886 errno == EINTR)
2887 ;
2888
2889 if (fd != -1) {
2890 util::make_socket_closeonexec(fd);
2891 }
2892 #endif // !O_CLOEXEC
2893
2894 if (fd == -1) {
2895 auto error = errno;
2896 ULOG(ERROR, this) << "Failed to open qlog file " << path
2897 << ": errno=" << error;
2898 return -1;
2899 }
2900
2901 return fd;
2902 }
2903
get_conn() const2904 ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
2905
2906 } // namespace shrpx
2907