• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2019 nghttp2 contributors
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "h2load_quic.h"
26 
27 #include <netinet/udp.h>
28 
29 #include <iostream>
30 
31 #ifdef HAVE_LIBNGTCP2_CRYPTO_OPENSSL
32 #  include <ngtcp2/ngtcp2_crypto_openssl.h>
33 #endif // HAVE_LIBNGTCP2_CRYPTO_OPENSSL
34 #ifdef HAVE_LIBNGTCP2_CRYPTO_BORINGSSL
35 #  include <ngtcp2/ngtcp2_crypto_boringssl.h>
36 #endif // HAVE_LIBNGTCP2_CRYPTO_BORINGSSL
37 
38 #include <openssl/err.h>
39 #include <openssl/rand.h>
40 
41 #include "h2load_http3_session.h"
42 
43 namespace h2load {
44 
45 namespace {
handshake_completed(ngtcp2_conn * conn,void * user_data)46 int handshake_completed(ngtcp2_conn *conn, void *user_data) {
47   auto c = static_cast<Client *>(user_data);
48 
49   if (c->quic_handshake_completed() != 0) {
50     return NGTCP2_ERR_CALLBACK_FAILURE;
51   }
52 
53   return 0;
54 }
55 } // namespace
56 
quic_handshake_completed()57 int Client::quic_handshake_completed() { return connection_made(); }
58 
59 namespace {
recv_stream_data(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t offset,const uint8_t * data,size_t datalen,void * user_data,void * stream_user_data)60 int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
61                      uint64_t offset, const uint8_t *data, size_t datalen,
62                      void *user_data, void *stream_user_data) {
63   auto c = static_cast<Client *>(user_data);
64   if (c->quic_recv_stream_data(flags, stream_id, data, datalen) != 0) {
65     // TODO Better to do this gracefully rather than
66     // NGTCP2_ERR_CALLBACK_FAILURE.  Perhaps, call
67     // ngtcp2_conn_write_application_close() ?
68     return NGTCP2_ERR_CALLBACK_FAILURE;
69   }
70   return 0;
71 }
72 } // namespace
73 
quic_recv_stream_data(uint32_t flags,int64_t stream_id,const uint8_t * data,size_t datalen)74 int Client::quic_recv_stream_data(uint32_t flags, int64_t stream_id,
75                                   const uint8_t *data, size_t datalen) {
76   if (worker->current_phase == Phase::MAIN_DURATION) {
77     worker->stats.bytes_total += datalen;
78   }
79 
80   auto s = static_cast<Http3Session *>(session.get());
81   auto nconsumed = s->read_stream(flags, stream_id, data, datalen);
82   if (nconsumed == -1) {
83     return -1;
84   }
85 
86   ngtcp2_conn_extend_max_stream_offset(quic.conn, stream_id, nconsumed);
87   ngtcp2_conn_extend_max_offset(quic.conn, nconsumed);
88 
89   return 0;
90 }
91 
92 namespace {
acked_stream_data_offset(ngtcp2_conn * conn,int64_t stream_id,uint64_t offset,uint64_t datalen,void * user_data,void * stream_user_data)93 int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
94                              uint64_t offset, uint64_t datalen, void *user_data,
95                              void *stream_user_data) {
96   auto c = static_cast<Client *>(user_data);
97   if (c->quic_acked_stream_data_offset(stream_id, datalen) != 0) {
98     return NGTCP2_ERR_CALLBACK_FAILURE;
99   }
100   return 0;
101 }
102 } // namespace
103 
quic_acked_stream_data_offset(int64_t stream_id,size_t datalen)104 int Client::quic_acked_stream_data_offset(int64_t stream_id, size_t datalen) {
105   auto s = static_cast<Http3Session *>(session.get());
106   if (s->add_ack_offset(stream_id, datalen) != 0) {
107     return -1;
108   }
109   return 0;
110 }
111 
112 namespace {
stream_close(ngtcp2_conn * conn,uint32_t flags,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)113 int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
114                  uint64_t app_error_code, void *user_data,
115                  void *stream_user_data) {
116   auto c = static_cast<Client *>(user_data);
117 
118   if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
119     app_error_code = NGHTTP3_H3_NO_ERROR;
120   }
121 
122   if (c->quic_stream_close(stream_id, app_error_code) != 0) {
123     return NGTCP2_ERR_CALLBACK_FAILURE;
124   }
125   return 0;
126 }
127 } // namespace
128 
quic_stream_close(int64_t stream_id,uint64_t app_error_code)129 int Client::quic_stream_close(int64_t stream_id, uint64_t app_error_code) {
130   auto s = static_cast<Http3Session *>(session.get());
131   if (s->close_stream(stream_id, app_error_code) != 0) {
132     return -1;
133   }
134   return 0;
135 }
136 
137 namespace {
stream_reset(ngtcp2_conn * conn,int64_t stream_id,uint64_t final_size,uint64_t app_error_code,void * user_data,void * stream_user_data)138 int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
139                  uint64_t app_error_code, void *user_data,
140                  void *stream_user_data) {
141   auto c = static_cast<Client *>(user_data);
142   if (c->quic_stream_reset(stream_id, app_error_code) != 0) {
143     return NGTCP2_ERR_CALLBACK_FAILURE;
144   }
145   return 0;
146 }
147 } // namespace
148 
quic_stream_reset(int64_t stream_id,uint64_t app_error_code)149 int Client::quic_stream_reset(int64_t stream_id, uint64_t app_error_code) {
150   auto s = static_cast<Http3Session *>(session.get());
151   if (s->shutdown_stream_read(stream_id) != 0) {
152     return -1;
153   }
154   return 0;
155 }
156 
157 namespace {
stream_stop_sending(ngtcp2_conn * conn,int64_t stream_id,uint64_t app_error_code,void * user_data,void * stream_user_data)158 int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
159                         uint64_t app_error_code, void *user_data,
160                         void *stream_user_data) {
161   auto c = static_cast<Client *>(user_data);
162   if (c->quic_stream_stop_sending(stream_id, app_error_code) != 0) {
163     return NGTCP2_ERR_CALLBACK_FAILURE;
164   }
165   return 0;
166 }
167 } // namespace
168 
quic_stream_stop_sending(int64_t stream_id,uint64_t app_error_code)169 int Client::quic_stream_stop_sending(int64_t stream_id,
170                                      uint64_t app_error_code) {
171   auto s = static_cast<Http3Session *>(session.get());
172   if (s->shutdown_stream_read(stream_id) != 0) {
173     return -1;
174   }
175   return 0;
176 }
177 
178 namespace {
extend_max_local_streams_bidi(ngtcp2_conn * conn,uint64_t max_streams,void * user_data)179 int extend_max_local_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
180                                   void *user_data) {
181   auto c = static_cast<Client *>(user_data);
182 
183   if (c->quic_extend_max_local_streams() != 0) {
184     return NGTCP2_ERR_CALLBACK_FAILURE;
185   }
186 
187   return 0;
188 }
189 } // namespace
190 
quic_extend_max_local_streams()191 int Client::quic_extend_max_local_streams() {
192   auto s = static_cast<Http3Session *>(session.get());
193   if (s->extend_max_local_streams() != 0) {
194     return NGTCP2_ERR_CALLBACK_FAILURE;
195   }
196   return 0;
197 }
198 
199 namespace {
get_new_connection_id(ngtcp2_conn * conn,ngtcp2_cid * cid,uint8_t * token,size_t cidlen,void * user_data)200 int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
201                           size_t cidlen, void *user_data) {
202   if (RAND_bytes(cid->data, cidlen) != 1) {
203     return NGTCP2_ERR_CALLBACK_FAILURE;
204   }
205 
206   cid->datalen = cidlen;
207 
208   if (RAND_bytes(token, NGTCP2_STATELESS_RESET_TOKENLEN) != 1) {
209     return NGTCP2_ERR_CALLBACK_FAILURE;
210   }
211 
212   return 0;
213 }
214 } // namespace
215 
namespace {
// printf-style log sink installed as settings.log_printf.  Writes
// the formatted message to stderr followed by a newline.
// |user_data| is not used.
void debug_log_printf(void *user_data, const char *fmt, ...) {
  va_list args;

  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);

  // The format string carries no line terminator, so add one here.
  fprintf(stderr, "\n");
}
} // namespace
227 
228 namespace {
generate_cid(ngtcp2_cid & dest)229 int generate_cid(ngtcp2_cid &dest) {
230   dest.datalen = 8;
231 
232   if (RAND_bytes(dest.data, dest.datalen) != 1) {
233     return -1;
234   }
235 
236   return 0;
237 }
238 } // namespace
239 
240 namespace {
timestamp(struct ev_loop * loop)241 ngtcp2_tstamp timestamp(struct ev_loop *loop) {
242   return ev_now(loop) * NGTCP2_SECONDS;
243 }
244 } // namespace
245 
246 // qlog write callback -- excerpted from ngtcp2/examples/client_base.cc
247 namespace {
qlog_write_cb(void * user_data,uint32_t flags,const void * data,size_t datalen)248 void qlog_write_cb(void *user_data, uint32_t flags, const void *data,
249                    size_t datalen) {
250   auto c = static_cast<Client *>(user_data);
251   c->quic_write_qlog(data, datalen);
252 }
253 } // namespace
254 
quic_write_qlog(const void * data,size_t datalen)255 void Client::quic_write_qlog(const void *data, size_t datalen) {
256   assert(quic.qlog_file != nullptr);
257   fwrite(data, 1, datalen, quic.qlog_file);
258 }
259 
260 namespace {
rand(uint8_t * dest,size_t destlen,const ngtcp2_rand_ctx * rand_ctx)261 void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
262   util::random_bytes(dest, dest + destlen,
263                      *static_cast<std::mt19937 *>(rand_ctx->native_handle));
264 }
265 } // namespace
266 
267 namespace {
recv_rx_key(ngtcp2_conn * conn,ngtcp2_crypto_level level,void * user_data)268 int recv_rx_key(ngtcp2_conn *conn, ngtcp2_crypto_level level, void *user_data) {
269   if (level != NGTCP2_CRYPTO_LEVEL_APPLICATION) {
270     return 0;
271   }
272 
273   auto c = static_cast<Client *>(user_data);
274 
275   if (c->quic_make_http3_session() != 0) {
276     return NGTCP2_ERR_CALLBACK_FAILURE;
277   }
278 
279   return 0;
280 }
281 } // namespace
282 
quic_make_http3_session()283 int Client::quic_make_http3_session() {
284   auto s = std::make_unique<Http3Session>(this);
285   if (s->init_conn() == -1) {
286     return -1;
287   }
288   session = std::move(s);
289 
290   return 0;
291 }
292 
293 namespace {
get_conn(ngtcp2_crypto_conn_ref * conn_ref)294 ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
295   auto c = static_cast<Client *>(conn_ref->user_data);
296   return c->quic.conn;
297 }
298 } // namespace
299 
quic_init(const sockaddr * local_addr,socklen_t local_addrlen,const sockaddr * remote_addr,socklen_t remote_addrlen)300 int Client::quic_init(const sockaddr *local_addr, socklen_t local_addrlen,
301                       const sockaddr *remote_addr, socklen_t remote_addrlen) {
302   int rv;
303 
304   if (!ssl) {
305     ssl = SSL_new(worker->ssl_ctx);
306 
307     quic.conn_ref.get_conn = get_conn;
308     quic.conn_ref.user_data = this;
309 
310     SSL_set_app_data(ssl, &quic.conn_ref);
311     SSL_set_connect_state(ssl);
312     SSL_set_quic_use_legacy_codepoint(ssl, 0);
313   }
314 
315   auto callbacks = ngtcp2_callbacks{
316       ngtcp2_crypto_client_initial_cb,
317       nullptr, // recv_client_initial
318       ngtcp2_crypto_recv_crypto_data_cb,
319       h2load::handshake_completed,
320       nullptr, // recv_version_negotiation
321       ngtcp2_crypto_encrypt_cb,
322       ngtcp2_crypto_decrypt_cb,
323       ngtcp2_crypto_hp_mask_cb,
324       h2load::recv_stream_data,
325       h2load::acked_stream_data_offset,
326       nullptr, // stream_open
327       h2load::stream_close,
328       nullptr, // recv_stateless_reset
329       ngtcp2_crypto_recv_retry_cb,
330       h2load::extend_max_local_streams_bidi,
331       nullptr, // extend_max_local_streams_uni
332       h2load::rand,
333       get_new_connection_id,
334       nullptr, // remove_connection_id
335       ngtcp2_crypto_update_key_cb,
336       nullptr, // path_validation
337       nullptr, // select_preferred_addr
338       h2load::stream_reset,
339       nullptr, // extend_max_remote_streams_bidi
340       nullptr, // extend_max_remote_streams_uni
341       nullptr, // extend_max_stream_data
342       nullptr, // dcid_status
343       nullptr, // handshake_confirmed
344       nullptr, // recv_new_token
345       ngtcp2_crypto_delete_crypto_aead_ctx_cb,
346       ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
347       nullptr, // recv_datagram
348       nullptr, // ack_datagram
349       nullptr, // lost_datagram
350       ngtcp2_crypto_get_path_challenge_data_cb,
351       h2load::stream_stop_sending,
352       nullptr, // version_negotiation
353       h2load::recv_rx_key,
354       nullptr, // recv_tx_key
355   };
356 
357   ngtcp2_cid scid, dcid;
358   if (generate_cid(scid) != 0) {
359     return -1;
360   }
361   if (generate_cid(dcid) != 0) {
362     return -1;
363   }
364 
365   auto config = worker->config;
366 
367   ngtcp2_settings settings;
368   ngtcp2_settings_default(&settings);
369   if (config->verbose) {
370     settings.log_printf = debug_log_printf;
371   }
372   settings.initial_ts = timestamp(worker->loop);
373   settings.rand_ctx.native_handle = &worker->randgen;
374   if (!config->qlog_file_base.empty()) {
375     assert(quic.qlog_file == nullptr);
376     auto path = config->qlog_file_base;
377     path += '.';
378     path += util::utos(worker->id);
379     path += '.';
380     path += util::utos(id);
381     path += ".sqlog";
382     quic.qlog_file = fopen(path.c_str(), "w");
383     if (quic.qlog_file == nullptr) {
384       std::cerr << "Failed to open a qlog file: " << path << std::endl;
385       return -1;
386     }
387     settings.qlog.write = qlog_write_cb;
388   }
389   if (config->max_udp_payload_size) {
390     settings.max_tx_udp_payload_size = config->max_udp_payload_size;
391     settings.no_tx_udp_payload_size_shaping = 1;
392   }
393 
394   ngtcp2_transport_params params;
395   ngtcp2_transport_params_default(&params);
396   auto max_stream_data =
397       std::min((1 << 26) - 1, (1 << config->window_bits) - 1);
398   params.initial_max_stream_data_bidi_local = max_stream_data;
399   params.initial_max_stream_data_uni = max_stream_data;
400   params.initial_max_data = (1 << config->connection_window_bits) - 1;
401   params.initial_max_streams_bidi = 0;
402   params.initial_max_streams_uni = 100;
403   params.max_idle_timeout = 30 * NGTCP2_SECONDS;
404 
405   auto path = ngtcp2_path{
406       {
407           const_cast<sockaddr *>(local_addr),
408           local_addrlen,
409       },
410       {
411           const_cast<sockaddr *>(remote_addr),
412           remote_addrlen,
413       },
414   };
415 
416   assert(config->npn_list.size());
417 
418   uint32_t quic_version;
419 
420   if (config->npn_list[0] == NGHTTP3_ALPN_H3) {
421     quic_version = NGTCP2_PROTO_VER_V1;
422   } else {
423     quic_version = NGTCP2_PROTO_VER_MIN;
424   }
425 
426   rv = ngtcp2_conn_client_new(&quic.conn, &dcid, &scid, &path, quic_version,
427                               &callbacks, &settings, &params, nullptr, this);
428   if (rv != 0) {
429     return -1;
430   }
431 
432   ngtcp2_conn_set_tls_native_handle(quic.conn, ssl);
433 
434   return 0;
435 }
436 
quic_free()437 void Client::quic_free() {
438   ngtcp2_conn_del(quic.conn);
439   if (quic.qlog_file != nullptr) {
440     fclose(quic.qlog_file);
441     quic.qlog_file = nullptr;
442   }
443 }
444 
quic_close_connection()445 void Client::quic_close_connection() {
446   if (!quic.conn) {
447     return;
448   }
449 
450   std::array<uint8_t, NGTCP2_MAX_UDP_PAYLOAD_SIZE> buf;
451   ngtcp2_path_storage ps;
452   ngtcp2_path_storage_zero(&ps);
453 
454   auto nwrite = ngtcp2_conn_write_connection_close(
455       quic.conn, &ps.path, nullptr, buf.data(), buf.size(), &quic.last_error,
456       timestamp(worker->loop));
457 
458   if (nwrite <= 0) {
459     return;
460   }
461 
462   write_udp(reinterpret_cast<sockaddr *>(ps.path.remote.addr),
463             ps.path.remote.addrlen, buf.data(), nwrite, 0);
464 }
465 
quic_write_client_handshake(ngtcp2_crypto_level level,const uint8_t * data,size_t datalen)466 int Client::quic_write_client_handshake(ngtcp2_crypto_level level,
467                                         const uint8_t *data, size_t datalen) {
468   int rv;
469 
470   assert(level < 2);
471 
472   rv = ngtcp2_conn_submit_crypto_data(quic.conn, level, data, datalen);
473   if (rv != 0) {
474     std::cerr << "ngtcp2_conn_submit_crypto_data: " << ngtcp2_strerror(rv)
475               << std::endl;
476     return -1;
477   }
478 
479   return 0;
480 }
481 
quic_pkt_timeout_cb(struct ev_loop * loop,ev_timer * w,int revents)482 void quic_pkt_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
483   auto c = static_cast<Client *>(w->data);
484 
485   if (c->quic_pkt_timeout() != 0) {
486     c->fail();
487     c->worker->free_client(c);
488     delete c;
489     return;
490   }
491 }
492 
quic_pkt_timeout()493 int Client::quic_pkt_timeout() {
494   int rv;
495   auto now = timestamp(worker->loop);
496 
497   rv = ngtcp2_conn_handle_expiry(quic.conn, now);
498   if (rv != 0) {
499     ngtcp2_connection_close_error_set_transport_error_liberr(&quic.last_error,
500                                                              rv, nullptr, 0);
501     return -1;
502   }
503 
504   return write_quic();
505 }
506 
quic_restart_pkt_timer()507 void Client::quic_restart_pkt_timer() {
508   auto expiry = ngtcp2_conn_get_expiry(quic.conn);
509   auto now = timestamp(worker->loop);
510   auto t = expiry > now ? static_cast<ev_tstamp>(expiry - now) / NGTCP2_SECONDS
511                         : 1e-9;
512   quic.pkt_timer.repeat = t;
513   ev_timer_again(worker->loop, &quic.pkt_timer);
514 }
515 
read_quic()516 int Client::read_quic() {
517   std::array<uint8_t, 65536> buf;
518   sockaddr_union su;
519   socklen_t addrlen = sizeof(su);
520   int rv;
521   size_t pktcnt = 0;
522   ngtcp2_pkt_info pi{};
523 
524   for (;;) {
525     auto nread =
526         recvfrom(fd, buf.data(), buf.size(), MSG_DONTWAIT, &su.sa, &addrlen);
527     if (nread == -1) {
528       return 0;
529     }
530 
531     assert(quic.conn);
532 
533     ++worker->stats.udp_dgram_recv;
534 
535     auto path = ngtcp2_path{
536         {
537             &local_addr.su.sa,
538             static_cast<socklen_t>(local_addr.len),
539         },
540         {
541             &su.sa,
542             addrlen,
543         },
544     };
545 
546     rv = ngtcp2_conn_read_pkt(quic.conn, &path, &pi, buf.data(), nread,
547                               timestamp(worker->loop));
548     if (rv != 0) {
549       std::cerr << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv) << std::endl;
550 
551       if (!quic.last_error.error_code) {
552         if (rv == NGTCP2_ERR_CRYPTO) {
553           ngtcp2_connection_close_error_set_transport_error_tls_alert(
554               &quic.last_error, ngtcp2_conn_get_tls_alert(quic.conn), nullptr,
555               0);
556         } else {
557           ngtcp2_connection_close_error_set_transport_error_liberr(
558               &quic.last_error, rv, nullptr, 0);
559         }
560       }
561 
562       return -1;
563     }
564 
565     if (++pktcnt == 100) {
566       break;
567     }
568   }
569 
570   return 0;
571 }
572 
write_quic()573 int Client::write_quic() {
574   int rv;
575 
576   ev_io_stop(worker->loop, &wev);
577 
578   if (quic.close_requested) {
579     return -1;
580   }
581 
582   if (quic.tx.send_blocked) {
583     rv = send_blocked_packet();
584     if (rv != 0) {
585       return -1;
586     }
587 
588     if (quic.tx.send_blocked) {
589       return 0;
590     }
591   }
592 
593   std::array<nghttp3_vec, 16> vec;
594   size_t pktcnt = 0;
595   auto max_udp_payload_size =
596       ngtcp2_conn_get_max_tx_udp_payload_size(quic.conn);
597 #ifdef UDP_SEGMENT
598   auto path_max_udp_payload_size =
599       ngtcp2_conn_get_path_max_tx_udp_payload_size(quic.conn);
600 #endif // UDP_SEGMENT
601   auto max_pktcnt =
602       ngtcp2_conn_get_send_quantum(quic.conn) / max_udp_payload_size;
603   uint8_t *bufpos = quic.tx.data.get();
604   ngtcp2_path_storage ps;
605   size_t gso_size = 0;
606 
607   ngtcp2_path_storage_zero(&ps);
608 
609   auto s = static_cast<Http3Session *>(session.get());
610 
611   for (;;) {
612     int64_t stream_id = -1;
613     int fin = 0;
614     ssize_t sveccnt = 0;
615 
616     if (session && ngtcp2_conn_get_max_data_left(quic.conn)) {
617       sveccnt = s->write_stream(stream_id, fin, vec.data(), vec.size());
618       if (sveccnt == -1) {
619         return -1;
620       }
621     }
622 
623     ngtcp2_ssize ndatalen;
624     auto v = vec.data();
625     auto vcnt = static_cast<size_t>(sveccnt);
626 
627     uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
628     if (fin) {
629       flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
630     }
631 
632     auto nwrite = ngtcp2_conn_writev_stream(
633         quic.conn, &ps.path, nullptr, bufpos, max_udp_payload_size, &ndatalen,
634         flags, stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt,
635         timestamp(worker->loop));
636     if (nwrite < 0) {
637       switch (nwrite) {
638       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
639         assert(ndatalen == -1);
640         s->block_stream(stream_id);
641         continue;
642       case NGTCP2_ERR_STREAM_SHUT_WR:
643         assert(ndatalen == -1);
644         s->shutdown_stream_write(stream_id);
645         continue;
646       case NGTCP2_ERR_WRITE_MORE:
647         assert(ndatalen >= 0);
648         if (s->add_write_offset(stream_id, ndatalen) != 0) {
649           return -1;
650         }
651         continue;
652       }
653 
654       ngtcp2_connection_close_error_set_transport_error_liberr(
655           &quic.last_error, nwrite, nullptr, 0);
656       return -1;
657     } else if (ndatalen >= 0 && s->add_write_offset(stream_id, ndatalen) != 0) {
658       return -1;
659     }
660 
661     quic_restart_pkt_timer();
662 
663     if (nwrite == 0) {
664       if (bufpos - quic.tx.data.get()) {
665         auto data = quic.tx.data.get();
666         auto datalen = bufpos - quic.tx.data.get();
667         rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data,
668                        datalen, gso_size);
669         if (rv == 1) {
670           on_send_blocked(ps.path.remote, data, datalen, gso_size);
671           signal_write();
672           return 0;
673         }
674       }
675       return 0;
676     }
677 
678     bufpos += nwrite;
679 
680 #ifdef UDP_SEGMENT
681     if (worker->config->no_udp_gso) {
682 #endif // UDP_SEGMENT
683       auto data = quic.tx.data.get();
684       auto datalen = bufpos - quic.tx.data.get();
685       rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data, datalen,
686                      0);
687       if (rv == 1) {
688         on_send_blocked(ps.path.remote, data, datalen, 0);
689         signal_write();
690         return 0;
691       }
692 
693       if (++pktcnt == max_pktcnt) {
694         signal_write();
695         return 0;
696       }
697 
698       bufpos = quic.tx.data.get();
699 
700 #ifdef UDP_SEGMENT
701       continue;
702     }
703 #endif // UDP_SEGMENT
704 
705 #ifdef UDP_SEGMENT
706     if (pktcnt == 0) {
707       gso_size = nwrite;
708     } else if (static_cast<size_t>(nwrite) > gso_size ||
709                (gso_size > path_max_udp_payload_size &&
710                 static_cast<size_t>(nwrite) != gso_size)) {
711       auto data = quic.tx.data.get();
712       auto datalen = bufpos - quic.tx.data.get() - nwrite;
713       rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data, datalen,
714                      gso_size);
715       if (rv == 1) {
716         on_send_blocked(ps.path.remote, data, datalen, gso_size);
717         on_send_blocked(ps.path.remote, bufpos - nwrite, nwrite, 0);
718       } else {
719         auto data = bufpos - nwrite;
720         rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data,
721                        nwrite, 0);
722         if (rv == 1) {
723           on_send_blocked(ps.path.remote, data, nwrite, 0);
724         }
725       }
726 
727       signal_write();
728       return 0;
729     }
730 
731     // Assume that the path does not change.
732     if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
733       auto data = quic.tx.data.get();
734       auto datalen = bufpos - quic.tx.data.get();
735       rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, data, datalen,
736                      gso_size);
737       if (rv == 1) {
738         on_send_blocked(ps.path.remote, data, datalen, gso_size);
739       }
740       signal_write();
741       return 0;
742     }
743 #endif // UDP_SEGMENT
744   }
745 }
746 
on_send_blocked(const ngtcp2_addr & remote_addr,const uint8_t * data,size_t datalen,size_t gso_size)747 void Client::on_send_blocked(const ngtcp2_addr &remote_addr,
748                              const uint8_t *data, size_t datalen,
749                              size_t gso_size) {
750   assert(quic.tx.num_blocked || !quic.tx.send_blocked);
751   assert(quic.tx.num_blocked < 2);
752 
753   quic.tx.send_blocked = true;
754 
755   auto &p = quic.tx.blocked[quic.tx.num_blocked++];
756 
757   memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
758 
759   p.remote_addr.len = remote_addr.addrlen;
760   p.data = data;
761   p.datalen = datalen;
762   p.gso_size = gso_size;
763 }
764 
send_blocked_packet()765 int Client::send_blocked_packet() {
766   int rv;
767 
768   assert(quic.tx.send_blocked);
769 
770   for (; quic.tx.num_blocked_sent < quic.tx.num_blocked;
771        ++quic.tx.num_blocked_sent) {
772     auto &p = quic.tx.blocked[quic.tx.num_blocked_sent];
773 
774     rv = write_udp(&p.remote_addr.su.sa, p.remote_addr.len, p.data, p.datalen,
775                    p.gso_size);
776     if (rv == 1) {
777       signal_write();
778 
779       return 0;
780     }
781   }
782 
783   quic.tx.send_blocked = false;
784   quic.tx.num_blocked = 0;
785   quic.tx.num_blocked_sent = 0;
786 
787   return 0;
788 }
789 
790 } // namespace h2load
791