• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2019 nghttp2 contributors
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "h2load_quic.h"
26 
27 #include <netinet/udp.h>
28 
29 #include <iostream>
30 
31 #ifdef HAVE_LIBNGTCP2_CRYPTO_QUICTLS
32 #  include <ngtcp2/ngtcp2_crypto_quictls.h>
33 #endif // HAVE_LIBNGTCP2_CRYPTO_QUICTLS
34 #ifdef HAVE_LIBNGTCP2_CRYPTO_BORINGSSL
35 #  include <ngtcp2/ngtcp2_crypto_boringssl.h>
36 #endif // HAVE_LIBNGTCP2_CRYPTO_BORINGSSL
37 
38 #include <openssl/err.h>
39 #include <openssl/rand.h>
40 
41 #include "h2load_http3_session.h"
42 
43 namespace h2load {
44 
45 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)46 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
47   auto c = static_cast<Client *>(user_data);
48 
49   if (c->quic_handshake_completed() != 0) {
50     return NGTCP2_ERR_CALLBACK_FAILURE;
51   }
52 
53   return 0;
54 }
55 } // namespace
56 
quic_handshake_completed()57 int Client::quic_handshake_completed() { return connection_made(); }
58 
59 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)60 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
61                      uint64_t offset, const uint8_t *data, size_t datalen,
62                      void *user_data, void *stream_user_data) {
63   auto c = static_cast<Client *>(user_data);
64   if (c->quic_recv_stream_data(flags, stream_id, data, datalen) != 0) {
65     // TODO Better to do this gracefully rather than
66     // NGTCP2_ERR_CALLBACK_FAILURE.  Perhaps, call
67     // ngtcp2_conn_write_application_close() ?
68     return NGTCP2_ERR_CALLBACK_FAILURE;
69   }
70   return 0;
71 }
72 } // namespace
73 
quic_recv_stream_data(uint32_t flags,int64_t stream_id,const uint8_t * data,size_t datalen)74 int Client::quic_recv_stream_data(uint32_t flags, int64_t stream_id,
75                                   const uint8_t *data, size_t datalen) {
76   if (worker->current_phase == Phase::MAIN_DURATION) {
77     worker->stats.bytes_total += datalen;
78   }
79 
80   auto s = static_cast<Http3Session *>(session.get());
81   auto nconsumed = s->read_stream(flags, stream_id, data, datalen);
82   if (nconsumed == -1) {
83     return -1;
84   }
85 
86   ngtcp2_conn_extend_max_stream_offset(quic.conn, stream_id, nconsumed);
87   ngtcp2_conn_extend_max_offset(quic.conn, nconsumed);
88 
89   return 0;
90 }
91 
92 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)93 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
94                              uint64_t offset, uint64_t datalen, void *user_data,
95                              void *stream_user_data) {
96   auto c = static_cast<Client *>(user_data);
97   if (c->quic_acked_stream_data_offset(stream_id, datalen) != 0) {
98     return NGTCP2_ERR_CALLBACK_FAILURE;
99   }
100   return 0;
101 }
102 } // namespace
103 
quic_acked_stream_data_offset(int64_t stream_id,size_t datalen)104 int Client::quic_acked_stream_data_offset(int64_t stream_id, size_t datalen) {
105   auto s = static_cast<Http3Session *>(session.get());
106   if (s->add_ack_offset(stream_id, datalen) != 0) {
107     return -1;
108   }
109   return 0;
110 }
111 
112 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)113 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
114                  uint64_t app_error_code, void *user_data,
115                  void *stream_user_data) {
116   auto c = static_cast<Client *>(user_data);
117 
118   if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
119     app_error_code = NGHTTP3_H3_NO_ERROR;
120   }
121 
122   if (c->quic_stream_close(stream_id, app_error_code) != 0) {
123     return NGTCP2_ERR_CALLBACK_FAILURE;
124   }
125   return 0;
126 }
127 } // namespace
128 
quic_stream_close(int64_t stream_id,uint64_t app_error_code)129 int Client::quic_stream_close(int64_t stream_id, uint64_t app_error_code) {
130   auto s = static_cast<Http3Session *>(session.get());
131   if (s->close_stream(stream_id, app_error_code) != 0) {
132     return -1;
133   }
134   return 0;
135 }
136 
137 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)138 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
139                  uint64_t app_error_code, void *user_data,
140                  void *stream_user_data) {
141   auto c = static_cast<Client *>(user_data);
142   if (c->quic_stream_reset(stream_id, app_error_code) != 0) {
143     return NGTCP2_ERR_CALLBACK_FAILURE;
144   }
145   return 0;
146 }
147 } // namespace
148 
quic_stream_reset(int64_t stream_id,uint64_t app_error_code)149 int Client::quic_stream_reset(int64_t stream_id, uint64_t app_error_code) {
150   auto s = static_cast<Http3Session *>(session.get());
151   if (s->shutdown_stream_read(stream_id) != 0) {
152     return -1;
153   }
154   return 0;
155 }
156 
157 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)158 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
159                         uint64_t app_error_code, void *user_data,
160                         void *stream_user_data) {
161   auto c = static_cast<Client *>(user_data);
162   if (c->quic_stream_stop_sending(stream_id, app_error_code) != 0) {
163     return NGTCP2_ERR_CALLBACK_FAILURE;
164   }
165   return 0;
166 }
167 } // namespace
168 
quic_stream_stop_sending(int64_t stream_id,uint64_t app_error_code)169 int Client::quic_stream_stop_sending(int64_t stream_id,
170                                      uint64_t app_error_code) {
171   auto s = static_cast<Http3Session *>(session.get());
172   if (s->shutdown_stream_read(stream_id) != 0) {
173     return -1;
174   }
175   return 0;
176 }
177 
178 namespace {
extend_max_local_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)179 int extend_max_local_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
180                                   void *user_data) {
181   auto c = static_cast<Client *>(user_data);
182 
183   if (c->quic_extend_max_local_streams() != 0) {
184     return NGTCP2_ERR_CALLBACK_FAILURE;
185   }
186 
187   return 0;
188 }
189 } // namespace
190 
quic_extend_max_local_streams()191 int Client::quic_extend_max_local_streams() {
192   auto s = static_cast<Http3Session *>(session.get());
193   if (s->extend_max_local_streams() != 0) {
194     return NGTCP2_ERR_CALLBACK_FAILURE;
195   }
196   return 0;
197 }
198 
199 namespace {
extend_max_stream_data(ngtcp2_conn * conn,int64_t stream_id,uint64_t max_data,void * user_data,void * stream_user_data)200 int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
201                            uint64_t max_data, void *user_data,
202                            void *stream_user_data) {
203   auto c = static_cast<Client *>(user_data);
204 
205   if (c->quic_extend_max_stream_data(stream_id) != 0) {
206     return NGTCP2_ERR_CALLBACK_FAILURE;
207   }
208 
209   return 0;
210 }
211 } // namespace
212 
quic_extend_max_stream_data(int64_t stream_id)213 int Client::quic_extend_max_stream_data(int64_t stream_id) {
214   auto s = static_cast<Http3Session *>(session.get());
215   if (s->unblock_stream(stream_id) != 0) {
216     return -1;
217   }
218   return 0;
219 }
220 
221 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)222 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
223                           size_t cidlen, void *user_data) {
224   if (RAND_bytes(cid->data, cidlen) != 1) {
225     return NGTCP2_ERR_CALLBACK_FAILURE;
226   }
227 
228   cid->datalen = cidlen;
229 
230   if (RAND_bytes(token, NGTCP2_STATELESS_RESET_TOKENLEN) != 1) {
231     return NGTCP2_ERR_CALLBACK_FAILURE;
232   }
233 
234   return 0;
235 }
236 } // namespace
237 
238 namespace {
debug_log_printf(void * user_data,const char * fmt,...)239 void debug_log_printf(void *user_data, const char *fmt, ...) {
240   va_list ap;
241 
242   va_start(ap, fmt);
243   vfprintf(stderr, fmt, ap);
244   va_end(ap);
245 
246   fprintf(stderr, "\n");
247 }
248 } // namespace
249 
250 namespace {
generate_cid(ngtcp2_cid & dest)251 int generate_cid(ngtcp2_cid &dest) {
252   dest.datalen = 8;
253 
254   if (RAND_bytes(dest.data, dest.datalen) != 1) {
255     return -1;
256   }
257 
258   return 0;
259 }
260 } // namespace
261 
262 namespace {
quic_timestamp()263 ngtcp2_tstamp quic_timestamp() {
264   return std::chrono::duration_cast<std::chrono::nanoseconds>(
265              std::chrono::steady_clock::now().time_since_epoch())
266       .count();
267 }
268 } // namespace
269 
270 // qlog write callback -- excerpted from ngtcp2/examples/client_base.cc
271 namespace {
qlog_write_cb(void * user_data,uint32_t flags,const void * data,size_t datalen)272 void qlog_write_cb(void *user_data, uint32_t flags, const void *data,
273                    size_t datalen) {
274   auto c = static_cast<Client *>(user_data);
275   c->quic_write_qlog(data, datalen);
276 }
277 } // namespace
278 
quic_write_qlog(const void * data,size_t datalen)279 void Client::quic_write_qlog(const void *data, size_t datalen) {
280   assert(quic.qlog_file != nullptr);
281   fwrite(data, 1, datalen, quic.qlog_file);
282 }
283 
284 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)285 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
286   util::random_bytes(dest, dest + destlen,
287                      *static_cast<std::mt19937 *>(rand_ctx->native_handle));
288 }
289 } // namespace
290 
291 namespace {
recv_rx_key(ngtcp2_conn * conn,ngtcp2_encryption_level level,void * user_data)292 int recv_rx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
293                 void *user_data) {
294   if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
295     return 0;
296   }
297 
298   auto c = static_cast<Client *>(user_data);
299 
300   if (c->quic_make_http3_session() != 0) {
301     return NGTCP2_ERR_CALLBACK_FAILURE;
302   }
303 
304   return 0;
305 }
306 } // namespace
307 
quic_make_http3_session()308 int Client::quic_make_http3_session() {
309   auto s = std::make_unique<Http3Session>(this);
310   if (s->init_conn() == -1) {
311     return -1;
312   }
313   session = std::move(s);
314 
315   return 0;
316 }
317 
318 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)319 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
320   auto c = static_cast<Client *>(conn_ref->user_data);
321   return c->quic.conn;
322 }
323 } // namespace
324 
quic_init(const sockaddr * local_addr,socklen_t local_addrlen,const sockaddr * remote_addr,socklen_t remote_addrlen)325 int Client::quic_init(const sockaddr *local_addr, socklen_t local_addrlen,
326                       const sockaddr *remote_addr, socklen_t remote_addrlen) {
327   int rv;
328 
329   if (!ssl) {
330     ssl = SSL_new(worker->ssl_ctx);
331 
332     quic.conn_ref.get_conn = get_conn;
333     quic.conn_ref.user_data = this;
334 
335     SSL_set_app_data(ssl, &quic.conn_ref);
336     SSL_set_connect_state(ssl);
337     SSL_set_quic_use_legacy_codepoint(ssl, 0);
338   }
339 
340   auto callbacks = ngtcp2_callbacks{
341       ngtcp2_crypto_client_initial_cb,
342       nullptr, // recv_client_initial
343       ngtcp2_crypto_recv_crypto_data_cb,
344       h2load::handshake_completed,
345       nullptr, // recv_version_negotiation
346       ngtcp2_crypto_encrypt_cb,
347       ngtcp2_crypto_decrypt_cb,
348       ngtcp2_crypto_hp_mask_cb,
349       h2load::recv_stream_data,
350       h2load::acked_stream_data_offset,
351       nullptr, // stream_open
352       h2load::stream_close,
353       nullptr, // recv_stateless_reset
354       ngtcp2_crypto_recv_retry_cb,
355       h2load::extend_max_local_streams_bidi,
356       nullptr, // extend_max_local_streams_uni
357       h2load::rand,
358       get_new_connection_id,
359       nullptr, // remove_connection_id
360       ngtcp2_crypto_update_key_cb,
361       nullptr, // path_validation
362       nullptr, // select_preferred_addr
363       h2load::stream_reset,
364       nullptr, // extend_max_remote_streams_bidi
365       nullptr, // extend_max_remote_streams_uni
366       h2load::extend_max_stream_data,
367       nullptr, // dcid_status
368       nullptr, // handshake_confirmed
369       nullptr, // recv_new_token
370       ngtcp2_crypto_delete_crypto_aead_ctx_cb,
371       ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
372       nullptr, // recv_datagram
373       nullptr, // ack_datagram
374       nullptr, // lost_datagram
375       ngtcp2_crypto_get_path_challenge_data_cb,
376       h2load::stream_stop_sending,
377       nullptr, // version_negotiation
378       h2load::recv_rx_key,
379       nullptr, // recv_tx_key
380   };
381 
382   ngtcp2_cid scid, dcid;
383   if (generate_cid(scid) != 0) {
384     return -1;
385   }
386   if (generate_cid(dcid) != 0) {
387     return -1;
388   }
389 
390   auto config = worker->config;
391 
392   ngtcp2_settings settings;
393   ngtcp2_settings_default(&settings);
394   if (config->verbose) {
395     settings.log_printf = debug_log_printf;
396   }
397   settings.initial_ts = quic_timestamp();
398   settings.rand_ctx.native_handle = &worker->randgen;
399   if (!config->qlog_file_base.empty()) {
400     assert(quic.qlog_file == nullptr);
401     auto path = config->qlog_file_base;
402     path += '.';
403     path += util::utos(worker->id);
404     path += '.';
405     path += util::utos(id);
406     path += ".sqlog";
407     quic.qlog_file = fopen(path.c_str(), "w");
408     if (quic.qlog_file == nullptr) {
409       std::cerr << "Failed to open a qlog file: " << path << std::endl;
410       return -1;
411     }
412     settings.qlog_write = qlog_write_cb;
413   }
414   if (config->max_udp_payload_size) {
415     settings.max_tx_udp_payload_size = config->max_udp_payload_size;
416     settings.no_tx_udp_payload_size_shaping = 1;
417   }
418 
419   ngtcp2_transport_params params;
420   ngtcp2_transport_params_default(&params);
421   auto max_stream_data =
422       std::min((1 << 26) - 1, (1 << config->window_bits) - 1);
423   params.initial_max_stream_data_bidi_local = max_stream_data;
424   params.initial_max_stream_data_uni = max_stream_data;
425   params.initial_max_data = (1 << config->connection_window_bits) - 1;
426   params.initial_max_streams_bidi = 0;
427   params.initial_max_streams_uni = 100;
428   params.max_idle_timeout = 30 * NGTCP2_SECONDS;
429 
430   auto path = ngtcp2_path{
431       {
432           const_cast<sockaddr *>(local_addr),
433           local_addrlen,
434       },
435       {
436           const_cast<sockaddr *>(remote_addr),
437           remote_addrlen,
438       },
439   };
440 
441   assert(config->npn_list.size());
442 
443   uint32_t quic_version;
444 
445   if (config->npn_list[0] == NGHTTP3_ALPN_H3) {
446     quic_version = NGTCP2_PROTO_VER_V1;
447   } else {
448     quic_version = NGTCP2_PROTO_VER_MIN;
449   }
450 
451   rv = ngtcp2_conn_client_new(&quic.conn, &dcid, &scid, &path, quic_version,
452                               &callbacks, &settings, &params, nullptr, this);
453   if (rv != 0) {
454     return -1;
455   }
456 
457   ngtcp2_conn_set_tls_native_handle(quic.conn, ssl);
458 
459   return 0;
460 }
461 
quic_free()462 void Client::quic_free() {
463   ngtcp2_conn_del(quic.conn);
464   if (quic.qlog_file != nullptr) {
465     fclose(quic.qlog_file);
466     quic.qlog_file = nullptr;
467   }
468 }
469 
quic_close_connection()470 void Client::quic_close_connection() {
471   if (!quic.conn) {
472     return;
473   }
474 
475   std::array<uint8_t, NGTCP2_MAX_UDP_PAYLOAD_SIZE> buf;
476   ngtcp2_path_storage ps;
477   ngtcp2_path_storage_zero(&ps);
478 
479   auto nwrite = ngtcp2_conn_write_connection_close(
480       quic.conn, &ps.path, nullptr, buf.data(), buf.size(), &quic.last_error,
481       quic_timestamp());
482 
483   if (nwrite <= 0) {
484     return;
485   }
486 
487   write_udp(reinterpret_cast<sockaddr *>(ps.path.remote.addr),
488             ps.path.remote.addrlen, buf.data(), nwrite, 0);
489 }
490 
quic_write_client_handshake(ngtcp2_encryption_level level,const uint8_t * data,size_t datalen)491 int Client::quic_write_client_handshake(ngtcp2_encryption_level level,
492                                         const uint8_t *data, size_t datalen) {
493   int rv;
494 
495   assert(level < 2);
496 
497   rv = ngtcp2_conn_submit_crypto_data(quic.conn, level, data, datalen);
498   if (rv != 0) {
499     std::cerr << "ngtcp2_conn_submit_crypto_data: " << ngtcp2_strerror(rv)
500               << std::endl;
501     return -1;
502   }
503 
504   return 0;
505 }
506 
quic_pkt_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)507 void quic_pkt_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
508   auto c = static_cast<Client *>(w->data);
509 
510   if (c->quic_pkt_timeout() != 0) {
511     c->fail();
512     c->worker->free_client(c);
513     delete c;
514     return;
515   }
516 }
517 
quic_pkt_timeout()518 int Client::quic_pkt_timeout() {
519   int rv;
520   auto now = quic_timestamp();
521 
522   rv = ngtcp2_conn_handle_expiry(quic.conn, now);
523   if (rv != 0) {
524     ngtcp2_ccerr_set_liberr(&quic.last_error, rv, nullptr, 0);
525     return -1;
526   }
527 
528   return write_quic();
529 }
530 
quic_restart_pkt_timer()531 void Client::quic_restart_pkt_timer() {
532   auto expiry = ngtcp2_conn_get_expiry(quic.conn);
533   auto now = quic_timestamp();
534   auto t = expiry > now ? static_cast<ev_tstamp>(expiry - now) / NGTCP2_SECONDS
535                         : 1e-9;
536   quic.pkt_timer.repeat = t;
537   ev_timer_again(worker->loop, &quic.pkt_timer);
538 }
539 
read_quic()540 int Client::read_quic() {
541   std::array<uint8_t, 65535> buf;
542   sockaddr_union su;
543   int rv;
544   size_t pktcnt = 0;
545   ngtcp2_pkt_info pi{};
546 
547   iovec msg_iov;
548   msg_iov.iov_base = buf.data();
549   msg_iov.iov_len = buf.size();
550 
551   msghdr msg{};
552   msg.msg_name = &su;
553   msg.msg_iov = &msg_iov;
554   msg.msg_iovlen = 1;
555 
556   uint8_t msg_ctrl[CMSG_SPACE(sizeof(uint16_t))];
557   msg.msg_control = msg_ctrl;
558 
559   auto ts = quic_timestamp();
560 
561   for (;;) {
562     msg.msg_namelen = sizeof(su);
563     msg.msg_controllen = sizeof(msg_ctrl);
564 
565     auto nread = recvmsg(fd, &msg, 0);
566     if (nread == -1) {
567       return 0;
568     }
569 
570     auto gso_size = util::msghdr_get_udp_gro(&msg);
571     if (gso_size == 0) {
572       gso_size = static_cast<size_t>(nread);
573     }
574 
575     assert(quic.conn);
576 
577     ++worker->stats.udp_dgram_recv;
578 
579     auto path = ngtcp2_path{
580         {
581             &local_addr.su.sa,
582             static_cast<socklen_t>(local_addr.len),
583         },
584         {
585             &su.sa,
586             msg.msg_namelen,
587         },
588     };
589 
590     auto data = buf.data();
591 
592     for (;;) {
593       auto datalen = std::min(static_cast<size_t>(nread), gso_size);
594 
595       ++pktcnt;
596 
597       rv = ngtcp2_conn_read_pkt(quic.conn, &path, &pi, data, datalen, ts);
598       if (rv != 0) {
599         if (!quic.last_error.error_code) {
600           if (rv == NGTCP2_ERR_CRYPTO) {
601             ngtcp2_ccerr_set_tls_alert(&quic.last_error,
602                                        ngtcp2_conn_get_tls_alert(quic.conn),
603                                        nullptr, 0);
604           } else {
605             ngtcp2_ccerr_set_liberr(&quic.last_error, rv, nullptr, 0);
606           }
607         }
608 
609         return -1;
610       }
611 
612       nread -= datalen;
613       if (nread == 0) {
614         break;
615       }
616 
617       data += datalen;
618     }
619 
620     if (pktcnt >= 100) {
621       break;
622     }
623   }
624 
625   return 0;
626 }
627 
write_quic()628 int Client::write_quic() {
629   int rv;
630 
631   ev_io_stop(worker->loop, &wev);
632 
633   if (quic.close_requested) {
634     return -1;
635   }
636 
637   if (quic.tx.send_blocked) {
638     rv = send_blocked_packet();
639     if (rv != 0) {
640       return -1;
641     }
642 
643     if (quic.tx.send_blocked) {
644       return 0;
645     }
646   }
647 
648   std::array<nghttp3_vec, 16> vec;
649   size_t pktcnt = 0;
650   auto max_udp_payload_size =
651       ngtcp2_conn_get_max_tx_udp_payload_size(quic.conn);
652 #ifdef UDP_SEGMENT
653   auto path_max_udp_payload_size =
654       ngtcp2_conn_get_path_max_tx_udp_payload_size(quic.conn);
655 #endif // UDP_SEGMENT
656   auto max_pktcnt =
657       ngtcp2_conn_get_send_quantum(quic.conn) / max_udp_payload_size;
658   uint8_t *bufpos = quic.tx.data.get();
659   ngtcp2_path_storage ps;
660   size_t gso_size = 0;
661 
662   ngtcp2_path_storage_zero(&ps);
663 
664   auto s = static_cast<Http3Session *>(session.get());
665   auto ts = quic_timestamp();
666 
667   for (;;) {
668     int64_t stream_id = -1;
669     int fin = 0;
670     ssize_t sveccnt = 0;
671 
672     if (session && ngtcp2_conn_get_max_data_left(quic.conn)) {
673       sveccnt = s->write_stream(stream_id, fin, vec.data(), vec.size());
674       if (sveccnt == -1) {
675         return -1;
676       }
677     }
678 
679     ngtcp2_ssize ndatalen;
680     auto v = vec.data();
681     auto vcnt = static_cast<size_t>(sveccnt);
682 
683     uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
684     if (fin) {
685       flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
686     }
687 
688     auto nwrite = ngtcp2_conn_writev_stream(
689         quic.conn, &ps.path, nullptr, bufpos, max_udp_payload_size, &ndatalen,
690         flags, stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
691     if (nwrite < 0) {
692       switch (nwrite) {
693       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
694         assert(ndatalen == -1);
695         s->block_stream(stream_id);
696         continue;
697       case NGTCP2_ERR_STREAM_SHUT_WR:
698         assert(ndatalen == -1);
699         s->shutdown_stream_write(stream_id);
700         continue;
701       case NGTCP2_ERR_WRITE_MORE:
702         assert(ndatalen >= 0);
703         if (s->add_write_offset(stream_id, ndatalen) != 0) {
704           return -1;
705         }
706         continue;
707       }
708 
709       ngtcp2_ccerr_set_liberr(&quic.last_error, nwrite, nullptr, 0);
710       return -1;
711     } else if (ndatalen >= 0 && s->add_write_offset(stream_id, ndatalen) != 0) {
712       return -1;
713     }
714 
715     quic_restart_pkt_timer();
716 
717     if (nwrite == 0) {
718       if (bufpos - quic.tx.data.get()) {
719         auto data = quic.tx.data.get();
720         auto datalen = bufpos - quic.tx.data.get();
721         rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data,
722                        datalen, gso_size);
723         if (rv == 1) {
724           on_send_blocked(ps.path.remote, data, datalen, gso_size);
725           signal_write();
726           return 0;
727         }
728       }
729       return 0;
730     }
731 
732     bufpos += nwrite;
733 
734 #ifdef UDP_SEGMENT
735     if (worker->config->no_udp_gso) {
736 #endif // UDP_SEGMENT
737       auto data = quic.tx.data.get();
738       auto datalen = bufpos - quic.tx.data.get();
739       rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data, datalen,
740                      0);
741       if (rv == 1) {
742         on_send_blocked(ps.path.remote, data, datalen, 0);
743         signal_write();
744         return 0;
745       }
746 
747       if (++pktcnt == max_pktcnt) {
748         signal_write();
749         return 0;
750       }
751 
752       bufpos = quic.tx.data.get();
753 
754 #ifdef UDP_SEGMENT
755       continue;
756     }
757 #endif // UDP_SEGMENT
758 
759 #ifdef UDP_SEGMENT
760     if (pktcnt == 0) {
761       gso_size = nwrite;
762     } else if (static_cast<size_t>(nwrite) > gso_size ||
763                (gso_size > path_max_udp_payload_size &&
764                 static_cast<size_t>(nwrite) != gso_size)) {
765       auto data = quic.tx.data.get();
766       auto datalen = bufpos - quic.tx.data.get() - nwrite;
767       rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data, datalen,
768                      gso_size);
769       if (rv == 1) {
770         on_send_blocked(ps.path.remote, data, datalen, gso_size);
771         on_send_blocked(ps.path.remote, bufpos - nwrite, nwrite, 0);
772       } else {
773         auto data = bufpos - nwrite;
774         rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data,
775                        nwrite, 0);
776         if (rv == 1) {
777           on_send_blocked(ps.path.remote, data, nwrite, 0);
778         }
779       }
780 
781       signal_write();
782       return 0;
783     }
784 
785     // Assume that the path does not change.
786     if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
787       auto data = quic.tx.data.get();
788       auto datalen = bufpos - quic.tx.data.get();
789       rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data, datalen,
790                      gso_size);
791       if (rv == 1) {
792         on_send_blocked(ps.path.remote, data, datalen, gso_size);
793       }
794       signal_write();
795       return 0;
796     }
797 #endif // UDP_SEGMENT
798   }
799 }
800 
on_send_blocked(const ngtcp2_addr & remote_addr,const uint8_t * data,size_t datalen,size_t gso_size)801 void Client::on_send_blocked(const ngtcp2_addr &remote_addr,
802                              const uint8_t *data, size_t datalen,
803                              size_t gso_size) {
804   assert(quic.tx.num_blocked || !quic.tx.send_blocked);
805   assert(quic.tx.num_blocked < 2);
806 
807   quic.tx.send_blocked = true;
808 
809   auto &p = quic.tx.blocked[quic.tx.num_blocked++];
810 
811   memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
812 
813   p.remote_addr.len = remote_addr.addrlen;
814   p.data = data;
815   p.datalen = datalen;
816   p.gso_size = gso_size;
817 }
818 
send_blocked_packet()819 int Client::send_blocked_packet() {
820   int rv;
821 
822   assert(quic.tx.send_blocked);
823 
824   for (; quic.tx.num_blocked_sent < quic.tx.num_blocked;
825        ++quic.tx.num_blocked_sent) {
826     auto &p = quic.tx.blocked[quic.tx.num_blocked_sent];
827 
828     rv = write_udp(&p.remote_addr.su.sa, p.remote_addr.len, p.data, p.datalen,
829                    p.gso_size);
830     if (rv == 1) {
831       signal_write();
832 
833       return 0;
834     }
835   }
836 
837   quic.tx.send_blocked = false;
838   quic.tx.num_blocked = 0;
839   quic.tx.num_blocked_sent = 0;
840 
841   return 0;
842 }
843 
844 } // namespace h2load
845