1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 //! Savoury implementation of the QUIC transport protocol and HTTP/3.
28 //!
29 //! [quiche] is an implementation of the QUIC transport protocol and HTTP/3 as
30 //! specified by the [IETF]. It provides a low level API for processing QUIC
31 //! packets and handling connection state. The application is responsible for
32 //! providing I/O (e.g. sockets handling) as well as an event loop with support
33 //! for timers.
34 //!
35 //! [quiche]: https://github.com/cloudflare/quiche/
36 //! [ietf]: https://quicwg.org/
37 //!
38 //! ## Connection setup
39 //!
40 //! The first step in establishing a QUIC connection using quiche is creating a
41 //! configuration object:
42 //!
43 //! ```
44 //! let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
45 //! # Ok::<(), quiche::Error>(())
46 //! ```
47 //!
48 //! This is shared among multiple connections and can be used to configure a
49 //! QUIC endpoint.
50 //!
51 //! On the client-side the [`connect()`] utility function can be used to create
52 //! a new connection, while [`accept()`] is for servers:
53 //!
54 //! ```
55 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
56 //! # let server_name = "quic.tech";
57 //! # let scid = [0xba; 16];
58 //! // Client connection.
59 //! let conn = quiche::connect(Some(&server_name), &scid, &mut config)?;
60 //!
61 //! // Server connection.
62 //! let conn = quiche::accept(&scid, None, &mut config)?;
63 //! # Ok::<(), quiche::Error>(())
64 //! ```
65 //!
66 //! ## Handling incoming packets
67 //!
68 //! Using the connection's [`recv()`] method the application can process
69 //! incoming packets that belong to that connection from the network:
70 //!
71 //! ```no_run
72 //! # let mut buf = [0; 512];
73 //! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
74 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
75 //! # let scid = [0xba; 16];
76 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
77 //! loop {
78 //! let read = socket.recv(&mut buf).unwrap();
79 //!
80 //! let read = match conn.recv(&mut buf[..read]) {
81 //! Ok(v) => v,
82 //!
83 //! Err(quiche::Error::Done) => {
84 //! // Done reading.
85 //! break;
86 //! },
87 //!
88 //! Err(e) => {
89 //! // An error occurred, handle it.
90 //! break;
91 //! },
92 //! };
93 //! }
94 //! # Ok::<(), quiche::Error>(())
95 //! ```
96 //!
97 //! ## Generating outgoing packets
98 //!
99 //! Outgoing packet are generated using the connection's [`send()`] method
100 //! instead:
101 //!
102 //! ```no_run
103 //! # let mut out = [0; 512];
104 //! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
105 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
106 //! # let scid = [0xba; 16];
107 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
108 //! loop {
109 //! let write = match conn.send(&mut out) {
110 //! Ok(v) => v,
111 //!
112 //! Err(quiche::Error::Done) => {
113 //! // Done writing.
114 //! break;
115 //! },
116 //!
117 //! Err(e) => {
118 //! // An error occurred, handle it.
119 //! break;
120 //! },
121 //! };
122 //!
123 //! socket.send(&out[..write]).unwrap();
124 //! }
125 //! # Ok::<(), quiche::Error>(())
126 //! ```
127 //!
128 //! When packets are sent, the application is responsible for maintaining a
129 //! timer to react to time-based connection events. The timer expiration can be
130 //! obtained using the connection's [`timeout()`] method.
131 //!
132 //! ```
133 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
134 //! # let scid = [0xba; 16];
135 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
136 //! let timeout = conn.timeout();
137 //! # Ok::<(), quiche::Error>(())
138 //! ```
139 //!
140 //! The application is responsible for providing a timer implementation, which
141 //! can be specific to the operating system or networking framework used. When
142 //! a timer expires, the connection's [`on_timeout()`] method should be called,
143 //! after which additional packets might need to be sent on the network:
144 //!
145 //! ```no_run
146 //! # let mut out = [0; 512];
147 //! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
148 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
149 //! # let scid = [0xba; 16];
150 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
151 //! // Timeout expired, handle it.
152 //! conn.on_timeout();
153 //!
154 //! // Send more packets as needed after timeout.
155 //! loop {
156 //! let write = match conn.send(&mut out) {
157 //! Ok(v) => v,
158 //!
159 //! Err(quiche::Error::Done) => {
160 //! // Done writing.
161 //! break;
162 //! },
163 //!
164 //! Err(e) => {
165 //! // An error occurred, handle it.
166 //! break;
167 //! },
168 //! };
169 //!
170 //! socket.send(&out[..write]).unwrap();
171 //! }
172 //! # Ok::<(), quiche::Error>(())
173 //! ```
174 //!
175 //! ## Sending and receiving stream data
176 //!
177 //! After some back and forth, the connection will complete its handshake and
178 //! will be ready for sending or receiving application data.
179 //!
180 //! Data can be sent on a stream by using the [`stream_send()`] method:
181 //!
182 //! ```no_run
183 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
184 //! # let scid = [0xba; 16];
185 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
186 //! if conn.is_established() {
187 //! // Handshake completed, send some data on stream 0.
188 //! conn.stream_send(0, b"hello", true)?;
189 //! }
190 //! # Ok::<(), quiche::Error>(())
191 //! ```
192 //!
193 //! The application can check whether there are any readable streams by using
194 //! the connection's [`readable()`] method, which returns an iterator over all
195 //! the streams that have outstanding data to read.
196 //!
197 //! The [`stream_recv()`] method can then be used to retrieve the application
198 //! data from the readable stream:
199 //!
200 //! ```no_run
201 //! # let mut buf = [0; 512];
202 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
203 //! # let scid = [0xba; 16];
204 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
205 //! if conn.is_established() {
206 //! // Iterate over readable streams.
207 //! for stream_id in conn.readable() {
208 //! // Stream is readable, read until there's no more data.
209 //! while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
210 //! println!("Got {} bytes on stream {}", read, stream_id);
211 //! }
212 //! }
213 //! }
214 //! # Ok::<(), quiche::Error>(())
215 //! ```
216 //!
217 //! ## HTTP/3
218 //!
219 //! The quiche [HTTP/3 module] provides a high level API for sending and
220 //! receiving HTTP requests and responses on top of the QUIC transport protocol.
221 //!
222 //! [`connect()`]: fn.connect.html
223 //! [`accept()`]: fn.accept.html
224 //! [`recv()`]: struct.Connection.html#method.recv
225 //! [`send()`]: struct.Connection.html#method.send
226 //! [`timeout()`]: struct.Connection.html#method.timeout
227 //! [`on_timeout()`]: struct.Connection.html#method.on_timeout
228 //! [`stream_send()`]: struct.Connection.html#method.stream_send
229 //! [`readable()`]: struct.Connection.html#method.readable
230 //! [`stream_recv()`]: struct.Connection.html#method.stream_recv
231 //! [HTTP/3 module]: h3/index.html
232 //!
233 //! ## Congestion Control
234 //!
235 //! The quiche library provides a high-level API for configuring which
236 //! congestion control algorithm to use throughout the QUIC connection.
237 //!
238 //! When a QUIC connection is created, the application can optionally choose
239 //! which CC algorithm to use. See [`CongestionControlAlgorithm`] for currently
240 //! available congestion control algorithms.
241 //!
242 //! For example:
243 //!
244 //! ```
245 //! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
246 //! config.set_cc_algorithm(quiche::CongestionControlAlgorithm::Reno);
247 //! ```
248 //!
249 //! Alternatively, you can configure the congestion control algorithm to use
250 //! by its name.
251 //!
252 //! ```
253 //! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
254 //! config.set_cc_algorithm_name("reno").unwrap();
255 //! ```
256 //!
257 //! Note that the CC algorithm should be configured before calling [`connect()`]
258 //! or [`accept()`]. Otherwise the connection will use a default CC algorithm.
259 //!
260 //! [`CongestionControlAlgorithm`]: enum.CongestionControlAlgorithm.html
261
262 #![allow(improper_ctypes)]
263 #![warn(missing_docs)]
264
265 #[macro_use]
266 extern crate log;
267
268 use std::cmp;
269 use std::time;
270
271 use std::pin::Pin;
272 use std::str::FromStr;
273
274 /// The current QUIC wire version.
275 pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_DRAFT29;
276
277 /// Supported QUIC versions.
278 ///
279 /// Note that the older ones might not be fully supported.
280 const PROTOCOL_VERSION_DRAFT27: u32 = 0xff00_001b;
281 const PROTOCOL_VERSION_DRAFT28: u32 = 0xff00_001c;
282 const PROTOCOL_VERSION_DRAFT29: u32 = 0xff00_001d;
283
284 /// The maximum length of a connection ID.
285 pub const MAX_CONN_ID_LEN: usize = crate::packet::MAX_CID_LEN as usize;
286
287 /// The minimum length of Initial packets sent by a client.
288 pub const MIN_CLIENT_INITIAL_LEN: usize = 1200;
289
290 #[cfg(not(feature = "fuzzing"))]
291 const PAYLOAD_MIN_LEN: usize = 4;
292
293 #[cfg(feature = "fuzzing")]
294 // Due to the fact that in fuzzing mode we use a zero-length AEAD tag (which
295 // would normally be 16 bytes), we need to adjust the minimum payload size to
296 // account for that.
297 const PAYLOAD_MIN_LEN: usize = 20;
298
299 const MAX_AMPLIFICATION_FACTOR: usize = 3;
300
301 // The maximum number of tracked packet number ranges that need to be acked.
302 //
303 // This represents more or less how many ack blocks can fit in a typical packet.
304 const MAX_ACK_RANGES: usize = 68;
305
306 // The highest possible stream ID allowed.
307 const MAX_STREAM_ID: u64 = 1 << 60;
308
309 // The default length of DATAGRAM queues.
310 const DEFAULT_MAX_DGRAM_QUEUE_LEN: usize = 0;
311
312 // The DATAGRAM standard recommends either none or 65536 as maximum DATAGRAM
313 // frames size. We enforce the recommendation for forward compatibility.
314 const MAX_DGRAM_FRAME_SIZE: u64 = 65536;
315
316 /// A specialized [`Result`] type for quiche operations.
317 ///
318 /// This type is used throughout quiche's public API for any operation that
319 /// can produce an error.
320 ///
321 /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
322 pub type Result<T> = std::result::Result<T, Error>;
323
324 /// A QUIC error.
325 #[derive(Clone, Copy, Debug, PartialEq)]
326 #[repr(C)]
327 pub enum Error {
328 /// There is no more work to do.
329 Done = -1,
330
331 /// The provided buffer is too short.
332 BufferTooShort = -2,
333
334 /// The provided packet cannot be parsed because its version is unknown.
335 UnknownVersion = -3,
336
337 /// The provided packet cannot be parsed because it contains an invalid
338 /// frame.
339 InvalidFrame = -4,
340
341 /// The provided packet cannot be parsed.
342 InvalidPacket = -5,
343
344 /// The operation cannot be completed because the connection is in an
345 /// invalid state.
346 InvalidState = -6,
347
348 /// The operation cannot be completed because the stream is in an
349 /// invalid state.
350 InvalidStreamState = -7,
351
352 /// The peer's transport params cannot be parsed.
353 InvalidTransportParam = -8,
354
355 /// A cryptographic operation failed.
356 CryptoFail = -9,
357
358 /// The TLS handshake failed.
359 TlsFail = -10,
360
361 /// The peer violated the local flow control limits.
362 FlowControl = -11,
363
364 /// The peer violated the local stream limits.
365 StreamLimit = -12,
366
367 /// The received data exceeds the stream's final size.
368 FinalSize = -13,
369
370 /// Error in congestion control.
371 CongestionControl = -14,
372 }
373
374 impl Error {
to_wire(self) -> u64375 fn to_wire(self) -> u64 {
376 match self {
377 Error::Done => 0x0,
378 Error::InvalidFrame => 0x7,
379 Error::InvalidStreamState => 0x5,
380 Error::InvalidTransportParam => 0x8,
381 Error::FlowControl => 0x3,
382 Error::StreamLimit => 0x4,
383 Error::FinalSize => 0x6,
384 _ => 0xa,
385 }
386 }
387
to_c(self) -> libc::ssize_t388 fn to_c(self) -> libc::ssize_t {
389 self as _
390 }
391 }
392
393 impl std::fmt::Display for Error {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result394 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
395 write!(f, "{:?}", self)
396 }
397 }
398
399 impl std::error::Error for Error {
source(&self) -> Option<&(dyn std::error::Error + 'static)>400 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
401 None
402 }
403 }
404
405 impl std::convert::From<octets::BufferTooShortError> for Error {
from(_err: octets::BufferTooShortError) -> Self406 fn from(_err: octets::BufferTooShortError) -> Self {
407 Error::BufferTooShort
408 }
409 }
410
411 /// The stream's side to shutdown.
412 ///
413 /// This should be used when calling [`stream_shutdown()`].
414 ///
415 /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
416 #[repr(C)]
417 pub enum Shutdown {
418 /// Stop receiving stream data.
419 Read = 0,
420
421 /// Stop sending stream data.
422 Write = 1,
423 }
424
425 /// Stores configuration shared between multiple connections.
426 pub struct Config {
427 local_transport_params: TransportParams,
428
429 version: u32,
430
431 tls_ctx: tls::Context,
432
433 application_protos: Vec<Vec<u8>>,
434
435 grease: bool,
436
437 cc_algorithm: CongestionControlAlgorithm,
438
439 hystart: bool,
440
441 dgram_recv_max_queue_len: usize,
442 dgram_send_max_queue_len: usize,
443 }
444
445 impl Config {
446 /// Creates a config object with the given version.
447 ///
448 /// ## Examples:
449 ///
450 /// ```
451 /// let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
452 /// # Ok::<(), quiche::Error>(())
453 /// ```
new(version: u32) -> Result<Config>454 pub fn new(version: u32) -> Result<Config> {
455 let tls_ctx = tls::Context::new()?;
456
457 Ok(Config {
458 local_transport_params: TransportParams::default(),
459 version,
460 tls_ctx,
461 application_protos: Vec::new(),
462 grease: true,
463 cc_algorithm: CongestionControlAlgorithm::CUBIC,
464 hystart: true,
465
466 dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
467 dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
468 })
469 }
470
471 /// Configures the given certificate chain.
472 ///
473 /// The content of `file` is parsed as a PEM-encoded leaf certificate,
474 /// followed by optional intermediate certificates.
475 ///
476 /// ## Examples:
477 ///
478 /// ```no_run
479 /// # let mut config = quiche::Config::new(0xbabababa)?;
480 /// config.load_cert_chain_from_pem_file("/path/to/cert.pem")?;
481 /// # Ok::<(), quiche::Error>(())
482 /// ```
load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()>483 pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
484 self.tls_ctx.use_certificate_chain_file(file)
485 }
486
487 /// Configures the given private key.
488 ///
489 /// The content of `file` is parsed as a PEM-encoded private key.
490 ///
491 /// ## Examples:
492 ///
493 /// ```no_run
494 /// # let mut config = quiche::Config::new(0xbabababa)?;
495 /// config.load_priv_key_from_pem_file("/path/to/key.pem")?;
496 /// # Ok::<(), quiche::Error>(())
497 /// ```
load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()>498 pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
499 self.tls_ctx.use_privkey_file(file)
500 }
501
502 /// Specifies a file where trusted CA certificates are stored for the
503 /// purposes of certificate verification.
504 ///
505 /// The content of `file` is parsed as a PEM-encoded certificate chain.
506 ///
507 /// ## Examples:
508 ///
509 /// ```no_run
510 /// # let mut config = quiche::Config::new(0xbabababa)?;
511 /// config.load_verify_locations_from_file("/path/to/cert.pem")?;
512 /// # Ok::<(), quiche::Error>(())
513 /// ```
load_verify_locations_from_file(&mut self, file: &str) -> Result<()>514 pub fn load_verify_locations_from_file(&mut self, file: &str) -> Result<()> {
515 self.tls_ctx.load_verify_locations_from_file(file)
516 }
517
518 /// Specifies a directory where trusted CA certificates are stored for the
519 /// purposes of certificate verification.
520 ///
521 /// The content of `dir` a set of PEM-encoded certificate chains.
522 ///
523 /// ## Examples:
524 ///
525 /// ```no_run
526 /// # let mut config = quiche::Config::new(0xbabababa)?;
527 /// config.load_verify_locations_from_directory("/path/to/certs")?;
528 /// # Ok::<(), quiche::Error>(())
529 /// ```
load_verify_locations_from_directory( &mut self, dir: &str, ) -> Result<()>530 pub fn load_verify_locations_from_directory(
531 &mut self, dir: &str,
532 ) -> Result<()> {
533 self.tls_ctx.load_verify_locations_from_directory(dir)
534 }
535
536 /// Configures whether to verify the peer's certificate.
537 ///
538 /// The default value is `true` for client connections, and `false` for
539 /// server ones.
verify_peer(&mut self, verify: bool)540 pub fn verify_peer(&mut self, verify: bool) {
541 self.tls_ctx.set_verify(verify);
542 }
543
544 /// Configures whether to send GREASE values.
545 ///
546 /// The default value is `true`.
grease(&mut self, grease: bool)547 pub fn grease(&mut self, grease: bool) {
548 self.grease = grease;
549 }
550
551 /// Enables logging of secrets.
552 ///
553 /// When logging is enabled, the [`set_keylog()`] method must be called on
554 /// the connection for its cryptographic secrets to be logged in the
555 /// [keylog] format to the specified writer.
556 ///
557 /// [`set_keylog()`]: struct.Connection.html#method.set_keylog
558 /// [keylog]: https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format
log_keys(&mut self)559 pub fn log_keys(&mut self) {
560 self.tls_ctx.enable_keylog();
561 }
562
563 /// Enables sending or receiving early data.
enable_early_data(&mut self)564 pub fn enable_early_data(&mut self) {
565 self.tls_ctx.set_early_data_enabled(true);
566 }
567
568 /// Configures the list of supported application protocols.
569 ///
570 /// The list of protocols `protos` must be in wire-format (i.e. a series
571 /// of non-empty, 8-bit length-prefixed strings).
572 ///
573 /// On the client this configures the list of protocols to send to the
574 /// server as part of the ALPN extension.
575 ///
576 /// On the server this configures the list of supported protocols to match
577 /// against the client-supplied list.
578 ///
579 /// Applications must set a value, but no default is provided.
580 ///
581 /// ## Examples:
582 ///
583 /// ```
584 /// # let mut config = quiche::Config::new(0xbabababa)?;
585 /// config.set_application_protos(b"\x08http/1.1\x08http/0.9")?;
586 /// # Ok::<(), quiche::Error>(())
587 /// ```
set_application_protos(&mut self, protos: &[u8]) -> Result<()>588 pub fn set_application_protos(&mut self, protos: &[u8]) -> Result<()> {
589 let mut b = octets::Octets::with_slice(&protos);
590
591 let mut protos_list = Vec::new();
592
593 while let Ok(proto) = b.get_bytes_with_u8_length() {
594 protos_list.push(proto.to_vec());
595 }
596
597 self.application_protos = protos_list;
598
599 self.tls_ctx.set_alpn(&self.application_protos)
600 }
601
602 /// Sets the `max_idle_timeout` transport parameter.
603 ///
604 /// The default value is infinite, that is, no timeout is used.
set_max_idle_timeout(&mut self, v: u64)605 pub fn set_max_idle_timeout(&mut self, v: u64) {
606 self.local_transport_params.max_idle_timeout = v;
607 }
608
609 /// Sets the `max_udp_payload_size transport` parameter.
610 ///
611 /// The default value is `65527`.
set_max_udp_payload_size(&mut self, v: u64)612 pub fn set_max_udp_payload_size(&mut self, v: u64) {
613 self.local_transport_params.max_udp_payload_size = v;
614 }
615
616 /// Sets the `initial_max_data` transport parameter.
617 ///
618 /// When set to a non-zero value quiche will only allow at most `v` bytes
619 /// of incoming stream data to be buffered for the whole connection (that
620 /// is, data that is not yet read by the application) and will allow more
621 /// data to be received as the buffer is consumed by the application.
622 ///
623 /// The default value is `0`.
set_initial_max_data(&mut self, v: u64)624 pub fn set_initial_max_data(&mut self, v: u64) {
625 self.local_transport_params.initial_max_data = v;
626 }
627
628 /// Sets the `initial_max_stream_data_bidi_local` transport parameter.
629 ///
630 /// When set to a non-zero value quiche will only allow at most `v` bytes
631 /// of incoming stream data to be buffered for each locally-initiated
632 /// bidirectional stream (that is, data that is not yet read by the
633 /// application) and will allow more data to be received as the buffer is
634 /// consumed by the application.
635 ///
636 /// The default value is `0`.
set_initial_max_stream_data_bidi_local(&mut self, v: u64)637 pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
638 self.local_transport_params
639 .initial_max_stream_data_bidi_local = v;
640 }
641
642 /// Sets the `initial_max_stream_data_bidi_remote` transport parameter.
643 ///
644 /// When set to a non-zero value quiche will only allow at most `v` bytes
645 /// of incoming stream data to be buffered for each remotely-initiated
646 /// bidirectional stream (that is, data that is not yet read by the
647 /// application) and will allow more data to be received as the buffer is
648 /// consumed by the application.
649 ///
650 /// The default value is `0`.
set_initial_max_stream_data_bidi_remote(&mut self, v: u64)651 pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
652 self.local_transport_params
653 .initial_max_stream_data_bidi_remote = v;
654 }
655
656 /// Sets the `initial_max_stream_data_uni` transport parameter.
657 ///
658 /// When set to a non-zero value quiche will only allow at most `v` bytes
659 /// of incoming stream data to be buffered for each unidirectional stream
660 /// (that is, data that is not yet read by the application) and will allow
661 /// more data to be received as the buffer is consumed by the application.
662 ///
663 /// The default value is `0`.
set_initial_max_stream_data_uni(&mut self, v: u64)664 pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
665 self.local_transport_params.initial_max_stream_data_uni = v;
666 }
667
668 /// Sets the `initial_max_streams_bidi` transport parameter.
669 ///
670 /// When set to a non-zero value quiche will only allow `v` number of
671 /// concurrent remotely-initiated bidirectional streams to be open at any
672 /// given time and will increase the limit automatically as streams are
673 /// completed.
674 ///
675 /// A bidirectional stream is considered completed when all incoming data
676 /// has been read by the application (up to the `fin` offset) or the
677 /// stream's read direction has been shutdown, and all outgoing data has
678 /// been acked by the peer (up to the `fin` offset) or the stream's write
679 /// direction has been shutdown.
680 ///
681 /// The default value is `0`.
set_initial_max_streams_bidi(&mut self, v: u64)682 pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
683 self.local_transport_params.initial_max_streams_bidi = v;
684 }
685
686 /// Sets the `initial_max_streams_uni` transport parameter.
687 ///
688 /// When set to a non-zero value quiche will only allow `v` number of
689 /// concurrent remotely-initiated unidirectional streams to be open at any
690 /// given time and will increase the limit automatically as streams are
691 /// completed.
692 ///
693 /// A unidirectional stream is considered completed when all incoming data
694 /// has been read by the application (up to the `fin` offset) or the
695 /// stream's read direction has been shutdown.
696 ///
697 /// The default value is `0`.
set_initial_max_streams_uni(&mut self, v: u64)698 pub fn set_initial_max_streams_uni(&mut self, v: u64) {
699 self.local_transport_params.initial_max_streams_uni = v;
700 }
701
702 /// Sets the `ack_delay_exponent` transport parameter.
703 ///
704 /// The default value is `3`.
set_ack_delay_exponent(&mut self, v: u64)705 pub fn set_ack_delay_exponent(&mut self, v: u64) {
706 self.local_transport_params.ack_delay_exponent = v;
707 }
708
709 /// Sets the `max_ack_delay` transport parameter.
710 ///
711 /// The default value is `25`.
set_max_ack_delay(&mut self, v: u64)712 pub fn set_max_ack_delay(&mut self, v: u64) {
713 self.local_transport_params.max_ack_delay = v;
714 }
715
716 /// Sets the `disable_active_migration` transport parameter.
717 ///
718 /// The default value is `false`.
set_disable_active_migration(&mut self, v: bool)719 pub fn set_disable_active_migration(&mut self, v: bool) {
720 self.local_transport_params.disable_active_migration = v;
721 }
722
723 /// Sets the congestion control algorithm used by string.
724 ///
725 /// The default value is `reno`. On error `Error::CongestionControl`
726 /// will be returned.
727 ///
728 /// ## Examples:
729 ///
730 /// ```
731 /// # let mut config = quiche::Config::new(0xbabababa)?;
732 /// config.set_cc_algorithm_name("reno");
733 /// # Ok::<(), quiche::Error>(())
734 /// ```
set_cc_algorithm_name(&mut self, name: &str) -> Result<()>735 pub fn set_cc_algorithm_name(&mut self, name: &str) -> Result<()> {
736 self.cc_algorithm = CongestionControlAlgorithm::from_str(name)?;
737
738 Ok(())
739 }
740
741 /// Sets the congestion control algorithm used.
742 ///
743 /// The default value is `CongestionControlAlgorithm::CUBIC`.
set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm)744 pub fn set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm) {
745 self.cc_algorithm = algo;
746 }
747
748 /// Configures whether to enable HyStart++.
749 ///
750 /// The default value is `true`.
enable_hystart(&mut self, v: bool)751 pub fn enable_hystart(&mut self, v: bool) {
752 self.hystart = v;
753 }
754
755 /// Configures whether to enable receiving DATAGRAM frames.
756 ///
757 /// When enabled, the `max_datagram_frame_size` transport parameter is set
758 /// to 65536 as recommended by draft-ietf-quic-datagram-01.
759 ///
760 /// The default is `false`.
enable_dgram( &mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize, )761 pub fn enable_dgram(
762 &mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize,
763 ) {
764 self.local_transport_params.max_datagram_frame_size = if enabled {
765 Some(MAX_DGRAM_FRAME_SIZE)
766 } else {
767 None
768 };
769 self.dgram_recv_max_queue_len = recv_queue_len;
770 self.dgram_send_max_queue_len = send_queue_len;
771 }
772 }
773
774 /// A QUIC connection.
775 pub struct Connection {
776 /// QUIC wire version used for the connection.
777 version: u32,
778
779 /// Peer's connection ID.
780 dcid: Vec<u8>,
781
782 /// Local connection ID.
783 scid: Vec<u8>,
784
785 /// Unique opaque ID for the connection that can be used for logging.
786 trace_id: String,
787
788 /// Packet number spaces.
789 pkt_num_spaces: [packet::PktNumSpace; packet::EPOCH_COUNT],
790
791 /// Peer's transport parameters.
792 peer_transport_params: TransportParams,
793
794 /// Local transport parameters.
795 local_transport_params: TransportParams,
796
797 /// TLS handshake state.
798 handshake: tls::Handshake,
799
800 /// Loss recovery and congestion control state.
801 recovery: recovery::Recovery,
802
803 /// List of supported application protocols.
804 application_protos: Vec<Vec<u8>>,
805
806 /// Total number of received packets.
807 recv_count: usize,
808
809 /// Total number of sent packets.
810 sent_count: usize,
811
812 /// Total number of bytes received from the peer.
813 rx_data: u64,
814
815 /// Local flow control limit for the connection.
816 max_rx_data: u64,
817
818 /// Updated local flow control limit for the connection. This is used to
819 /// trigger sending MAX_DATA frames after a certain threshold.
820 max_rx_data_next: u64,
821
822 /// Whether we send MAX_DATA frame.
823 almost_full: bool,
824
825 /// Total number of bytes sent to the peer.
826 tx_data: u64,
827
828 /// Peer's flow control limit for the connection.
829 max_tx_data: u64,
830
831 /// Total number of bytes the server can send before the peer's address
832 /// is verified.
833 max_send_bytes: usize,
834
835 /// Streams map, indexed by stream ID.
836 streams: stream::StreamMap,
837
838 /// Peer's original destination connection ID. Used by the client to
839 /// validate the server's transport parameter.
840 odcid: Option<Vec<u8>>,
841
842 /// Peer's retry source connection ID. Used by the client during stateless
843 /// retry to validate the server's transport parameter.
844 rscid: Option<Vec<u8>>,
845
846 /// Received address verification token.
847 token: Option<Vec<u8>>,
848
849 /// Error code to be sent to the peer in CONNECTION_CLOSE.
850 error: Option<u64>,
851
852 /// Error code to be sent to the peer in APPLICATION_CLOSE.
853 app_error: Option<u64>,
854
855 /// Error reason to be sent to the peer in APPLICATION_CLOSE.
856 app_reason: Vec<u8>,
857
858 /// Received path challenge.
859 challenge: Option<Vec<u8>>,
860
861 /// The connection-level limit at which send blocking occurred.
862 blocked_limit: Option<u64>,
863
864 /// Idle timeout expiration time.
865 idle_timer: Option<time::Instant>,
866
867 /// Draining timeout expiration time.
868 draining_timer: Option<time::Instant>,
869
870 /// Whether this is a server-side connection.
871 is_server: bool,
872
873 /// Whether the initial secrets have been derived.
874 derived_initial_secrets: bool,
875
876 /// Whether a version negotiation packet has already been received. Only
877 /// relevant for client connections.
878 did_version_negotiation: bool,
879
880 /// Whether stateless retry has been performed.
881 did_retry: bool,
882
883 /// Whether the peer already updated its connection ID.
884 got_peer_conn_id: bool,
885
886 /// Whether the peer's address has been verified.
887 verified_peer_address: bool,
888
889 /// Whether the peer has verified our address.
890 peer_verified_address: bool,
891
892 /// Whether the peer's transport parameters were parsed.
893 parsed_peer_transport_params: bool,
894
895 /// Whether the HANDSHAKE_DONE has been sent.
896 handshake_done_sent: bool,
897
898 /// Whether the connection handshake has been confirmed.
899 handshake_confirmed: bool,
900
901 /// Whether an ack-eliciting packet has been sent since last receiving a
902 /// packet.
903 ack_eliciting_sent: bool,
904
905 /// Whether the connection is closed.
906 closed: bool,
907
908 /// Whether to send GREASE.
909 grease: bool,
910
911 /// TLS keylog writer.
912 keylog: Option<Box<dyn std::io::Write + Send>>,
913
914 /// Qlog streaming output.
915 #[cfg(feature = "qlog")]
916 qlog_streamer: Option<qlog::QlogStreamer>,
917
918 /// Whether peer transport parameters were qlogged.
919 #[cfg(feature = "qlog")]
920 qlogged_peer_params: bool,
921
922 /// DATAGRAM queues.
923 dgram_recv_queue: dgram::DatagramQueue,
924 dgram_send_queue: dgram::DatagramQueue,
925 }
926
927 /// Creates a new server-side connection.
928 ///
929 /// The `scid` parameter represents the server's source connection ID, while
930 /// the optional `odcid` parameter represents the original destination ID the
931 /// client sent before a stateless retry (this is only required when using
932 /// the [`retry()`] function).
933 ///
934 /// [`retry()`]: fn.retry.html
935 ///
936 /// ## Examples:
937 ///
938 /// ```no_run
939 /// # let mut config = quiche::Config::new(0xbabababa)?;
940 /// # let scid = [0xba; 16];
941 /// let conn = quiche::accept(&scid, None, &mut config)?;
942 /// # Ok::<(), quiche::Error>(())
943 /// ```
accept( scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, ) -> Result<Pin<Box<Connection>>>944 pub fn accept(
945 scid: &[u8], odcid: Option<&[u8]>, config: &mut Config,
946 ) -> Result<Pin<Box<Connection>>> {
947 let conn = Connection::new(scid, odcid, config, true)?;
948
949 Ok(conn)
950 }
951
952 /// Creates a new client-side connection.
953 ///
954 /// The `scid` parameter is used as the connection's source connection ID,
955 /// while the optional `server_name` parameter is used to verify the peer's
956 /// certificate.
957 ///
958 /// ## Examples:
959 ///
960 /// ```no_run
961 /// # let mut config = quiche::Config::new(0xbabababa)?;
962 /// # let server_name = "quic.tech";
963 /// # let scid = [0xba; 16];
964 /// let conn = quiche::connect(Some(&server_name), &scid, &mut config)?;
965 /// # Ok::<(), quiche::Error>(())
966 /// ```
connect( server_name: Option<&str>, scid: &[u8], config: &mut Config, ) -> Result<Pin<Box<Connection>>>967 pub fn connect(
968 server_name: Option<&str>, scid: &[u8], config: &mut Config,
969 ) -> Result<Pin<Box<Connection>>> {
970 let conn = Connection::new(scid, None, config, false)?;
971
972 if let Some(server_name) = server_name {
973 conn.handshake.set_host_name(server_name)?;
974 }
975
976 Ok(conn)
977 }
978
979 /// Writes a version negotiation packet.
980 ///
981 /// The `scid` and `dcid` parameters are the source connection ID and the
982 /// destination connection ID extracted from the received client's Initial
983 /// packet that advertises an unsupported version.
984 ///
985 /// ## Examples:
986 ///
987 /// ```no_run
988 /// # let mut buf = [0; 512];
989 /// # let mut out = [0; 512];
990 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
991 /// let (len, src) = socket.recv_from(&mut buf).unwrap();
992 ///
993 /// let hdr =
994 /// quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
995 ///
996 /// if hdr.version != quiche::PROTOCOL_VERSION {
997 /// let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out)?;
998 /// socket.send_to(&out[..len], &src).unwrap();
999 /// }
1000 /// # Ok::<(), quiche::Error>(())
1001 /// ```
negotiate_version( scid: &[u8], dcid: &[u8], out: &mut [u8], ) -> Result<usize>1002 pub fn negotiate_version(
1003 scid: &[u8], dcid: &[u8], out: &mut [u8],
1004 ) -> Result<usize> {
1005 packet::negotiate_version(scid, dcid, out)
1006 }
1007
1008 /// Writes a stateless retry packet.
1009 ///
1010 /// The `scid` and `dcid` parameters are the source connection ID and the
1011 /// destination connection ID extracted from the received client's Initial
1012 /// packet, while `new_scid` is the server's new source connection ID and
1013 /// `token` is the address validation token the client needs to echo back.
1014 ///
1015 /// The application is responsible for generating the address validation
1016 /// token to be sent to the client, and verifying tokens sent back by the
1017 /// client. The generated token should include the `dcid` parameter, such
1018 /// that it can be later extracted from the token and passed to the
1019 /// [`accept()`] function as its `odcid` parameter.
1020 ///
1021 /// [`accept()`]: fn.accept.html
1022 ///
1023 /// ## Examples:
1024 ///
1025 /// ```no_run
1026 /// # let mut config = quiche::Config::new(0xbabababa)?;
1027 /// # let mut buf = [0; 512];
1028 /// # let mut out = [0; 512];
1029 /// # let scid = [0xba; 16];
1030 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
1031 /// # fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
1032 /// # vec![]
1033 /// # }
1034 /// # fn validate_token<'a>(src: &std::net::SocketAddr, token: &'a [u8]) -> Option<&'a [u8]> {
1035 /// # None
1036 /// # }
1037 /// let (len, src) = socket.recv_from(&mut buf).unwrap();
1038 ///
1039 /// let hdr = quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
1040 ///
1041 /// let token = hdr.token.as_ref().unwrap();
1042 ///
1043 /// // No token sent by client, create a new one.
1044 /// if token.is_empty() {
1045 /// let new_token = mint_token(&hdr, &src);
1046 ///
1047 /// let len = quiche::retry(
1048 /// &hdr.scid, &hdr.dcid, &scid, &new_token, hdr.version, &mut out,
1049 /// )?;
1050 ///
1051 /// socket.send_to(&out[..len], &src).unwrap();
1052 /// return Ok(());
1053 /// }
1054 ///
1055 /// // Client sent token, validate it.
1056 /// let odcid = validate_token(&src, token);
1057 ///
1058 /// if odcid == None {
1059 /// // Invalid address validation token.
1060 /// return Ok(());
1061 /// }
1062 ///
1063 /// let conn = quiche::accept(&scid, odcid, &mut config)?;
1064 /// # Ok::<(), quiche::Error>(())
1065 /// ```
retry( scid: &[u8], dcid: &[u8], new_scid: &[u8], token: &[u8], version: u32, out: &mut [u8], ) -> Result<usize>1066 pub fn retry(
1067 scid: &[u8], dcid: &[u8], new_scid: &[u8], token: &[u8], version: u32,
1068 out: &mut [u8],
1069 ) -> Result<usize> {
1070 packet::retry(scid, dcid, new_scid, token, version, out)
1071 }
1072
1073 /// Returns true if the given protocol version is supported.
version_is_supported(version: u32) -> bool1074 pub fn version_is_supported(version: u32) -> bool {
1075 matches!(
1076 version,
1077 PROTOCOL_VERSION_DRAFT27 |
1078 PROTOCOL_VERSION_DRAFT28 |
1079 PROTOCOL_VERSION_DRAFT29
1080 )
1081 }
1082
1083 /// Pushes a frame to the output packet if there is enough space.
1084 ///
1085 /// Returns `true` on success, `false` otherwise. In case of failure it means
1086 /// there is no room to add the frame in the packet. You may retry to add the
1087 /// frame later.
1088 macro_rules! push_frame_to_pkt {
1089 ($frames:expr, $frame:expr, $payload_len: expr, $left:expr) => {{
1090 if $frame.wire_len() <= $left {
1091 $payload_len += $frame.wire_len();
1092 $left -= $frame.wire_len();
1093
1094 $frames.push($frame);
1095
1096 true
1097 } else {
1098 false
1099 }
1100 }};
1101 }
1102
1103 /// Conditional qlog action.
1104 ///
1105 /// Executes the provided body if the qlog feature is enabled and quiche
1106 /// has been condifigured with a log writer.
1107 macro_rules! qlog_with {
1108 ($qlog_streamer:expr, $qlog_streamer_ref:ident, $body:block) => {{
1109 #[cfg(feature = "qlog")]
1110 {
1111 if let Some($qlog_streamer_ref) = &mut $qlog_streamer {
1112 $body
1113 }
1114 }
1115 }};
1116 }
1117
1118 impl Connection {
new( scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, is_server: bool, ) -> Result<Pin<Box<Connection>>>1119 fn new(
1120 scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, is_server: bool,
1121 ) -> Result<Pin<Box<Connection>>> {
1122 let tls = config.tls_ctx.new_handshake()?;
1123 Connection::with_tls(scid, odcid, config, tls, is_server)
1124 }
1125
with_tls( scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, tls: tls::Handshake, is_server: bool, ) -> Result<Pin<Box<Connection>>>1126 fn with_tls(
1127 scid: &[u8], odcid: Option<&[u8]>, config: &mut Config,
1128 tls: tls::Handshake, is_server: bool,
1129 ) -> Result<Pin<Box<Connection>>> {
1130 let max_rx_data = config.local_transport_params.initial_max_data;
1131
1132 let scid_as_hex: Vec<String> =
1133 scid.iter().map(|b| format!("{:02x}", b)).collect();
1134
1135 let mut conn = Box::pin(Connection {
1136 version: config.version,
1137
1138 dcid: Vec::new(),
1139 scid: scid.to_vec(),
1140
1141 trace_id: scid_as_hex.join(""),
1142
1143 pkt_num_spaces: [
1144 packet::PktNumSpace::new(),
1145 packet::PktNumSpace::new(),
1146 packet::PktNumSpace::new(),
1147 ],
1148
1149 peer_transport_params: TransportParams::default(),
1150
1151 local_transport_params: config.local_transport_params.clone(),
1152
1153 handshake: tls,
1154
1155 recovery: recovery::Recovery::new(&config),
1156
1157 application_protos: config.application_protos.clone(),
1158
1159 recv_count: 0,
1160 sent_count: 0,
1161
1162 rx_data: 0,
1163 max_rx_data,
1164 max_rx_data_next: max_rx_data,
1165 almost_full: false,
1166
1167 tx_data: 0,
1168 max_tx_data: 0,
1169
1170 max_send_bytes: 0,
1171
1172 streams: stream::StreamMap::new(
1173 config.local_transport_params.initial_max_streams_bidi,
1174 config.local_transport_params.initial_max_streams_uni,
1175 ),
1176
1177 odcid: None,
1178
1179 rscid: None,
1180
1181 token: None,
1182
1183 error: None,
1184
1185 app_error: None,
1186 app_reason: Vec::new(),
1187
1188 challenge: None,
1189
1190 blocked_limit: None,
1191
1192 idle_timer: None,
1193
1194 draining_timer: None,
1195
1196 is_server,
1197
1198 derived_initial_secrets: false,
1199
1200 did_version_negotiation: false,
1201
1202 did_retry: false,
1203
1204 got_peer_conn_id: false,
1205
1206 // If we did stateless retry assume the peer's address is verified.
1207 verified_peer_address: odcid.is_some(),
1208
1209 // Assume clients validate the server's address implicitly.
1210 peer_verified_address: is_server,
1211
1212 parsed_peer_transport_params: false,
1213
1214 handshake_done_sent: false,
1215
1216 handshake_confirmed: false,
1217
1218 ack_eliciting_sent: false,
1219
1220 closed: false,
1221
1222 grease: config.grease,
1223
1224 keylog: None,
1225
1226 #[cfg(feature = "qlog")]
1227 qlog_streamer: None,
1228
1229 #[cfg(feature = "qlog")]
1230 qlogged_peer_params: false,
1231
1232 dgram_recv_queue: dgram::DatagramQueue::new(
1233 config.dgram_recv_max_queue_len,
1234 ),
1235
1236 dgram_send_queue: dgram::DatagramQueue::new(
1237 config.dgram_send_max_queue_len,
1238 ),
1239 });
1240
1241 if let Some(odcid) = odcid {
1242 conn.local_transport_params
1243 .original_destination_connection_id = Some(odcid.to_vec());
1244
1245 conn.local_transport_params.retry_source_connection_id =
1246 Some(scid.to_vec());
1247
1248 conn.did_retry = true;
1249 }
1250
1251 conn.local_transport_params.initial_source_connection_id =
1252 Some(scid.to_vec());
1253
1254 conn.handshake.init(&conn)?;
1255
1256 conn.encode_transport_params()?;
1257
1258 // Derive initial secrets for the client. We can do this here because
1259 // we already generated the random destination connection ID.
1260 if !is_server {
1261 let mut dcid = [0; 16];
1262 rand::rand_bytes(&mut dcid[..]);
1263
1264 let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1265 &dcid,
1266 conn.version,
1267 conn.is_server,
1268 )?;
1269
1270 conn.dcid.extend_from_slice(&dcid);
1271
1272 conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1273 Some(aead_open);
1274 conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1275 Some(aead_seal);
1276
1277 conn.derived_initial_secrets = true;
1278 }
1279
1280 Ok(conn)
1281 }
1282
1283 /// Sets keylog output to the designated [`Writer`].
1284 ///
1285 /// This needs to be called as soon as the connection is created, to avoid
1286 /// missing some early logs.
1287 ///
1288 /// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
set_keylog(&mut self, writer: Box<dyn std::io::Write + Send>)1289 pub fn set_keylog(&mut self, writer: Box<dyn std::io::Write + Send>) {
1290 self.keylog = Some(writer);
1291 }
1292
1293 /// Sets qlog output to the designated [`Writer`].
1294 ///
1295 /// This needs to be called as soon as the connection is created, to avoid
1296 /// missing some early logs.
1297 ///
1298 /// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
1299 #[cfg(feature = "qlog")]
set_qlog( &mut self, writer: Box<dyn std::io::Write + Send>, title: String, description: String, )1300 pub fn set_qlog(
1301 &mut self, writer: Box<dyn std::io::Write + Send>, title: String,
1302 description: String,
1303 ) {
1304 let vp = if self.is_server {
1305 qlog::VantagePointType::Server
1306 } else {
1307 qlog::VantagePointType::Client
1308 };
1309
1310 let trace = qlog::Trace::new(
1311 qlog::VantagePoint {
1312 name: None,
1313 ty: vp,
1314 flow: None,
1315 },
1316 Some(title.to_string()),
1317 Some(description.to_string()),
1318 Some(qlog::Configuration {
1319 time_offset: Some("0".to_string()),
1320 time_units: Some(qlog::TimeUnits::Ms),
1321 original_uris: None,
1322 }),
1323 None,
1324 );
1325
1326 let mut streamer = qlog::QlogStreamer::new(
1327 qlog::QLOG_VERSION.to_string(),
1328 Some(title),
1329 Some(description),
1330 None,
1331 std::time::Instant::now(),
1332 trace,
1333 writer,
1334 );
1335
1336 streamer.start_log().ok();
1337
1338 let ev = self.local_transport_params.to_qlog(
1339 qlog::TransportOwner::Local,
1340 self.version,
1341 self.handshake.alpn_protocol(),
1342 self.handshake.cipher(),
1343 );
1344
1345 streamer.add_event(ev).ok();
1346
1347 self.qlog_streamer = Some(streamer);
1348 }
1349
1350 /// Processes QUIC packets received from the peer.
1351 ///
1352 /// On success the number of bytes processed from the input buffer is
1353 /// returned. On error the connection will be closed by calling [`close()`]
1354 /// with the appropriate error code.
1355 ///
1356 /// Coalesced packets will be processed as necessary.
1357 ///
1358 /// Note that the contents of the input buffer `buf` might be modified by
1359 /// this function due to, for example, in-place decryption.
1360 ///
1361 /// [`close()`]: struct.Connection.html#method.close
1362 ///
1363 /// ## Examples:
1364 ///
1365 /// ```no_run
1366 /// # let mut buf = [0; 512];
1367 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
1368 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
1369 /// # let scid = [0xba; 16];
1370 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
1371 /// loop {
1372 /// let read = socket.recv(&mut buf).unwrap();
1373 ///
1374 /// let read = match conn.recv(&mut buf[..read]) {
1375 /// Ok(v) => v,
1376 ///
1377 /// Err(e) => {
1378 /// // An error occurred, handle it.
1379 /// break;
1380 /// },
1381 /// };
1382 /// }
1383 /// # Ok::<(), quiche::Error>(())
1384 /// ```
recv(&mut self, buf: &mut [u8]) -> Result<usize>1385 pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
1386 let len = buf.len();
1387
1388 // Keep track of how many bytes we received from the client, so we
1389 // can limit bytes sent back before address validation, to a multiple
1390 // of this. The limit needs to be increased early on, so that if there
1391 // is an error there is enough credit to send a CONNECTION_CLOSE.
1392 //
1393 // It doesn't matter if the packets received were valid or not, we only
1394 // need to track the total amount of bytes received.
1395 if !self.verified_peer_address {
1396 self.max_send_bytes += len * MAX_AMPLIFICATION_FACTOR;
1397 }
1398
1399 let mut done = 0;
1400 let mut left = len;
1401
1402 // Process coalesced packets.
1403 while left > 0 {
1404 let read = match self.recv_single(&mut buf[len - left..len]) {
1405 Ok(v) => v,
1406
1407 Err(Error::Done) => left,
1408
1409 Err(e) => {
1410 // In case of error processing the incoming packet, close
1411 // the connection.
1412 self.close(false, e.to_wire(), b"").ok();
1413 return Err(e);
1414 },
1415 };
1416
1417 done += read;
1418 left -= read;
1419 }
1420
1421 Ok(done)
1422 }
1423
1424 /// Processes a single QUIC packet received from the peer.
1425 ///
1426 /// On success the number of bytes processed from the input buffer is
1427 /// returned. When the [`Done`] error is returned, processing of the
1428 /// remainder of the incoming UDP datagram should be interrupted.
1429 ///
1430 /// On error, an error other than [`Done`] is returned.
1431 ///
1432 /// [`Done`]: enum.Error.html#variant.Done
recv_single(&mut self, buf: &mut [u8]) -> Result<usize>1433 fn recv_single(&mut self, buf: &mut [u8]) -> Result<usize> {
1434 let now = time::Instant::now();
1435
1436 if buf.is_empty() {
1437 return Err(Error::Done);
1438 }
1439
1440 if self.is_closed() || self.draining_timer.is_some() {
1441 return Err(Error::Done);
1442 }
1443
1444 let is_closing = self.error.is_some() || self.app_error.is_some();
1445
1446 if is_closing {
1447 return Err(Error::Done);
1448 }
1449
1450 let mut b = octets::OctetsMut::with_slice(buf);
1451
1452 let mut hdr =
1453 Header::from_bytes(&mut b, self.scid.len()).map_err(|e| {
1454 drop_pkt_on_err(
1455 e,
1456 self.recv_count,
1457 self.is_server,
1458 &self.trace_id,
1459 )
1460 })?;
1461
1462 if hdr.ty == packet::Type::VersionNegotiation {
1463 // Version negotiation packets can only be sent by the server.
1464 if self.is_server {
1465 return Err(Error::Done);
1466 }
1467
1468 // Ignore duplicate version negotiation.
1469 if self.did_version_negotiation {
1470 return Err(Error::Done);
1471 }
1472
1473 // Ignore version negotiation if any other packet has already been
1474 // successfully processed.
1475 if self.recv_count > 0 {
1476 return Err(Error::Done);
1477 }
1478
1479 if hdr.dcid != self.scid {
1480 return Err(Error::Done);
1481 }
1482
1483 if hdr.scid != self.dcid {
1484 return Err(Error::Done);
1485 }
1486
1487 trace!("{} rx pkt {:?}", self.trace_id, hdr);
1488
1489 let versions = hdr.versions.ok_or(Error::Done)?;
1490
1491 // Ignore version negotiation if the version already selected is
1492 // listed.
1493 if versions.iter().any(|&v| v == self.version) {
1494 return Err(Error::Done);
1495 }
1496
1497 match versions.iter().filter(|&&v| version_is_supported(v)).max() {
1498 Some(v) => self.version = *v,
1499
1500 None => {
1501 // We don't support any of the versions offered.
1502 //
1503 // While a man-in-the-middle attacker might be able to
1504 // inject a version negotiation packet that triggers this
1505 // failure, the window of opportunity is very small and
1506 // this error is quite useful for debugging, so don't just
1507 // ignore the packet.
1508 return Err(Error::UnknownVersion);
1509 },
1510 };
1511
1512 self.did_version_negotiation = true;
1513
1514 // Derive Initial secrets based on the new version.
1515 let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1516 &self.dcid,
1517 self.version,
1518 self.is_server,
1519 )?;
1520
1521 // Reset connection state to force sending another Initial packet.
1522 self.drop_epoch_state(packet::EPOCH_INITIAL, now);
1523 self.got_peer_conn_id = false;
1524 self.handshake.clear()?;
1525
1526 self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1527 Some(aead_open);
1528 self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1529 Some(aead_seal);
1530
1531 // Encode transport parameters again, as the new version might be
1532 // using a different format.
1533 self.encode_transport_params()?;
1534
1535 return Err(Error::Done);
1536 }
1537
1538 if hdr.ty == packet::Type::Retry {
1539 // Retry packets can only be sent by the server.
1540 if self.is_server {
1541 return Err(Error::Done);
1542 }
1543
1544 // Ignore duplicate retry.
1545 if self.did_retry {
1546 return Err(Error::Done);
1547 }
1548
1549 // Check if Retry packet is valid.
1550 if packet::verify_retry_integrity(&b, &self.dcid, self.version)
1551 .is_err()
1552 {
1553 return Err(Error::Done);
1554 }
1555
1556 trace!("{} rx pkt {:?}", self.trace_id, hdr);
1557
1558 self.token = hdr.token;
1559 self.did_retry = true;
1560
1561 // Remember peer's new connection ID.
1562 self.odcid = Some(self.dcid.clone());
1563
1564 self.dcid.resize(hdr.scid.len(), 0);
1565 self.dcid.copy_from_slice(&hdr.scid);
1566
1567 self.rscid = Some(self.dcid.clone());
1568
1569 // Derive Initial secrets using the new connection ID.
1570 let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1571 &hdr.scid,
1572 self.version,
1573 self.is_server,
1574 )?;
1575
1576 // Reset connection state to force sending another Initial packet.
1577 self.drop_epoch_state(packet::EPOCH_INITIAL, now);
1578 self.got_peer_conn_id = false;
1579 self.handshake.clear()?;
1580
1581 self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1582 Some(aead_open);
1583 self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1584 Some(aead_seal);
1585
1586 return Err(Error::Done);
1587 }
1588
1589 if self.is_server && !self.did_version_negotiation {
1590 if !version_is_supported(hdr.version) {
1591 return Err(Error::UnknownVersion);
1592 }
1593
1594 self.version = hdr.version;
1595 self.did_version_negotiation = true;
1596
1597 // Encode transport parameters again, as the new version might be
1598 // using a different format.
1599 self.encode_transport_params()?;
1600 }
1601
1602 if hdr.ty != packet::Type::Short && hdr.version != self.version {
1603 // At this point version negotiation was already performed, so
1604 // ignore packets that don't match the connection's version.
1605 return Err(Error::Done);
1606 }
1607
1608 // Long header packets have an explicit payload length, but short
1609 // packets don't so just use the remaining capacity in the buffer.
1610 let payload_len = if hdr.ty == packet::Type::Short {
1611 b.cap()
1612 } else {
1613 b.get_varint().map_err(|e| {
1614 drop_pkt_on_err(
1615 e.into(),
1616 self.recv_count,
1617 self.is_server,
1618 &self.trace_id,
1619 )
1620 })? as usize
1621 };
1622
1623 // Derive initial secrets on the server.
1624 if !self.derived_initial_secrets {
1625 let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1626 &hdr.dcid,
1627 self.version,
1628 self.is_server,
1629 )?;
1630
1631 self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1632 Some(aead_open);
1633 self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1634 Some(aead_seal);
1635
1636 self.derived_initial_secrets = true;
1637 }
1638
1639 // Select packet number space epoch based on the received packet's type.
1640 let epoch = hdr.ty.to_epoch()?;
1641
1642 // Select AEAD context used to open incoming packet.
1643 #[allow(clippy::or_fun_call)]
1644 let aead = (self.pkt_num_spaces[epoch].crypto_0rtt_open.as_ref())
1645 // Only use 0-RTT key if incoming packet is 0-RTT.
1646 .filter(|_| hdr.ty == packet::Type::ZeroRTT)
1647 // Otherwise use the packet number space's main key.
1648 .or(self.pkt_num_spaces[epoch].crypto_open.as_ref())
1649 // Finally, discard packet if no usable key is available.
1650 //
1651 // TODO: buffer 0-RTT/1-RTT packets instead of discarding when the
1652 // required key is not available yet, as an optimization.
1653 .ok_or_else(|| {
1654 drop_pkt_on_err(
1655 Error::CryptoFail,
1656 self.recv_count,
1657 self.is_server,
1658 &self.trace_id,
1659 )
1660 })?;
1661
1662 let aead_tag_len = aead.alg().tag_len();
1663
1664 packet::decrypt_hdr(&mut b, &mut hdr, &aead).map_err(|e| {
1665 drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
1666 })?;
1667
1668 let pn = packet::decode_pkt_num(
1669 self.pkt_num_spaces[epoch].largest_rx_pkt_num,
1670 hdr.pkt_num,
1671 hdr.pkt_num_len,
1672 );
1673
1674 let pn_len = hdr.pkt_num_len;
1675
1676 trace!(
1677 "{} rx pkt {:?} len={} pn={}",
1678 self.trace_id,
1679 hdr,
1680 payload_len,
1681 pn
1682 );
1683
1684 qlog_with!(self.qlog_streamer, q, {
1685 let packet_size = b.len();
1686
1687 let qlog_pkt_hdr = qlog::PacketHeader::with_type(
1688 hdr.ty.to_qlog(),
1689 pn,
1690 Some(packet_size as u64),
1691 Some(payload_len as u64),
1692 Some(hdr.version),
1693 Some(&hdr.scid),
1694 Some(&hdr.dcid),
1695 );
1696
1697 q.add_event(qlog::event::Event::packet_received(
1698 hdr.ty.to_qlog(),
1699 qlog_pkt_hdr,
1700 Some(Vec::new()),
1701 None,
1702 None,
1703 None,
1704 ))
1705 .ok();
1706 });
1707
1708 let mut payload = packet::decrypt_pkt(
1709 &mut b,
1710 pn,
1711 pn_len,
1712 payload_len,
1713 &aead,
1714 )
1715 .map_err(|e| {
1716 drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
1717 })?;
1718
1719 if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
1720 trace!("{} ignored duplicate packet {}", self.trace_id, pn);
1721 return Err(Error::Done);
1722 }
1723
1724 if !self.is_server && !self.got_peer_conn_id {
1725 if self.odcid.is_none() {
1726 self.odcid = Some(self.dcid.clone());
1727 }
1728
1729 // Replace the randomly generated destination connection ID with
1730 // the one supplied by the server.
1731 self.dcid.resize(hdr.scid.len(), 0);
1732 self.dcid.copy_from_slice(&hdr.scid);
1733
1734 self.got_peer_conn_id = true;
1735 }
1736
1737 if self.is_server && !self.got_peer_conn_id {
1738 self.dcid.extend_from_slice(&hdr.scid);
1739
1740 if !self.did_retry && self.version >= PROTOCOL_VERSION_DRAFT28 {
1741 self.local_transport_params
1742 .original_destination_connection_id = Some(hdr.dcid.to_vec());
1743
1744 self.encode_transport_params()?;
1745 }
1746
1747 self.got_peer_conn_id = true;
1748 }
1749
1750 // To avoid sending an ACK in response to an ACK-only packet, we need
1751 // to keep track of whether this packet contains any frame other than
1752 // ACK and PADDING.
1753 let mut ack_elicited = false;
1754
1755 // Process packet payload.
1756 while payload.cap() > 0 {
1757 let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
1758
1759 qlog_with!(self.qlog_streamer, q, {
1760 q.add_frame(frame.to_qlog(), false).ok();
1761 });
1762
1763 if frame.ack_eliciting() {
1764 ack_elicited = true;
1765 }
1766
1767 if let Err(e) = self.process_frame(frame, epoch, now) {
1768 qlog_with!(self.qlog_streamer, q, {
1769 // Always conclude frame writing on error.
1770 q.finish_frames().ok();
1771 });
1772
1773 return Err(e);
1774 }
1775 }
1776
1777 qlog_with!(self.qlog_streamer, q, {
1778 // Always conclude frame writing.
1779 q.finish_frames().ok();
1780 });
1781
1782 qlog_with!(self.qlog_streamer, q, {
1783 let ev = self.recovery.to_qlog();
1784 q.add_event(ev).ok();
1785 });
1786
1787 // Only log the remote transport parameters once the connection is
1788 // established (i.e. after frames have been fully parsed) and only
1789 // once per connection.
1790 if self.is_established() {
1791 qlog_with!(self.qlog_streamer, q, {
1792 if !self.qlogged_peer_params {
1793 let ev = self.peer_transport_params.to_qlog(
1794 qlog::TransportOwner::Remote,
1795 self.version,
1796 self.handshake.alpn_protocol(),
1797 self.handshake.cipher(),
1798 );
1799
1800 q.add_event(ev).ok();
1801
1802 self.qlogged_peer_params = true;
1803 }
1804 });
1805 }
1806
1807 // Process acked frames.
1808 for acked in self.recovery.acked[epoch].drain(..) {
1809 match acked {
1810 frame::Frame::ACK { ranges, .. } => {
1811 // Stop acknowledging packets less than or equal to the
1812 // largest acknowledged in the sent ACK frame that, in
1813 // turn, got acked.
1814 if let Some(largest_acked) = ranges.last() {
1815 self.pkt_num_spaces[epoch]
1816 .recv_pkt_need_ack
1817 .remove_until(largest_acked);
1818 }
1819 },
1820
1821 frame::Frame::Crypto { data } => {
1822 self.pkt_num_spaces[epoch]
1823 .crypto_stream
1824 .send
1825 .ack(data.off(), data.len());
1826 },
1827
1828 frame::Frame::Stream { stream_id, data } => {
1829 let stream = match self.streams.get_mut(stream_id) {
1830 Some(v) => v,
1831
1832 None => continue,
1833 };
1834
1835 stream.send.ack(data.off(), data.len());
1836
1837 if stream.is_complete() {
1838 let local = stream.local;
1839 self.streams.collect(stream_id, local);
1840 }
1841 },
1842
1843 _ => (),
1844 }
1845 }
1846
1847 // We only record the time of arrival of the largest packet number
1848 // that still needs to be acked, to be used for ACK delay calculation.
1849 if self.pkt_num_spaces[epoch].recv_pkt_need_ack.last() < Some(pn) {
1850 self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
1851 }
1852
1853 self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);
1854
1855 self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);
1856
1857 self.pkt_num_spaces[epoch].ack_elicited =
1858 cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);
1859
1860 self.pkt_num_spaces[epoch].largest_rx_pkt_num =
1861 cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);
1862
1863 if let Some(idle_timeout) = self.idle_timeout() {
1864 self.idle_timer = Some(now + idle_timeout);
1865 }
1866
1867 self.recv_count += 1;
1868
1869 let read = b.off() + aead_tag_len;
1870
1871 // An Handshake packet has been received from the client and has been
1872 // successfully processed, so we can drop the initial state and consider
1873 // the client's address to be verified.
1874 if self.is_server && hdr.ty == packet::Type::Handshake {
1875 self.drop_epoch_state(packet::EPOCH_INITIAL, now);
1876
1877 self.verified_peer_address = true;
1878 }
1879
1880 self.ack_eliciting_sent = false;
1881
1882 Ok(read)
1883 }
1884
1885 /// Writes a single QUIC packet to be sent to the peer.
1886 ///
1887 /// On success the number of bytes written to the output buffer is
1888 /// returned, or [`Done`] if there was nothing to write.
1889 ///
1890 /// The application should call `send()` multiple times until [`Done`] is
1891 /// returned, indicating that there are no more packets to send. It is
1892 /// recommended that `send()` be called in the following cases:
1893 ///
1894 /// * When the application receives QUIC packets from the peer (that is,
1895 /// any time [`recv()`] is also called).
1896 ///
1897 /// * When the connection timer expires (that is, any time [`on_timeout()`]
1898 /// is also called).
1899 ///
1900 /// * When the application sends data to the peer (for examples, any time
1901 /// [`stream_send()`] or [`stream_shutdown()`] are called).
1902 ///
1903 /// [`Done`]: enum.Error.html#variant.Done
1904 /// [`recv()`]: struct.Connection.html#method.recv
1905 /// [`on_timeout()`]: struct.Connection.html#method.on_timeout
1906 /// [`stream_send()`]: struct.Connection.html#method.stream_send
1907 /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
1908 ///
1909 /// ## Examples:
1910 ///
1911 /// ```no_run
1912 /// # let mut out = [0; 512];
1913 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
1914 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
1915 /// # let scid = [0xba; 16];
1916 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
1917 /// loop {
1918 /// let write = match conn.send(&mut out) {
1919 /// Ok(v) => v,
1920 ///
1921 /// Err(quiche::Error::Done) => {
1922 /// // Done writing.
1923 /// break;
1924 /// },
1925 ///
1926 /// Err(e) => {
1927 /// // An error occurred, handle it.
1928 /// break;
1929 /// },
1930 /// };
1931 ///
1932 /// socket.send(&out[..write]).unwrap();
1933 /// }
1934 /// # Ok::<(), quiche::Error>(())
1935 /// ```
send(&mut self, out: &mut [u8]) -> Result<usize>1936 pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
1937 let now = time::Instant::now();
1938
1939 if out.is_empty() {
1940 return Err(Error::BufferTooShort);
1941 }
1942
1943 if self.is_closed() || self.draining_timer.is_some() {
1944 return Err(Error::Done);
1945 }
1946
1947 // If the Initial secrets have not been derived yet, there's no point
1948 // in trying to send a packet, so return early.
1949 if !self.derived_initial_secrets {
1950 return Err(Error::Done);
1951 }
1952
1953 let is_closing = self.error.is_some() || self.app_error.is_some();
1954
1955 if !is_closing {
1956 self.do_handshake()?;
1957 }
1958
1959 let mut b = octets::OctetsMut::with_slice(out);
1960
1961 let epoch = self.write_epoch()?;
1962
1963 let pkt_type = packet::Type::from_epoch(epoch);
1964
1965 // Process lost frames.
1966 for lost in self.recovery.lost[epoch].drain(..) {
1967 match lost {
1968 frame::Frame::Crypto { data } => {
1969 self.pkt_num_spaces[epoch].crypto_stream.send.push(data)?;
1970 },
1971
1972 frame::Frame::Stream { stream_id, data } => {
1973 let stream = match self.streams.get_mut(stream_id) {
1974 Some(v) => v,
1975
1976 None => continue,
1977 };
1978
1979 let was_flushable = stream.is_flushable();
1980
1981 let empty_fin = data.is_empty() && data.fin();
1982
1983 stream.send.push(data)?;
1984
1985 // If the stream is now flushable push it to the flushable
1986 // queue, but only if it wasn't already queued.
1987 //
1988 // Consider the stream flushable also when we are sending a
1989 // zero-length frame that has the fin flag set.
1990 if (stream.is_flushable() || empty_fin) && !was_flushable {
1991 let urgency = stream.urgency;
1992 let incremental = stream.incremental;
1993 self.streams.push_flushable(
1994 stream_id,
1995 urgency,
1996 incremental,
1997 );
1998 }
1999 },
2000
2001 frame::Frame::ACK { .. } => {
2002 self.pkt_num_spaces[epoch].ack_elicited = true;
2003 },
2004
2005 frame::Frame::HandshakeDone => {
2006 self.handshake_done_sent = false;
2007 },
2008
2009 frame::Frame::MaxStreamData { stream_id, .. } => {
2010 if self.streams.get(stream_id).is_some() {
2011 self.streams.mark_almost_full(stream_id, true);
2012 }
2013 },
2014
2015 frame::Frame::MaxData { .. } => {
2016 self.almost_full = true;
2017 },
2018
2019 _ => (),
2020 }
2021 }
2022
2023 let mut left = b.cap();
2024
2025 // Limit output packet size to respect peer's max_packet_size limit.
2026 left = cmp::min(left, self.max_send_udp_payload_len());
2027
2028 // Limit output packet size by congestion window size.
2029 left = cmp::min(left, self.recovery.cwnd_available());
2030
2031 // Limit data sent by the server based on the amount of data received
2032 // from the client before its address is validated.
2033 if !self.verified_peer_address && self.is_server {
2034 left = cmp::min(left, self.max_send_bytes);
2035 }
2036
2037 let pn = self.pkt_num_spaces[epoch].next_pkt_num;
2038 let pn_len = packet::pkt_num_len(pn)?;
2039
2040 // The AEAD overhead at the current encryption level.
2041 let crypto_overhead = self.pkt_num_spaces[epoch]
2042 .crypto_overhead()
2043 .ok_or(Error::Done)?;
2044
2045 let hdr = Header {
2046 ty: pkt_type,
2047 version: self.version,
2048 dcid: self.dcid.clone(),
2049
2050 // Don't needlessly clone the source connection ID for 1-RTT packets
2051 // as it is not used.
2052 scid: if pkt_type != packet::Type::Short {
2053 self.scid.clone()
2054 } else {
2055 Vec::new()
2056 },
2057
2058 pkt_num: 0,
2059 pkt_num_len: pn_len,
2060
2061 // Only clone token for Initial packets, as other packets don't have
2062 // this field (Retry doesn't count, as it's not encoded as part of
2063 // this code path).
2064 token: if pkt_type == packet::Type::Initial {
2065 self.token.clone()
2066 } else {
2067 None
2068 },
2069
2070 versions: None,
2071 key_phase: false,
2072 };
2073
2074 hdr.to_bytes(&mut b)?;
2075
2076 // Calculate the space required for the packet, including the header
2077 // the payload length, the packet number and the AEAD overhead.
2078 let mut overhead = b.off() + pn_len + crypto_overhead;
2079
2080 // We assume that the payload length, which is only present in long
2081 // header packets, can always be encoded with a 2-byte varint.
2082 if pkt_type != packet::Type::Short {
2083 overhead += 2;
2084 }
2085
2086 // Make sure we have enough space left for the packet.
2087 match left.checked_sub(overhead) {
2088 Some(v) => left = v,
2089
2090 None => {
2091 // We can't send more because there isn't enough space available
2092 // in the output buffer.
2093 //
2094 // This usually happens when we try to send a new packet but
2095 // failed because cwnd is almost full. In such case app_limited
2096 // is set to false here to make cwnd grow when ACK is received.
2097 self.recovery.update_app_limited(false);
2098 return Err(Error::Done);
2099 },
2100 }
2101
2102 let mut frames: Vec<frame::Frame> = Vec::new();
2103
2104 let mut ack_eliciting = false;
2105 let mut in_flight = false;
2106 let mut has_data = false;
2107
2108 let mut payload_len = 0;
2109
2110 // Create ACK frame.
2111 if self.pkt_num_spaces[epoch].recv_pkt_need_ack.len() > 0 &&
2112 (self.pkt_num_spaces[epoch].ack_elicited ||
2113 self.recovery.loss_probes[epoch] > 0) &&
2114 !is_closing
2115 {
2116 let ack_delay =
2117 self.pkt_num_spaces[epoch].largest_rx_pkt_time.elapsed();
2118
2119 let ack_delay = ack_delay.as_micros() as u64 /
2120 2_u64
2121 .pow(self.local_transport_params.ack_delay_exponent as u32);
2122
2123 let frame = frame::Frame::ACK {
2124 ack_delay,
2125 ranges: self.pkt_num_spaces[epoch].recv_pkt_need_ack.clone(),
2126 };
2127
2128 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2129 self.pkt_num_spaces[epoch].ack_elicited = false;
2130 }
2131 }
2132
2133 if pkt_type == packet::Type::Short && !is_closing {
2134 // Create HANDSHAKE_DONE frame.
2135 if self.is_established() &&
2136 !self.handshake_done_sent &&
2137 self.is_server
2138 {
2139 let frame = frame::Frame::HandshakeDone;
2140
2141 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2142 self.handshake_done_sent = true;
2143
2144 ack_eliciting = true;
2145 in_flight = true;
2146 }
2147 }
2148
2149 // Create MAX_STREAMS_BIDI frame.
2150 if self.streams.should_update_max_streams_bidi() {
2151 let frame = frame::Frame::MaxStreamsBidi {
2152 max: self.streams.max_streams_bidi_next(),
2153 };
2154
2155 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2156 self.streams.update_max_streams_bidi();
2157
2158 ack_eliciting = true;
2159 in_flight = true;
2160 }
2161 }
2162
2163 // Create MAX_STREAMS_UNI frame.
2164 if self.streams.should_update_max_streams_uni() {
2165 let frame = frame::Frame::MaxStreamsUni {
2166 max: self.streams.max_streams_uni_next(),
2167 };
2168
2169 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2170 self.streams.update_max_streams_uni();
2171
2172 ack_eliciting = true;
2173 in_flight = true;
2174 }
2175 }
2176
2177 // Create MAX_DATA frame as needed.
2178 if self.almost_full {
2179 let frame = frame::Frame::MaxData {
2180 max: self.max_rx_data_next,
2181 };
2182
2183 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2184 self.almost_full = false;
2185
2186 // Commits the new max_rx_data limit.
2187 self.max_rx_data = self.max_rx_data_next;
2188
2189 ack_eliciting = true;
2190 in_flight = true;
2191 }
2192 }
2193
2194 // Create DATA_BLOCKED frame.
2195 if let Some(limit) = self.blocked_limit {
2196 let frame = frame::Frame::DataBlocked { limit };
2197
2198 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2199 self.blocked_limit = None;
2200
2201 ack_eliciting = true;
2202 in_flight = true;
2203 }
2204 }
2205
2206 // Create MAX_STREAM_DATA frames as needed.
2207 for stream_id in self.streams.almost_full() {
2208 let stream = match self.streams.get_mut(stream_id) {
2209 Some(v) => v,
2210
2211 None => {
2212 // The stream doesn't exist anymore, so remove it from
2213 // the almost full set.
2214 self.streams.mark_almost_full(stream_id, false);
2215 continue;
2216 },
2217 };
2218
2219 let frame = frame::Frame::MaxStreamData {
2220 stream_id,
2221 max: stream.recv.max_data_next(),
2222 };
2223
2224 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2225 stream.recv.update_max_data();
2226
2227 self.streams.mark_almost_full(stream_id, false);
2228
2229 ack_eliciting = true;
2230 in_flight = true;
2231 }
2232 }
2233
2234 // Create STREAM_DATA_BLOCKED frames as needed.
2235 for (stream_id, limit) in self
2236 .streams
2237 .blocked()
2238 .map(|(&k, &v)| (k, v))
2239 .collect::<Vec<(u64, u64)>>()
2240 {
2241 let frame = frame::Frame::StreamDataBlocked { stream_id, limit };
2242
2243 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2244 self.streams.mark_blocked(stream_id, false, 0);
2245
2246 ack_eliciting = true;
2247 in_flight = true;
2248 }
2249 }
2250 }
2251
2252 // Create CONNECTION_CLOSE frame.
2253 if let Some(err) = self.error {
2254 let frame = frame::Frame::ConnectionClose {
2255 error_code: err,
2256 frame_type: 0,
2257 reason: Vec::new(),
2258 };
2259
2260 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2261 self.draining_timer = Some(now + (self.recovery.pto() * 3));
2262
2263 ack_eliciting = true;
2264 in_flight = true;
2265 }
2266 }
2267
2268 // Create APPLICATION_CLOSE frame.
2269 if let Some(err) = self.app_error {
2270 if pkt_type == packet::Type::Short {
2271 let frame = frame::Frame::ApplicationClose {
2272 error_code: err,
2273 reason: self.app_reason.clone(),
2274 };
2275
2276 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2277 self.draining_timer = Some(now + (self.recovery.pto() * 3));
2278
2279 ack_eliciting = true;
2280 in_flight = true;
2281 }
2282 }
2283 }
2284
2285 // Create PATH_RESPONSE frame.
2286 if let Some(ref challenge) = self.challenge {
2287 let frame = frame::Frame::PathResponse {
2288 data: challenge.clone(),
2289 };
2290
2291 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2292 self.challenge = None;
2293
2294 ack_eliciting = true;
2295 in_flight = true;
2296 }
2297 }
2298
2299 // Create CRYPTO frame.
2300 if self.pkt_num_spaces[epoch].crypto_stream.is_flushable() &&
2301 left > frame::MAX_CRYPTO_OVERHEAD &&
2302 !is_closing
2303 {
2304 let crypto_len = left - frame::MAX_CRYPTO_OVERHEAD;
2305 let crypto_buf = self.pkt_num_spaces[epoch]
2306 .crypto_stream
2307 .send
2308 .pop(crypto_len)?;
2309
2310 let frame = frame::Frame::Crypto { data: crypto_buf };
2311
2312 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2313 ack_eliciting = true;
2314 in_flight = true;
2315 has_data = true;
2316 }
2317 }
2318
2319 // Create DATAGRAM frame.
2320 if pkt_type == packet::Type::Short &&
2321 left > frame::MAX_DGRAM_OVERHEAD &&
2322 !is_closing
2323 {
2324 if let Some(max_dgram_payload) = self.dgram_max_writable_len() {
2325 while let Some(len) = self.dgram_send_queue.peek_front_len() {
2326 if (len + frame::MAX_DGRAM_OVERHEAD) <= left {
2327 // Front of the queue fits this packet, send it
2328 match self.dgram_send_queue.pop() {
2329 Some(data) => {
2330 let frame = frame::Frame::Datagram { data };
2331
2332 if push_frame_to_pkt!(
2333 frames,
2334 frame,
2335 payload_len,
2336 left
2337 ) {
2338 ack_eliciting = true;
2339 in_flight = true;
2340 }
2341 },
2342
2343 None => continue,
2344 };
2345 } else if len > max_dgram_payload {
2346 // This dgram frame will never fit. Let's purge it.
2347 self.dgram_send_queue.pop();
2348 } else {
2349 break;
2350 }
2351 }
2352 }
2353 }
2354
2355 // Create a single STREAM frame for the first stream that is flushable.
2356 if pkt_type == packet::Type::Short &&
2357 left > frame::MAX_STREAM_OVERHEAD &&
2358 !is_closing
2359 {
2360 while let Some(stream_id) = self.streams.pop_flushable() {
2361 let stream = match self.streams.get_mut(stream_id) {
2362 Some(v) => v,
2363
2364 None => continue,
2365 };
2366
2367 let off = stream.send.off_front();
2368
2369 // Try to accurately account for the STREAM frame's overhead,
2370 // such that we can fill as much of the packet buffer as
2371 // possible.
2372 let overhead = 1 +
2373 octets::varint_len(stream_id) +
2374 octets::varint_len(off) +
2375 octets::varint_len(left as u64);
2376
2377 let max_len = match left.checked_sub(overhead) {
2378 Some(v) => v,
2379
2380 None => continue,
2381 };
2382
2383 let stream_buf = stream.send.pop(max_len)?;
2384
2385 if stream_buf.is_empty() && !stream_buf.fin() {
2386 continue;
2387 }
2388
2389 let frame = frame::Frame::Stream {
2390 stream_id,
2391 data: stream_buf,
2392 };
2393
2394 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2395 ack_eliciting = true;
2396 in_flight = true;
2397 has_data = true;
2398 }
2399
2400 // If the stream is still flushable, push it to the back of the
2401 // queue again.
2402 if stream.is_flushable() {
2403 let urgency = stream.urgency;
2404 let incremental = stream.incremental;
2405 self.streams.push_flushable(stream_id, urgency, incremental);
2406 }
2407
2408 // When fuzzing, try to coalesce multiple STREAM frames in the
2409 // same packet, so it's easier to generate fuzz corpora.
2410 if cfg!(feature = "fuzzing") && left > frame::MAX_STREAM_OVERHEAD
2411 {
2412 continue;
2413 }
2414
2415 break;
2416 }
2417 }
2418
2419 // Create PING for PTO probe if no other ack-elicitng frame is sent.
2420 if self.recovery.loss_probes[epoch] > 0 &&
2421 !ack_eliciting &&
2422 left >= 1 &&
2423 !is_closing
2424 {
2425 let frame = frame::Frame::Ping;
2426
2427 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2428 ack_eliciting = true;
2429 in_flight = true;
2430 }
2431 }
2432
2433 if ack_eliciting {
2434 self.recovery.loss_probes[epoch] =
2435 self.recovery.loss_probes[epoch].saturating_sub(1);
2436 }
2437
2438 if frames.is_empty() {
2439 // When we reach this point we are not able to write more, so set
2440 // app_limited to false.
2441 self.recovery.update_app_limited(false);
2442 return Err(Error::Done);
2443 }
2444
2445 // Pad the client's initial packet.
2446 if !self.is_server && pkt_type == packet::Type::Initial {
2447 let pkt_len = pn_len + payload_len + crypto_overhead;
2448
2449 let frame = frame::Frame::Padding {
2450 len: cmp::min(MIN_CLIENT_INITIAL_LEN - pkt_len, left),
2451 };
2452
2453 payload_len += frame.wire_len();
2454
2455 frames.push(frame);
2456
2457 in_flight = true;
2458 }
2459
2460 // Pad payload so that it's always at least 4 bytes.
2461 if payload_len < PAYLOAD_MIN_LEN {
2462 let frame = frame::Frame::Padding {
2463 len: PAYLOAD_MIN_LEN - payload_len,
2464 };
2465
2466 payload_len += frame.wire_len();
2467
2468 frames.push(frame);
2469
2470 in_flight = true;
2471 }
2472
2473 payload_len += crypto_overhead;
2474
2475 // Only long header packets have an explicit length field.
2476 if pkt_type != packet::Type::Short {
2477 let len = pn_len + payload_len;
2478 b.put_varint(len as u64)?;
2479 }
2480
2481 packet::encode_pkt_num(pn, &mut b)?;
2482
2483 let payload_offset = b.off();
2484
2485 trace!(
2486 "{} tx pkt {:?} len={} pn={}",
2487 self.trace_id,
2488 hdr,
2489 payload_len,
2490 pn
2491 );
2492
2493 qlog_with!(self.qlog_streamer, q, {
2494 let qlog_pkt_hdr = qlog::PacketHeader::with_type(
2495 hdr.ty.to_qlog(),
2496 pn,
2497 Some(payload_len as u64 + payload_offset as u64),
2498 Some(payload_len as u64),
2499 Some(hdr.version),
2500 Some(&hdr.scid),
2501 Some(&hdr.dcid),
2502 );
2503
2504 let packet_sent_ev = qlog::event::Event::packet_sent_min(
2505 hdr.ty.to_qlog(),
2506 qlog_pkt_hdr,
2507 Some(Vec::new()),
2508 );
2509
2510 q.add_event(packet_sent_ev).ok();
2511 });
2512
2513 // Encode frames into the output packet.
2514 for frame in &mut frames {
2515 trace!("{} tx frm {:?}", self.trace_id, frame);
2516
2517 frame.to_bytes(&mut b)?;
2518
2519 qlog_with!(self.qlog_streamer, q, {
2520 q.add_frame(frame.to_qlog(), false).ok();
2521 });
2522
2523 // Once frames have been serialized they are passed to the Recovery
2524 // module which manages retransmission. However, some frames do not
2525 // contain retransmittable data, so drop it here.
2526 frame.shrink_for_retransmission();
2527 }
2528
2529 qlog_with!(self.qlog_streamer, q, {
2530 q.finish_frames().ok();
2531 });
2532
2533 let aead = match self.pkt_num_spaces[epoch].crypto_seal {
2534 Some(ref v) => v,
2535 None => return Err(Error::InvalidState),
2536 };
2537
2538 let written = packet::encrypt_pkt(
2539 &mut b,
2540 pn,
2541 pn_len,
2542 payload_len,
2543 payload_offset,
2544 aead,
2545 )?;
2546
2547 let sent_pkt = recovery::Sent {
2548 pkt_num: pn,
2549 frames,
2550 time_sent: now,
2551 time_acked: None,
2552 time_lost: None,
2553 size: if ack_eliciting { written } else { 0 },
2554 ack_eliciting,
2555 in_flight,
2556 delivered: 0,
2557 delivered_time: now,
2558 recent_delivered_packet_sent_time: now,
2559 is_app_limited: false,
2560 has_data,
2561 };
2562
2563 self.recovery.on_packet_sent(
2564 sent_pkt,
2565 epoch,
2566 self.handshake_status(),
2567 now,
2568 &self.trace_id,
2569 );
2570
2571 qlog_with!(self.qlog_streamer, q, {
2572 let ev = self.recovery.to_qlog();
2573 q.add_event(ev).ok();
2574 });
2575
2576 self.pkt_num_spaces[epoch].next_pkt_num += 1;
2577
2578 self.sent_count += 1;
2579
2580 if self.dgram_send_queue.byte_size() > self.recovery.cwnd_available() {
2581 self.recovery.update_app_limited(false);
2582 }
2583
2584 // On the client, drop initial state after sending an Handshake packet.
2585 if !self.is_server && hdr.ty == packet::Type::Handshake {
2586 self.drop_epoch_state(packet::EPOCH_INITIAL, now);
2587 }
2588
2589 self.max_send_bytes = self.max_send_bytes.saturating_sub(written);
2590
2591 // (Re)start the idle timer if we are sending the first ack-eliciting
2592 // packet since last receiving a packet.
2593 if ack_eliciting && !self.ack_eliciting_sent {
2594 if let Some(idle_timeout) = self.idle_timeout() {
2595 self.idle_timer = Some(now + idle_timeout);
2596 }
2597 }
2598
2599 if ack_eliciting {
2600 self.ack_eliciting_sent = true;
2601 }
2602
2603 Ok(written)
2604 }
2605
2606 // Returns the maximum len of a packet to be sent. This is max_packet_size
2607 // as sent by the peer, except during the handshake when we haven't parsed
2608 // transport parameters yet, so use a default value then.
max_send_udp_payload_len(&self) -> usize2609 fn max_send_udp_payload_len(&self) -> usize {
2610 if self.is_established() {
2611 // We cap the maximum packet size to 16KB or so, so that it can be
2612 // always encoded with a 2-byte varint.
2613 cmp::min(16383, self.peer_transport_params.max_udp_payload_size)
2614 as usize
2615 } else {
2616 // Allow for 1200 bytes (minimum QUIC packet size) during the
2617 // handshake.
2618 MIN_CLIENT_INITIAL_LEN
2619 }
2620 }
2621
2622 /// Reads contiguous data from a stream into the provided slice.
2623 ///
2624 /// The slice must be sized by the caller and will be populated up to its
2625 /// capacity.
2626 ///
2627 /// On success the amount of bytes read and a flag indicating the fin state
2628 /// is returned as a tuple, or [`Done`] if there is no data to read.
2629 ///
2630 /// [`Done`]: enum.Error.html#variant.Done
2631 ///
2632 /// ## Examples:
2633 ///
2634 /// ```no_run
2635 /// # let mut buf = [0; 512];
2636 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
2637 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
2638 /// # let scid = [0xba; 16];
2639 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
2640 /// # let stream_id = 0;
2641 /// while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
2642 /// println!("Got {} bytes on stream {}", read, stream_id);
2643 /// }
2644 /// # Ok::<(), quiche::Error>(())
2645 /// ```
stream_recv( &mut self, stream_id: u64, out: &mut [u8], ) -> Result<(usize, bool)>2646 pub fn stream_recv(
2647 &mut self, stream_id: u64, out: &mut [u8],
2648 ) -> Result<(usize, bool)> {
2649 // We can't read on our own unidirectional streams.
2650 if !stream::is_bidi(stream_id) &&
2651 stream::is_local(stream_id, self.is_server)
2652 {
2653 return Err(Error::InvalidStreamState);
2654 }
2655
2656 let stream = self
2657 .streams
2658 .get_mut(stream_id)
2659 .ok_or(Error::InvalidStreamState)?;
2660
2661 if !stream.is_readable() {
2662 return Err(Error::Done);
2663 }
2664
2665 #[cfg(feature = "qlog")]
2666 let offset = stream.recv.off_front();
2667
2668 let (read, fin) = stream.recv.pop(out)?;
2669
2670 self.max_rx_data_next = self.max_rx_data_next.saturating_add(read as u64);
2671
2672 let readable = stream.is_readable();
2673
2674 let complete = stream.is_complete();
2675
2676 let local = stream.local;
2677
2678 if stream.recv.almost_full() {
2679 self.streams.mark_almost_full(stream_id, true);
2680 }
2681
2682 if !readable {
2683 self.streams.mark_readable(stream_id, false);
2684 }
2685
2686 if complete {
2687 self.streams.collect(stream_id, local);
2688 }
2689
2690 qlog_with!(self.qlog_streamer, q, {
2691 let ev = qlog::event::Event::h3_data_moved(
2692 stream_id.to_string(),
2693 Some(offset.to_string()),
2694 Some(read as u64),
2695 Some(qlog::H3DataRecipient::Transport),
2696 None,
2697 None,
2698 );
2699 q.add_event(ev).ok();
2700 });
2701
2702 if self.should_update_max_data() {
2703 self.almost_full = true;
2704 }
2705
2706 Ok((read, fin))
2707 }
2708
2709 /// Writes data to a stream.
2710 ///
2711 /// On success the number of bytes written is returned, or [`Done`] if no
2712 /// data was written (e.g. because the stream has no capacity).
2713 ///
2714 /// Note that in order to avoid buffering an infinite amount of data in the
2715 /// stream's send buffer, streams are only allowed to buffer outgoing data
2716 /// up to the amount that the peer allows it to send (that is, up to the
2717 /// stream's outgoing flow control capacity).
2718 ///
2719 /// This means that the number of written bytes returned can be lower than
2720 /// the length of the input buffer when the stream doesn't have enough
2721 /// capacity for the operation to complete. The application should retry the
2722 /// operation once the stream is reported as writable again.
2723 ///
2724 /// Applications should call this method only after the handshake is
2725 /// completed (whenever [`is_established()`] returns `true`) or during
2726 /// early data if enabled (whenever [`is_in_early_data()`] returns `true`).
2727 ///
2728 /// [`Done`]: enum.Error.html#variant.Done
2729 /// [`is_established()`]: struct.Connection.html#method.is_established
2730 /// [`is_in_early_data()`]: struct.Connection.html#method.is_in_early_data
2731 ///
2732 /// ## Examples:
2733 ///
2734 /// ```no_run
2735 /// # let mut buf = [0; 512];
2736 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
2737 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
2738 /// # let scid = [0xba; 16];
2739 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
2740 /// # let stream_id = 0;
2741 /// conn.stream_send(stream_id, b"hello", true)?;
2742 /// # Ok::<(), quiche::Error>(())
2743 /// ```
stream_send( &mut self, stream_id: u64, buf: &[u8], fin: bool, ) -> Result<usize>2744 pub fn stream_send(
2745 &mut self, stream_id: u64, buf: &[u8], fin: bool,
2746 ) -> Result<usize> {
2747 // We can't write on the peer's unidirectional streams.
2748 if !stream::is_bidi(stream_id) &&
2749 !stream::is_local(stream_id, self.is_server)
2750 {
2751 return Err(Error::InvalidStreamState);
2752 }
2753
2754 // Mark the connection as blocked if the connection-level flow control
2755 // limit doesn't let us buffer all the data.
2756 //
2757 // Note that this is separate from "send capacity" as that also takes
2758 // congestion control into consideration.
2759 if self.max_tx_data - self.tx_data < buf.len() as u64 {
2760 self.blocked_limit = Some(self.max_tx_data);
2761 }
2762
2763 // Truncate the input buffer based on the connection's send capacity if
2764 // necessary.
2765 let cap = self.send_capacity();
2766
2767 let (buf, fin) = if cap < buf.len() {
2768 (&buf[..cap], false)
2769 } else {
2770 (buf, fin)
2771 };
2772
2773 // Get existing stream or create a new one.
2774 let stream = self.get_or_create_stream(stream_id, true)?;
2775
2776 #[cfg(feature = "qlog")]
2777 let offset = stream.send.off_back();
2778
2779 let was_flushable = stream.is_flushable();
2780
2781 let sent = stream.send.push_slice(buf, fin)?;
2782
2783 let urgency = stream.urgency;
2784 let incremental = stream.incremental;
2785
2786 let flushable = stream.is_flushable();
2787
2788 let writable = stream.is_writable();
2789
2790 let empty_fin = buf.is_empty() && fin;
2791
2792 if sent < buf.len() {
2793 let max_off = stream.send.max_off();
2794
2795 self.streams.mark_blocked(stream_id, true, max_off);
2796 } else {
2797 self.streams.mark_blocked(stream_id, false, 0);
2798 }
2799
2800 // If the stream is now flushable push it to the flushable queue, but
2801 // only if it wasn't already queued.
2802 //
2803 // Consider the stream flushable also when we are sending a zero-length
2804 // frame that has the fin flag set.
2805 if (flushable || empty_fin) && !was_flushable {
2806 self.streams.push_flushable(stream_id, urgency, incremental);
2807 }
2808
2809 if !writable {
2810 self.streams.mark_writable(stream_id, false);
2811 }
2812
2813 self.tx_data += sent as u64;
2814
2815 self.recovery.rate_check_app_limited();
2816
2817 qlog_with!(self.qlog_streamer, q, {
2818 let ev = qlog::event::Event::h3_data_moved(
2819 stream_id.to_string(),
2820 Some(offset.to_string()),
2821 Some(sent as u64),
2822 None,
2823 Some(qlog::H3DataRecipient::Transport),
2824 None,
2825 );
2826 q.add_event(ev).ok();
2827 });
2828
2829 Ok(sent)
2830 }
2831
2832 /// Sets the priority for a stream.
2833 ///
2834 /// A stream's priority determines the order in which stream data is sent
2835 /// on the wire (streams with lower priority are sent first). Streams are
2836 /// created with a default priority of `127`.
2837 ///
2838 /// The target stream is created if it did not exist before calling this
2839 /// method.
stream_priority( &mut self, stream_id: u64, urgency: u8, incremental: bool, ) -> Result<()>2840 pub fn stream_priority(
2841 &mut self, stream_id: u64, urgency: u8, incremental: bool,
2842 ) -> Result<()> {
2843 // Get existing stream or create a new one, but if the stream
2844 // has already been closed and collected, ignore the prioritization.
2845 let stream = match self.get_or_create_stream(stream_id, true) {
2846 Ok(v) => v,
2847
2848 Err(Error::Done) => return Ok(()),
2849
2850 Err(e) => return Err(e),
2851 };
2852
2853 if stream.urgency == urgency && stream.incremental == incremental {
2854 return Ok(());
2855 }
2856
2857 stream.urgency = urgency;
2858 stream.incremental = incremental;
2859
2860 // TODO: reprioritization
2861
2862 Ok(())
2863 }
2864
2865 /// Shuts down reading or writing from/to the specified stream.
2866 ///
2867 /// When the `direction` argument is set to [`Shutdown::Read`], outstanding
2868 /// data in the stream's receive buffer is dropped, and no additional data
2869 /// is added to it. Data received after calling this method is still
2870 /// validated and acked but not stored, and [`stream_recv()`] will not
2871 /// return it to the application.
2872 ///
2873 /// When the `direction` argument is set to [`Shutdown::Write`], outstanding
2874 /// data in the stream's send buffer is dropped, and no additional data
2875 /// is added to it. Data passed to [`stream_send()`] after calling this
2876 /// method will be ignored.
2877 ///
2878 /// [`Shutdown::Read`]: enum.Shutdown.html#variant.Read
2879 /// [`Shutdown::Write`]: enum.Shutdown.html#variant.Write
2880 /// [`stream_recv()`]: struct.Connection.html#method.stream_recv
2881 /// [`stream_send()`]: struct.Connection.html#method.stream_send
stream_shutdown( &mut self, stream_id: u64, direction: Shutdown, _err: u64, ) -> Result<()>2882 pub fn stream_shutdown(
2883 &mut self, stream_id: u64, direction: Shutdown, _err: u64,
2884 ) -> Result<()> {
2885 // Get existing stream.
2886 let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
2887
2888 match direction {
2889 // TODO: send STOP_SENDING
2890 Shutdown::Read => {
2891 stream.recv.shutdown()?;
2892
2893 // Once shutdown, the stream is guaranteed to be non-readable.
2894 self.streams.mark_readable(stream_id, false);
2895 },
2896
2897 // TODO: send RESET_STREAM
2898 Shutdown::Write => {
2899 stream.send.shutdown()?;
2900
2901 // Once shutdown, the stream is guaranteed to be non-writable.
2902 self.streams.mark_writable(stream_id, false);
2903 },
2904 }
2905
2906 Ok(())
2907 }
2908
2909 /// Returns the stream's send capacity in bytes.
stream_capacity(&self, stream_id: u64) -> Result<usize>2910 pub fn stream_capacity(&self, stream_id: u64) -> Result<usize> {
2911 if let Some(stream) = self.streams.get(stream_id) {
2912 let cap = cmp::min(self.send_capacity(), stream.send.cap());
2913 return Ok(cap);
2914 };
2915
2916 Err(Error::InvalidStreamState)
2917 }
2918
2919 /// Returns true if all the data has been read from the specified stream.
2920 ///
2921 /// This instructs the application that all the data received from the
2922 /// peer on the stream has been read, and there won't be anymore in the
2923 /// future.
2924 ///
2925 /// Basically this returns true when the peer either set the `fin` flag
2926 /// for the stream, or sent `RESET_STREAM`.
stream_finished(&self, stream_id: u64) -> bool2927 pub fn stream_finished(&self, stream_id: u64) -> bool {
2928 let stream = match self.streams.get(stream_id) {
2929 Some(v) => v,
2930
2931 None => return true,
2932 };
2933
2934 stream.recv.is_fin()
2935 }
2936
2937 /// Initializes the stream's application data.
2938 ///
2939 /// This can be used by applications to store per-stream information without
2940 /// having to maintain their own stream map.
2941 ///
2942 /// Stream data can only be initialized once. Additional calls to this
2943 /// method will return [`Done`].
2944 ///
2945 /// [`Done`]: enum.Error.html#variant.Done
stream_init_application_data<T>( &mut self, stream_id: u64, data: T, ) -> Result<()> where T: std::any::Any + Send,2946 pub fn stream_init_application_data<T>(
2947 &mut self, stream_id: u64, data: T,
2948 ) -> Result<()>
2949 where
2950 T: std::any::Any + Send,
2951 {
2952 // Get existing stream.
2953 let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
2954
2955 if stream.data.is_some() {
2956 return Err(Error::Done);
2957 }
2958
2959 stream.data = Some(Box::new(data));
2960
2961 Ok(())
2962 }
2963
2964 /// Returns the stream's application data, if any was initialized.
2965 ///
2966 /// This returns a reference to the application data that was initialized
2967 /// by calling [`stream_init_application_data()`].
2968 ///
2969 /// [`stream_init_application_data()`]:
2970 /// struct.Connection.html#method.stream_init_application_data
stream_application_data( &mut self, stream_id: u64, ) -> Option<&mut dyn std::any::Any>2971 pub fn stream_application_data(
2972 &mut self, stream_id: u64,
2973 ) -> Option<&mut dyn std::any::Any> {
2974 // Get existing stream.
2975 let stream = self.streams.get_mut(stream_id)?;
2976
2977 if let Some(ref mut stream_data) = stream.data {
2978 return Some(stream_data.as_mut());
2979 }
2980
2981 None
2982 }
2983
2984 /// Returns an iterator over streams that have outstanding data to read.
2985 ///
2986 /// Note that the iterator will only include streams that were readable at
2987 /// the time the iterator itself was created (i.e. when `readable()` was
2988 /// called). To account for newly readable streams, the iterator needs to
2989 /// be created again.
2990 ///
2991 /// ## Examples:
2992 ///
2993 /// ```no_run
2994 /// # let mut buf = [0; 512];
2995 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
2996 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
2997 /// # let scid = [0xba; 16];
2998 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
2999 /// // Iterate over readable streams.
3000 /// for stream_id in conn.readable() {
3001 /// // Stream is readable, read until there's no more data.
3002 /// while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
3003 /// println!("Got {} bytes on stream {}", read, stream_id);
3004 /// }
3005 /// }
3006 /// # Ok::<(), quiche::Error>(())
3007 /// ```
readable(&self) -> StreamIter3008 pub fn readable(&self) -> StreamIter {
3009 self.streams.readable()
3010 }
3011
3012 /// Returns an iterator over streams that can be written to.
3013 ///
3014 /// A "writable" stream is a stream that has enough flow control capacity to
3015 /// send data to the peer. To avoid buffering an infinite amount of data,
3016 /// streams are only allowed to buffer outgoing data up to the amount that
3017 /// the peer allows to send.
3018 ///
3019 /// Note that the iterator will only include streams that were writable at
3020 /// the time the iterator itself was created (i.e. when `writable()` was
3021 /// called). To account for newly writable streams, the iterator needs to
3022 /// be created again.
3023 ///
3024 /// ## Examples:
3025 ///
3026 /// ```no_run
3027 /// # let mut buf = [0; 512];
3028 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3029 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3030 /// # let scid = [0xba; 16];
3031 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3032 /// // Iterate over writable streams.
3033 /// for stream_id in conn.writable() {
3034 /// // Stream is writable, write some data.
3035 /// if let Ok(written) = conn.stream_send(stream_id, &buf, false) {
3036 /// println!("Written {} bytes on stream {}", written, stream_id);
3037 /// }
3038 /// }
3039 /// # Ok::<(), quiche::Error>(())
3040 /// ```
writable(&self) -> StreamIter3041 pub fn writable(&self) -> StreamIter {
3042 // If there is not enough connection-level send capacity, none of the
3043 // streams are writable, so return an empty iterator.
3044 if self.send_capacity() == 0 {
3045 return StreamIter::default();
3046 }
3047
3048 self.streams.writable()
3049 }
3050
3051 /// Reads the first received DATAGRAM.
3052 ///
3053 /// On success the DATAGRAM's data is returned along with its size.
3054 ///
3055 /// [`Done`] is returned if there is no data to read.
3056 ///
3057 /// [`BufferTooShort`] is returned if the provided buffer is too small for
3058 /// the DATAGRAM.
3059 ///
3060 /// [`Done`]: enum.Error.html#variant.Done
3061 /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
3062 ///
3063 /// ## Examples:
3064 ///
3065 /// ```no_run
3066 /// # let mut buf = [0; 512];
3067 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3068 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3069 /// # let scid = [0xba; 16];
3070 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3071 /// let mut dgram_buf = [0; 512];
3072 /// while let Ok((len)) = conn.dgram_recv(&mut dgram_buf) {
3073 /// println!("Got {} bytes of DATAGRAM", len);
3074 /// }
3075 /// # Ok::<(), quiche::Error>(())
3076 /// ```
dgram_recv(&mut self, buf: &mut [u8]) -> Result<usize>3077 pub fn dgram_recv(&mut self, buf: &mut [u8]) -> Result<usize> {
3078 match self.dgram_recv_queue.pop() {
3079 Some(d) => {
3080 if d.len() > buf.len() {
3081 return Err(Error::BufferTooShort);
3082 }
3083
3084 buf[..d.len()].copy_from_slice(&d);
3085 Ok(d.len())
3086 },
3087
3088 None => Err(Error::Done),
3089 }
3090 }
3091
3092 /// Reads the first received DATAGRAM without removing it from the queue.
3093 ///
3094 /// On success the DATAGRAM's data is returned along with the actual number
3095 /// of bytes peeked. The requested length cannot exceed the DATAGRAM's
3096 /// actual length.
3097 ///
3098 /// [`Done`] is returned if there is no data to read.
3099 ///
3100 /// [`BufferTooShort`] is returned if the provided buffer is smaller the
3101 /// number of bytes to peek.
3102 ///
3103 /// [`Done`]: enum.Error.html#variant.Done
3104 /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
dgram_recv_peek(&self, buf: &mut [u8], len: usize) -> Result<usize>3105 pub fn dgram_recv_peek(&self, buf: &mut [u8], len: usize) -> Result<usize> {
3106 self.dgram_recv_queue.peek_front_bytes(buf, len)
3107 }
3108
3109 /// Returns the length of the first stored DATAGRAM.
dgram_recv_front_len(&self) -> Option<usize>3110 pub fn dgram_recv_front_len(&self) -> Option<usize> {
3111 self.dgram_recv_queue.peek_front_len()
3112 }
3113
3114 /// Sends data in a DATAGRAM frame.
3115 ///
3116 /// [`Done`] is returned if no data was written.
3117 /// [`InvalidState`] is returned if the peer does not support DATAGRAM.
3118 /// [`BufferTooShort`] is returned if the DATAGRAM frame length is larger
3119 /// than peer's supported DATAGRAM frame length. Use
3120 /// [`dgram_max_writable_len()`] to get the largest supported DATAGRAM
3121 /// frame length.
3122 ///
3123 /// Note that there is no flow control of DATAGRAM frames, so in order to
3124 /// avoid buffering an infinite amount of frames we apply an internal
3125 /// limit.
3126 ///
3127 /// [`Done`]: enum.Error.html#variant.Done
3128 /// [`InvalidState`]: enum.Error.html#variant.InvalidState
3129 /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
3130 /// [`dgram_max_writable_len()`]:
3131 /// struct.Connection.html#method.dgram_max_writable_len
3132 ///
3133 /// ## Examples:
3134 ///
3135 /// ```no_run
3136 /// # let mut buf = [0; 512];
3137 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3138 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3139 /// # let scid = [0xba; 16];
3140 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3141 /// conn.dgram_send(b"hello")?;
3142 /// # Ok::<(), quiche::Error>(())
3143 /// ```
dgram_send(&mut self, buf: &[u8]) -> Result<()>3144 pub fn dgram_send(&mut self, buf: &[u8]) -> Result<()> {
3145 let max_payload_len = match self.dgram_max_writable_len() {
3146 Some(v) => v as usize,
3147 None => {
3148 return Err(Error::InvalidState);
3149 },
3150 };
3151
3152 if buf.len() > max_payload_len {
3153 return Err(Error::BufferTooShort);
3154 }
3155
3156 self.dgram_send_queue.push(buf)?;
3157
3158 if self.dgram_send_queue.byte_size() > self.recovery.cwnd_available() {
3159 self.recovery.update_app_limited(false);
3160 }
3161
3162 Ok(())
3163 }
3164
3165 /// Purges queued outgoing DATAGRAMs matching the predicate.
3166 ///
3167 /// In other words, remove all elements `e` such that `f(&e)` returns true.
3168 ///
3169 /// ## Examples:
3170 /// ```no_run
3171 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3172 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3173 /// # let scid = [0xba; 16];
3174 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3175 /// conn.dgram_send(b"hello")?;
3176 /// conn.dgram_purge_outgoing(&|d: &[u8]| -> bool { d[0] == 0 });
3177 /// # Ok::<(), quiche::Error>(())
3178 /// ```
dgram_purge_outgoing<F: Fn(&[u8]) -> bool>(&mut self, f: F)3179 pub fn dgram_purge_outgoing<F: Fn(&[u8]) -> bool>(&mut self, f: F) {
3180 self.dgram_send_queue.purge(f);
3181 }
3182
3183 /// Returns the maximum DATAGRAM payload that can be sent.
3184 ///
3185 /// [`None`] is returned if the peer hasn't advertised a maximum DATAGRAM
3186 /// frame size.
3187 ///
3188 /// ## Examples:
3189 ///
3190 /// ```no_run
3191 /// # let mut buf = [0; 512];
3192 /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3193 /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3194 /// # let scid = [0xba; 16];
3195 /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3196 /// if let Some(payload_size) = conn.dgram_max_writable_len() {
3197 /// if payload_size > 5 {
3198 /// conn.dgram_send(b"hello")?;
3199 /// }
3200 /// }
3201 /// # Ok::<(), quiche::Error>(())
3202 /// ```
dgram_max_writable_len(&self) -> Option<usize>3203 pub fn dgram_max_writable_len(&self) -> Option<usize> {
3204 match self.peer_transport_params.max_datagram_frame_size {
3205 None => None,
3206 Some(peer_frame_len) => {
3207 // Start from the maximum packet size...
3208 let mut max_len = self.max_send_udp_payload_len();
3209 // ...subtract the Short packet header overhead...
3210 // (1 byte of pkt_len + len of dcid)
3211 max_len = max_len.saturating_sub(1 + self.dcid.len());
3212 // ...subtract the packet number (max len)...
3213 max_len = max_len.saturating_sub(packet::MAX_PKT_NUM_LEN);
3214 // ...subtract the crypto overhead...
3215 max_len = max_len.saturating_sub(
3216 self.pkt_num_spaces[packet::EPOCH_APPLICATION]
3217 .crypto_overhead()?,
3218 );
3219 // ...clamp to what peer can support...
3220 max_len = cmp::min(peer_frame_len as usize, max_len);
3221 // ...subtract frame overhead, checked for underflow.
3222 max_len.checked_sub(frame::MAX_DGRAM_OVERHEAD)
3223 },
3224 }
3225 }
3226
dgram_enabled(&self) -> bool3227 fn dgram_enabled(&self) -> bool {
3228 self.local_transport_params
3229 .max_datagram_frame_size
3230 .is_none()
3231 }
3232
3233 /// Returns the amount of time until the next timeout event.
3234 ///
3235 /// Once the given duration has elapsed, the [`on_timeout()`] method should
3236 /// be called. A timeout of `None` means that the timer should be disarmed.
3237 ///
3238 /// [`on_timeout()`]: struct.Connection.html#method.on_timeout
timeout(&self) -> Option<time::Duration>3239 pub fn timeout(&self) -> Option<time::Duration> {
3240 if self.is_closed() {
3241 return None;
3242 }
3243
3244 let timeout = if self.draining_timer.is_some() {
3245 // Draining timer takes precedence over all other timers. If it is
3246 // set it means the connection is closing so there's no point in
3247 // processing the other timers.
3248 self.draining_timer
3249 } else {
3250 // Use the lowest timer value (i.e. "sooner") among idle and loss
3251 // detection timers. If they are both unset (i.e. `None`) then the
3252 // result is `None`, but if at least one of them is set then a
3253 // `Some(...)` value is returned.
3254 let timers = [self.idle_timer, self.recovery.loss_detection_timer()];
3255
3256 timers.iter().filter_map(|&x| x).min()
3257 };
3258
3259 if let Some(timeout) = timeout {
3260 let now = time::Instant::now();
3261
3262 if timeout <= now {
3263 return Some(time::Duration::new(0, 0));
3264 }
3265
3266 return Some(timeout.duration_since(now));
3267 }
3268
3269 None
3270 }
3271
3272 /// Processes a timeout event.
3273 ///
3274 /// If no timeout has occurred it does nothing.
on_timeout(&mut self)3275 pub fn on_timeout(&mut self) {
3276 let now = time::Instant::now();
3277
3278 if let Some(draining_timer) = self.draining_timer {
3279 if draining_timer <= now {
3280 trace!("{} draining timeout expired", self.trace_id);
3281
3282 qlog_with!(self.qlog_streamer, q, {
3283 q.finish_log().ok();
3284 });
3285
3286 self.closed = true;
3287 }
3288
3289 // Draining timer takes precedence over all other timers. If it is
3290 // set it means the connection is closing so there's no point in
3291 // processing the other timers.
3292 return;
3293 }
3294
3295 if let Some(timer) = self.idle_timer {
3296 if timer <= now {
3297 trace!("{} idle timeout expired", self.trace_id);
3298
3299 qlog_with!(self.qlog_streamer, q, {
3300 q.finish_log().ok();
3301 });
3302
3303 self.closed = true;
3304 return;
3305 }
3306 }
3307
3308 if let Some(timer) = self.recovery.loss_detection_timer() {
3309 if timer <= now {
3310 trace!("{} loss detection timeout expired", self.trace_id);
3311
3312 self.recovery.on_loss_detection_timeout(
3313 self.handshake_status(),
3314 now,
3315 &self.trace_id,
3316 );
3317
3318 qlog_with!(self.qlog_streamer, q, {
3319 let ev = self.recovery.to_qlog();
3320 q.add_event(ev).ok();
3321 });
3322
3323 return;
3324 }
3325 }
3326 }
3327
3328 /// Closes the connection with the given error and reason.
3329 ///
3330 /// The `app` parameter specifies whether an application close should be
3331 /// sent to the peer. Otherwise a normal connection close is sent.
3332 ///
3333 /// Returns [`Done`] if the connection had already been closed.
3334 ///
3335 /// Note that the connection will not be closed immediately. An application
3336 /// should continue calling [`recv()`], [`send()`] and [`timeout()`] as
3337 /// normal, until the [`is_closed()`] method returns `true`.
3338 ///
3339 /// [`Done`]: enum.Error.html#variant.Done
3340 /// [`recv()`]: struct.Connection.html#method.recv
3341 /// [`send()`]: struct.Connection.html#method.send
3342 /// [`timeout()`]: struct.Connection.html#method.timeout
3343 /// [`is_closed()`]: struct.Connection.html#method.is_closed
close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()>3344 pub fn close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()> {
3345 if self.is_closed() || self.draining_timer.is_some() {
3346 return Err(Error::Done);
3347 }
3348
3349 if self.error.is_some() || self.app_error.is_some() {
3350 return Err(Error::Done);
3351 }
3352
3353 if app {
3354 self.app_error = Some(err);
3355 self.app_reason.extend_from_slice(reason);
3356 } else {
3357 self.error = Some(err);
3358 }
3359
3360 // When no packet was successfully processed close connection immediately.
3361 if self.recv_count == 0 {
3362 self.closed = true;
3363 }
3364
3365 Ok(())
3366 }
3367
3368 /// Returns a string uniquely representing the connection.
3369 ///
3370 /// This can be used for logging purposes to differentiate between multiple
3371 /// connections.
trace_id(&self) -> &str3372 pub fn trace_id(&self) -> &str {
3373 &self.trace_id
3374 }
3375
3376 /// Returns the negotiated ALPN protocol.
3377 ///
3378 /// If no protocol has been negotiated, the returned value is empty.
application_proto(&self) -> &[u8]3379 pub fn application_proto(&self) -> &[u8] {
3380 self.handshake.alpn_protocol()
3381 }
3382
3383 /// Returns the peer's leaf certificate (if any) as a DER-encoded buffer.
peer_cert(&self) -> Option<Vec<u8>>3384 pub fn peer_cert(&self) -> Option<Vec<u8>> {
3385 self.handshake.peer_cert()
3386 }
3387
3388 /// Returns true if the connection handshake is complete.
is_established(&self) -> bool3389 pub fn is_established(&self) -> bool {
3390 self.handshake.is_completed()
3391 }
3392
3393 /// Returns true if the connection is resumed.
is_resumed(&self) -> bool3394 pub fn is_resumed(&self) -> bool {
3395 self.handshake.is_resumed()
3396 }
3397
3398 /// Returns true if the connection has a pending handshake that has
3399 /// progressed enough to send or receive early data.
is_in_early_data(&self) -> bool3400 pub fn is_in_early_data(&self) -> bool {
3401 self.handshake.is_in_early_data()
3402 }
3403
3404 /// Returns true if the connection is closed.
3405 ///
3406 /// If this returns true, the connection object can be dropped.
is_closed(&self) -> bool3407 pub fn is_closed(&self) -> bool {
3408 self.closed
3409 }
3410
3411 /// Collects and returns statistics about the connection.
stats(&self) -> Stats3412 pub fn stats(&self) -> Stats {
3413 Stats {
3414 recv: self.recv_count,
3415 sent: self.sent_count,
3416 lost: self.recovery.lost_count,
3417 cwnd: self.recovery.cwnd(),
3418 rtt: self.recovery.rtt(),
3419 delivery_rate: self.recovery.delivery_rate(),
3420 }
3421 }
3422
encode_transport_params(&mut self) -> Result<()>3423 fn encode_transport_params(&mut self) -> Result<()> {
3424 let mut raw_params = [0; 128];
3425
3426 let raw_params = TransportParams::encode(
3427 &self.local_transport_params,
3428 self.is_server,
3429 &mut raw_params,
3430 )?;
3431
3432 self.handshake.set_quic_transport_params(raw_params)?;
3433
3434 Ok(())
3435 }
3436
3437 /// Continues the handshake.
3438 ///
3439 /// If the connection is already established, it does nothing.
do_handshake(&mut self) -> Result<()>3440 fn do_handshake(&mut self) -> Result<()> {
3441 // Handshake is already complete, there's nothing to do.
3442 if self.is_established() {
3443 return Ok(());
3444 }
3445
3446 match self.handshake.do_handshake() {
3447 Ok(_) => (),
3448
3449 Err(Error::Done) => return Ok(()),
3450
3451 Err(e) => return Err(e),
3452 };
3453
3454 if self.application_proto().is_empty() {
3455 // Send no_application_proto TLS alert when no protocol
3456 // can be negotiated.
3457 self.error = Some(0x178);
3458 return Err(Error::TlsFail);
3459 }
3460
3461 trace!("{} connection established: proto={:?} cipher={:?} curve={:?} sigalg={:?} resumed={} {:?}",
3462 &self.trace_id,
3463 std::str::from_utf8(self.application_proto()),
3464 self.handshake.cipher(),
3465 self.handshake.curve(),
3466 self.handshake.sigalg(),
3467 self.is_resumed(),
3468 self.peer_transport_params);
3469
3470 Ok(())
3471 }
3472
3473 /// Selects the packet number space for outgoing packets.
write_epoch(&self) -> Result<packet::Epoch>3474 fn write_epoch(&self) -> Result<packet::Epoch> {
3475 // On error send packet in the latest epoch available, but only send
3476 // 1-RTT ones when the handshake is completed.
3477 if self.error.is_some() {
3478 let epoch = match self.handshake.write_level() {
3479 crypto::Level::Initial => packet::EPOCH_INITIAL,
3480 crypto::Level::ZeroRTT => unreachable!(),
3481 crypto::Level::Handshake => packet::EPOCH_HANDSHAKE,
3482 crypto::Level::OneRTT => packet::EPOCH_APPLICATION,
3483 };
3484
3485 if epoch == packet::EPOCH_APPLICATION && !self.is_established() {
3486 // Downgrade the epoch to handshake as the handshake is not
3487 // completed yet.
3488 return Ok(packet::EPOCH_HANDSHAKE);
3489 }
3490
3491 return Ok(epoch);
3492 }
3493
3494 for epoch in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
3495 // Only send packets in a space when we have the send keys for it.
3496 if self.pkt_num_spaces[epoch].crypto_seal.is_none() {
3497 continue;
3498 }
3499
3500 // We are ready to send data for this packet number space.
3501 if self.pkt_num_spaces[epoch].ready() {
3502 return Ok(epoch);
3503 }
3504
3505 // There are lost frames in this packet number space.
3506 if !self.recovery.lost[epoch].is_empty() {
3507 return Ok(epoch);
3508 }
3509
3510 // We need to send PTO probe packets.
3511 if self.recovery.loss_probes[epoch] > 0 {
3512 return Ok(epoch);
3513 }
3514 }
3515
3516 // If there are flushable, almost full or blocked streams, use the
3517 // Application epoch.
3518 if (self.is_established() || self.is_in_early_data()) &&
3519 (self.almost_full ||
3520 self.blocked_limit.is_some() ||
3521 self.dgram_send_queue.has_pending() ||
3522 self.streams.should_update_max_streams_bidi() ||
3523 self.streams.should_update_max_streams_uni() ||
3524 self.streams.has_flushable() ||
3525 self.streams.has_almost_full() ||
3526 self.streams.has_blocked())
3527 {
3528 return Ok(packet::EPOCH_APPLICATION);
3529 }
3530
3531 Err(Error::Done)
3532 }
3533
3534 /// Returns the mutable stream with the given ID if it exists, or creates
3535 /// a new one otherwise.
get_or_create_stream( &mut self, id: u64, local: bool, ) -> Result<&mut stream::Stream>3536 fn get_or_create_stream(
3537 &mut self, id: u64, local: bool,
3538 ) -> Result<&mut stream::Stream> {
3539 self.streams.get_or_create(
3540 id,
3541 &self.local_transport_params,
3542 &self.peer_transport_params,
3543 local,
3544 self.is_server,
3545 )
3546 }
3547
3548 /// Processes an incoming frame.
process_frame( &mut self, frame: frame::Frame, epoch: packet::Epoch, now: time::Instant, ) -> Result<()>3549 fn process_frame(
3550 &mut self, frame: frame::Frame, epoch: packet::Epoch, now: time::Instant,
3551 ) -> Result<()> {
3552 trace!("{} rx frm {:?}", self.trace_id, frame);
3553
3554 match frame {
3555 frame::Frame::Padding { .. } => (),
3556
3557 frame::Frame::Ping => (),
3558
3559 frame::Frame::ACK { ranges, ack_delay } => {
3560 let ack_delay = ack_delay
3561 .checked_mul(2_u64.pow(
3562 self.peer_transport_params.ack_delay_exponent as u32,
3563 ))
3564 .ok_or(Error::InvalidFrame)?;
3565
3566 if epoch == packet::EPOCH_HANDSHAKE {
3567 self.peer_verified_address = true;
3568 }
3569
3570 // When we receive an ACK for a 1-RTT packet after handshake
3571 // completion, it means the handshake has been confirmed.
3572 if epoch == packet::EPOCH_APPLICATION && self.is_established() {
3573 self.peer_verified_address = true;
3574
3575 self.handshake_confirmed = true;
3576 }
3577
3578 self.recovery.on_ack_received(
3579 &ranges,
3580 ack_delay,
3581 epoch,
3582 self.handshake_status(),
3583 now,
3584 &self.trace_id,
3585 )?;
3586
3587 // Once the handshake is confirmed, we can drop Handshake keys.
3588 if self.handshake_confirmed {
3589 self.drop_epoch_state(packet::EPOCH_HANDSHAKE, now);
3590 }
3591 },
3592
3593 frame::Frame::ResetStream {
3594 stream_id,
3595 final_size,
3596 ..
3597 } => {
3598 // Peer can't send on our unidirectional streams.
3599 if !stream::is_bidi(stream_id) &&
3600 stream::is_local(stream_id, self.is_server)
3601 {
3602 return Err(Error::InvalidStreamState);
3603 }
3604
3605 // Get existing stream or create a new one, but if the stream
3606 // has already been closed and collected, ignore the frame.
3607 //
3608 // This can happen if e.g. an ACK frame is lost, and the peer
3609 // retransmits another frame before it realizes that the stream
3610 // is gone.
3611 //
3612 // Note that it makes it impossible to check if the frame is
3613 // illegal, since we have no state, but since we ignore the
3614 // frame, it should be fine.
3615 let stream = match self.get_or_create_stream(stream_id, false) {
3616 Ok(v) => v,
3617
3618 Err(Error::Done) => return Ok(()),
3619
3620 Err(e) => return Err(e),
3621 };
3622
3623 self.rx_data += stream.recv.reset(final_size)? as u64;
3624
3625 if self.rx_data > self.max_rx_data {
3626 return Err(Error::FlowControl);
3627 }
3628 },
3629
3630 frame::Frame::StopSending { stream_id, .. } => {
3631 // STOP_SENDING on a receive-only stream is a fatal error.
3632 if !stream::is_local(stream_id, self.is_server) &&
3633 !stream::is_bidi(stream_id)
3634 {
3635 return Err(Error::InvalidStreamState);
3636 }
3637 },
3638
3639 frame::Frame::Crypto { data } => {
3640 // Push the data to the stream so it can be re-ordered.
3641 self.pkt_num_spaces[epoch].crypto_stream.recv.push(data)?;
3642
3643 // Feed crypto data to the TLS state, if there's data
3644 // available at the expected offset.
3645 let mut crypto_buf = [0; 512];
3646
3647 let level = crypto::Level::from_epoch(epoch);
3648
3649 let stream = &mut self.pkt_num_spaces[epoch].crypto_stream;
3650
3651 while let Ok((read, _)) = stream.recv.pop(&mut crypto_buf) {
3652 let recv_buf = &crypto_buf[..read];
3653 self.handshake.provide_data(level, &recv_buf)?;
3654 }
3655
3656 self.do_handshake()?;
3657
3658 // Try to parse transport parameters as soon as the first flight
3659 // of handshake data is processed.
3660 //
3661 // This is potentially dangerous as the handshake hasn't been
3662 // completed yet, though it's required to be able to send data
3663 // in 0.5 RTT.
3664 let raw_params = self.handshake.quic_transport_params();
3665
3666 if !self.parsed_peer_transport_params && !raw_params.is_empty() {
3667 let peer_params =
3668 TransportParams::decode(&raw_params, self.is_server)?;
3669
3670 if self.version >= PROTOCOL_VERSION_DRAFT28 {
3671 // Validate initial_source_connection_id.
3672 match &peer_params.initial_source_connection_id {
3673 Some(v) if v != &self.dcid =>
3674 return Err(Error::InvalidTransportParam),
3675
3676 Some(_) => (),
3677
3678 // initial_source_connection_id must be sent by
3679 // both endpoints.
3680 None => return Err(Error::InvalidTransportParam),
3681 }
3682
3683 // Validate original_destination_connection_id.
3684 if let Some(odcid) = &self.odcid {
3685 match &peer_params.original_destination_connection_id
3686 {
3687 Some(v) if v != odcid =>
3688 return Err(Error::InvalidTransportParam),
3689
3690 Some(_) => (),
3691
3692 // original_destination_connection_id must be
3693 // sent by the server.
3694 None if !self.is_server =>
3695 return Err(Error::InvalidTransportParam),
3696
3697 None => (),
3698 }
3699 }
3700
3701 // Validate retry_source_connection_id.
3702 if let Some(rscid) = &self.rscid {
3703 match &peer_params.retry_source_connection_id {
3704 Some(v) if v != rscid =>
3705 return Err(Error::InvalidTransportParam),
3706
3707 Some(_) => (),
3708
3709 // retry_source_connection_id must be sent by
3710 // the server.
3711 None => return Err(Error::InvalidTransportParam),
3712 }
3713 }
3714 } else {
3715 // Legacy validation of the original connection ID when
3716 // stateless retry is performed, for drafts < 28.
3717 if self.did_retry &&
3718 peer_params.original_destination_connection_id !=
3719 self.odcid
3720 {
3721 return Err(Error::InvalidTransportParam);
3722 }
3723 }
3724
3725 // Update flow control limits.
3726 self.max_tx_data = peer_params.initial_max_data;
3727
3728 self.streams.update_peer_max_streams_bidi(
3729 peer_params.initial_max_streams_bidi,
3730 );
3731 self.streams.update_peer_max_streams_uni(
3732 peer_params.initial_max_streams_uni,
3733 );
3734
3735 self.recovery.max_ack_delay =
3736 time::Duration::from_millis(peer_params.max_ack_delay);
3737
3738 self.peer_transport_params = peer_params;
3739
3740 self.parsed_peer_transport_params = true;
3741 }
3742 },
3743
3744 // TODO: implement stateless retry
3745 frame::Frame::NewToken { .. } => (),
3746
3747 frame::Frame::Stream { stream_id, data } => {
3748 // Peer can't send on our unidirectional streams.
3749 if !stream::is_bidi(stream_id) &&
3750 stream::is_local(stream_id, self.is_server)
3751 {
3752 return Err(Error::InvalidStreamState);
3753 }
3754
3755 let max_rx_data_left = self.max_rx_data - self.rx_data;
3756
3757 // Get existing stream or create a new one, but if the stream
3758 // has already been closed and collected, ignore the frame.
3759 //
3760 // This can happen if e.g. an ACK frame is lost, and the peer
3761 // retransmits another frame before it realizes that the stream
3762 // is gone.
3763 //
3764 // Note that it makes it impossible to check if the frame is
3765 // illegal, since we have no state, but since we ignore the
3766 // frame, it should be fine.
3767 let stream = match self.get_or_create_stream(stream_id, false) {
3768 Ok(v) => v,
3769
3770 Err(Error::Done) => return Ok(()),
3771
3772 Err(e) => return Err(e),
3773 };
3774
3775 // Check for the connection-level flow control limit.
3776 let max_off_delta =
3777 data.max_off().saturating_sub(stream.recv.max_off());
3778
3779 if max_off_delta > max_rx_data_left {
3780 return Err(Error::FlowControl);
3781 }
3782
3783 stream.recv.push(data)?;
3784
3785 if stream.is_readable() {
3786 self.streams.mark_readable(stream_id, true);
3787 }
3788
3789 self.rx_data += max_off_delta;
3790 },
3791
3792 frame::Frame::MaxData { max } => {
3793 self.max_tx_data = cmp::max(self.max_tx_data, max);
3794 },
3795
3796 frame::Frame::MaxStreamData { stream_id, max } => {
3797 // Get existing stream or create a new one, but if the stream
3798 // has already been closed and collected, ignore the frame.
3799 //
3800 // This can happen if e.g. an ACK frame is lost, and the peer
3801 // retransmits another frame before it realizes that the stream
3802 // is gone.
3803 //
3804 // Note that it makes it impossible to check if the frame is
3805 // illegal, since we have no state, but since we ignore the
3806 // frame, it should be fine.
3807 let stream = match self.get_or_create_stream(stream_id, false) {
3808 Ok(v) => v,
3809
3810 Err(Error::Done) => return Ok(()),
3811
3812 Err(e) => return Err(e),
3813 };
3814
3815 let was_flushable = stream.is_flushable();
3816
3817 stream.send.update_max_data(max);
3818
3819 let writable = stream.is_writable();
3820
3821 // If the stream is now flushable push it to the flushable queue,
3822 // but only if it wasn't already queued.
3823 if stream.is_flushable() && !was_flushable {
3824 let urgency = stream.urgency;
3825 let incremental = stream.incremental;
3826 self.streams.push_flushable(stream_id, urgency, incremental);
3827 }
3828
3829 if writable {
3830 self.streams.mark_writable(stream_id, true);
3831 }
3832 },
3833
3834 frame::Frame::MaxStreamsBidi { max } => {
3835 if max > MAX_STREAM_ID {
3836 return Err(Error::InvalidFrame);
3837 }
3838
3839 self.streams.update_peer_max_streams_bidi(max);
3840 },
3841
3842 frame::Frame::MaxStreamsUni { max } => {
3843 if max > MAX_STREAM_ID {
3844 return Err(Error::InvalidFrame);
3845 }
3846
3847 self.streams.update_peer_max_streams_uni(max);
3848 },
3849
3850 frame::Frame::DataBlocked { .. } => (),
3851
3852 frame::Frame::StreamDataBlocked { .. } => (),
3853
3854 frame::Frame::StreamsBlockedBidi { limit } =>
3855 if limit > MAX_STREAM_ID {
3856 return Err(Error::InvalidFrame);
3857 },
3858
3859 frame::Frame::StreamsBlockedUni { limit } =>
3860 if limit > MAX_STREAM_ID {
3861 return Err(Error::InvalidFrame);
3862 },
3863
3864 // TODO: implement connection migration
3865 frame::Frame::NewConnectionId { .. } => (),
3866
3867 // TODO: implement connection migration
3868 frame::Frame::RetireConnectionId { .. } => (),
3869
3870 frame::Frame::PathChallenge { data } => {
3871 self.challenge = Some(data);
3872 },
3873
3874 frame::Frame::PathResponse { .. } => (),
3875
3876 frame::Frame::ConnectionClose { .. } => {
3877 self.draining_timer = Some(now + (self.recovery.pto() * 3));
3878 },
3879
3880 frame::Frame::ApplicationClose { .. } => {
3881 self.draining_timer = Some(now + (self.recovery.pto() * 3));
3882 },
3883
3884 frame::Frame::HandshakeDone => {
3885 if self.is_server {
3886 return Err(Error::InvalidPacket);
3887 }
3888
3889 self.peer_verified_address = true;
3890
3891 self.handshake_confirmed = true;
3892
3893 // Once the handshake is confirmed, we can drop Handshake keys.
3894 self.drop_epoch_state(packet::EPOCH_HANDSHAKE, now);
3895 },
3896
3897 frame::Frame::Datagram { data } => {
3898 // Close the connection if DATAGRAMs are not enabled.
3899 // quiche always advertises support for 64K sized DATAGRAM
3900 // frames, as recommended by the standard, so we don't need a
3901 // size check.
3902 if self.dgram_enabled() {
3903 return Err(Error::InvalidState);
3904 }
3905
3906 // If recv queue is full, discard oldest
3907 if self.dgram_recv_queue.is_full() {
3908 self.dgram_recv_queue.pop();
3909 }
3910
3911 self.dgram_recv_queue.push(&data)?;
3912 },
3913 }
3914
3915 Ok(())
3916 }
3917
3918 /// Drops the keys and recovery state for the given epoch.
drop_epoch_state(&mut self, epoch: packet::Epoch, now: time::Instant)3919 fn drop_epoch_state(&mut self, epoch: packet::Epoch, now: time::Instant) {
3920 if self.pkt_num_spaces[epoch].crypto_open.is_none() {
3921 return;
3922 }
3923
3924 self.pkt_num_spaces[epoch].crypto_open = None;
3925 self.pkt_num_spaces[epoch].crypto_seal = None;
3926 self.pkt_num_spaces[epoch].clear();
3927
3928 self.recovery.on_pkt_num_space_discarded(
3929 epoch,
3930 self.handshake_status(),
3931 now,
3932 );
3933
3934 trace!("{} dropped epoch {} state", self.trace_id, epoch);
3935 }
3936
3937 /// Returns true if the connection-level flow control needs to be updated.
3938 ///
3939 /// This happens when the new max data limit is at least double the amount
3940 /// of data that can be received before blocking.
should_update_max_data(&self) -> bool3941 fn should_update_max_data(&self) -> bool {
3942 self.max_rx_data_next != self.max_rx_data &&
3943 self.max_rx_data_next / 2 > self.max_rx_data - self.rx_data
3944 }
3945
3946 /// Returns the idle timeout value.
3947 ///
3948 /// `None` is returned if both end-points disabled the idle timeout.
idle_timeout(&mut self) -> Option<time::Duration>3949 fn idle_timeout(&mut self) -> Option<time::Duration> {
3950 // If the transport parameter is set to 0, then the respective endpoint
3951 // decided to disable the idle timeout. If both are disabled we should
3952 // not set any timeout.
3953 if self.local_transport_params.max_idle_timeout == 0 &&
3954 self.peer_transport_params.max_idle_timeout == 0
3955 {
3956 return None;
3957 }
3958
3959 // If the local endpoint or the peer disabled the idle timeout, use the
3960 // other peer's value, otherwise use the minimum of the two values.
3961 let idle_timeout = if self.local_transport_params.max_idle_timeout == 0 {
3962 self.peer_transport_params.max_idle_timeout
3963 } else if self.peer_transport_params.max_idle_timeout == 0 {
3964 self.local_transport_params.max_idle_timeout
3965 } else {
3966 cmp::min(
3967 self.local_transport_params.max_idle_timeout,
3968 self.peer_transport_params.max_idle_timeout,
3969 )
3970 };
3971
3972 let idle_timeout = time::Duration::from_millis(idle_timeout);
3973 let idle_timeout = cmp::max(idle_timeout, 3 * self.recovery.pto());
3974
3975 Some(idle_timeout)
3976 }
3977
3978 /// Returns the connection's overall send capacity.
send_capacity(&self) -> usize3979 fn send_capacity(&self) -> usize {
3980 let cap = self.max_tx_data - self.tx_data;
3981 cmp::min(cap, self.recovery.cwnd_available() as u64) as usize
3982 }
3983
3984 /// Returns the connection's handshake status for use in loss recovery.
handshake_status(&self) -> recovery::HandshakeStatus3985 fn handshake_status(&self) -> recovery::HandshakeStatus {
3986 recovery::HandshakeStatus {
3987 has_handshake_keys: self.pkt_num_spaces[packet::EPOCH_HANDSHAKE]
3988 .has_keys(),
3989
3990 peer_verified_address: self.peer_verified_address,
3991
3992 completed: self.is_established(),
3993 }
3994 }
3995 }
3996
3997 /// Maps an `Error` to `Error::Done`, or itself.
3998 ///
3999 /// When a received packet that hasn't yet been authenticated triggers a failure
4000 /// it should, in most cases, be ignored, instead of raising a connection error,
4001 /// to avoid potential man-in-the-middle and man-on-the-side attacks.
4002 ///
4003 /// However, if no other packet was previously received, the connection should
4004 /// indeed be closed as the received packet might just be network background
4005 /// noise, and it shouldn't keep resources occupied indefinitely.
4006 ///
4007 /// This function maps an error to `Error::Done` to ignore a packet failure
4008 /// without aborting the connection, except when no other packet was previously
4009 /// received, in which case the error itself is returned, but only on the
4010 /// server-side as the client will already have armed the idle timer.
4011 ///
4012 /// This must only be used for errors preceding packet authentication. Failures
4013 /// happening after a packet has been authenticated should still cause the
4014 /// connection to be aborted.
drop_pkt_on_err( e: Error, recv_count: usize, is_server: bool, trace_id: &str, ) -> Error4015 fn drop_pkt_on_err(
4016 e: Error, recv_count: usize, is_server: bool, trace_id: &str,
4017 ) -> Error {
4018 // On the server, if no other packet has been successflully processed, abort
4019 // the connection to avoid keeping the connection open when only junk is
4020 // received.
4021 if is_server && recv_count == 0 {
4022 return e;
4023 }
4024
4025 trace!("{} dropped invalid packet", trace_id);
4026
4027 // Ignore other invalid packets that haven't been authenticated to prevent
4028 // man-in-the-middle and man-on-the-side attacks.
4029 Error::Done
4030 }
4031
4032 /// Statistics about the connection.
4033 ///
4034 /// A connections's statistics can be collected using the [`stats()`] method.
4035 ///
4036 /// [`stats()`]: struct.Connection.html#method.stats
4037 #[derive(Clone)]
4038 pub struct Stats {
4039 /// The number of QUIC packets received on this connection.
4040 pub recv: usize,
4041
4042 /// The number of QUIC packets sent on this connection.
4043 pub sent: usize,
4044
4045 /// The number of QUIC packets that were lost.
4046 pub lost: usize,
4047
4048 /// The estimated round-trip time of the connection.
4049 pub rtt: time::Duration,
4050
4051 /// The size of the connection's congestion window in bytes.
4052 pub cwnd: usize,
4053
4054 /// The estimated data delivery rate in bytes/s.
4055 pub delivery_rate: u64,
4056 }
4057
4058 impl std::fmt::Debug for Stats {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result4059 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
4060 write!(
4061 f,
4062 "recv={} sent={} lost={} rtt={:?} cwnd={} delivery_rate={}",
4063 self.recv,
4064 self.sent,
4065 self.lost,
4066 self.rtt,
4067 self.cwnd,
4068 self.delivery_rate
4069 )
4070 }
4071 }
4072
4073 #[derive(Clone, Debug, PartialEq)]
4074 struct TransportParams {
4075 pub original_destination_connection_id: Option<Vec<u8>>,
4076 pub max_idle_timeout: u64,
4077 pub stateless_reset_token: Option<Vec<u8>>,
4078 pub max_udp_payload_size: u64,
4079 pub initial_max_data: u64,
4080 pub initial_max_stream_data_bidi_local: u64,
4081 pub initial_max_stream_data_bidi_remote: u64,
4082 pub initial_max_stream_data_uni: u64,
4083 pub initial_max_streams_bidi: u64,
4084 pub initial_max_streams_uni: u64,
4085 pub ack_delay_exponent: u64,
4086 pub max_ack_delay: u64,
4087 pub disable_active_migration: bool,
4088 // pub preferred_address: ...,
4089 pub active_conn_id_limit: u64,
4090 pub initial_source_connection_id: Option<Vec<u8>>,
4091 pub retry_source_connection_id: Option<Vec<u8>>,
4092 pub max_datagram_frame_size: Option<u64>,
4093 }
4094
4095 impl Default for TransportParams {
default() -> TransportParams4096 fn default() -> TransportParams {
4097 TransportParams {
4098 original_destination_connection_id: None,
4099 max_idle_timeout: 0,
4100 stateless_reset_token: None,
4101 max_udp_payload_size: 65527,
4102 initial_max_data: 0,
4103 initial_max_stream_data_bidi_local: 0,
4104 initial_max_stream_data_bidi_remote: 0,
4105 initial_max_stream_data_uni: 0,
4106 initial_max_streams_bidi: 0,
4107 initial_max_streams_uni: 0,
4108 ack_delay_exponent: 3,
4109 max_ack_delay: 25,
4110 disable_active_migration: false,
4111 active_conn_id_limit: 2,
4112 initial_source_connection_id: None,
4113 retry_source_connection_id: None,
4114 max_datagram_frame_size: None,
4115 }
4116 }
4117 }
4118
4119 impl TransportParams {
decode(buf: &[u8], is_server: bool) -> Result<TransportParams>4120 fn decode(buf: &[u8], is_server: bool) -> Result<TransportParams> {
4121 let mut params = octets::Octets::with_slice(buf);
4122
4123 let mut tp = TransportParams::default();
4124
4125 while params.cap() > 0 {
4126 let id = params.get_varint()?;
4127
4128 let mut val = params.get_bytes_with_varint_length()?;
4129
4130 // TODO: forbid duplicated param
4131
4132 match id {
4133 0x0000 => {
4134 if is_server {
4135 return Err(Error::InvalidTransportParam);
4136 }
4137
4138 tp.original_destination_connection_id = Some(val.to_vec());
4139 },
4140
4141 0x0001 => {
4142 tp.max_idle_timeout = val.get_varint()?;
4143 },
4144
4145 0x0002 => {
4146 if is_server {
4147 return Err(Error::InvalidTransportParam);
4148 }
4149
4150 tp.stateless_reset_token = Some(val.get_bytes(16)?.to_vec());
4151 },
4152
4153 0x0003 => {
4154 tp.max_udp_payload_size = val.get_varint()?;
4155
4156 if tp.max_udp_payload_size < 1200 {
4157 return Err(Error::InvalidTransportParam);
4158 }
4159 },
4160
4161 0x0004 => {
4162 tp.initial_max_data = val.get_varint()?;
4163 },
4164
4165 0x0005 => {
4166 tp.initial_max_stream_data_bidi_local = val.get_varint()?;
4167 },
4168
4169 0x0006 => {
4170 tp.initial_max_stream_data_bidi_remote = val.get_varint()?;
4171 },
4172
4173 0x0007 => {
4174 tp.initial_max_stream_data_uni = val.get_varint()?;
4175 },
4176
4177 0x0008 => {
4178 let max = val.get_varint()?;
4179
4180 if max > MAX_STREAM_ID {
4181 return Err(Error::InvalidTransportParam);
4182 }
4183
4184 tp.initial_max_streams_bidi = max;
4185 },
4186
4187 0x0009 => {
4188 let max = val.get_varint()?;
4189
4190 if max > MAX_STREAM_ID {
4191 return Err(Error::InvalidTransportParam);
4192 }
4193
4194 tp.initial_max_streams_uni = max;
4195 },
4196
4197 0x000a => {
4198 let ack_delay_exponent = val.get_varint()?;
4199
4200 if ack_delay_exponent > 20 {
4201 return Err(Error::InvalidTransportParam);
4202 }
4203
4204 tp.ack_delay_exponent = ack_delay_exponent;
4205 },
4206
4207 0x000b => {
4208 let max_ack_delay = val.get_varint()?;
4209
4210 if max_ack_delay >= 2_u64.pow(14) {
4211 return Err(Error::InvalidTransportParam);
4212 }
4213
4214 tp.max_ack_delay = max_ack_delay;
4215 },
4216
4217 0x000c => {
4218 tp.disable_active_migration = true;
4219 },
4220
4221 0x000d => {
4222 if is_server {
4223 return Err(Error::InvalidTransportParam);
4224 }
4225
4226 // TODO: decode preferred_address
4227 },
4228
4229 0x000e => {
4230 let limit = val.get_varint()?;
4231
4232 if limit < 2 {
4233 return Err(Error::InvalidTransportParam);
4234 }
4235
4236 tp.active_conn_id_limit = limit;
4237 },
4238
4239 0x000f => {
4240 tp.initial_source_connection_id = Some(val.to_vec());
4241 },
4242
4243 0x00010 => {
4244 if is_server {
4245 return Err(Error::InvalidTransportParam);
4246 }
4247
4248 tp.retry_source_connection_id = Some(val.to_vec());
4249 },
4250
4251 0x0020 => {
4252 tp.max_datagram_frame_size = Some(val.get_varint()?);
4253 },
4254
4255 // Ignore unknown parameters.
4256 _ => (),
4257 }
4258 }
4259
4260 Ok(tp)
4261 }
4262
encode_param( b: &mut octets::OctetsMut, ty: u64, len: usize, ) -> Result<()>4263 fn encode_param(
4264 b: &mut octets::OctetsMut, ty: u64, len: usize,
4265 ) -> Result<()> {
4266 b.put_varint(ty)?;
4267 b.put_varint(len as u64)?;
4268
4269 Ok(())
4270 }
4271
encode<'a>( tp: &TransportParams, is_server: bool, out: &'a mut [u8], ) -> Result<&'a mut [u8]>4272 fn encode<'a>(
4273 tp: &TransportParams, is_server: bool, out: &'a mut [u8],
4274 ) -> Result<&'a mut [u8]> {
4275 let mut b = octets::OctetsMut::with_slice(out);
4276
4277 if is_server {
4278 if let Some(ref odcid) = tp.original_destination_connection_id {
4279 TransportParams::encode_param(&mut b, 0x0000, odcid.len())?;
4280 b.put_bytes(&odcid)?;
4281 }
4282 };
4283
4284 if tp.max_idle_timeout != 0 {
4285 TransportParams::encode_param(
4286 &mut b,
4287 0x0001,
4288 octets::varint_len(tp.max_idle_timeout),
4289 )?;
4290 b.put_varint(tp.max_idle_timeout)?;
4291 }
4292
4293 if is_server {
4294 if let Some(ref token) = tp.stateless_reset_token {
4295 TransportParams::encode_param(&mut b, 0x0002, token.len())?;
4296 b.put_bytes(&token)?;
4297 }
4298 }
4299
4300 if tp.max_udp_payload_size != 0 {
4301 TransportParams::encode_param(
4302 &mut b,
4303 0x0003,
4304 octets::varint_len(tp.max_udp_payload_size),
4305 )?;
4306 b.put_varint(tp.max_udp_payload_size)?;
4307 }
4308
4309 if tp.initial_max_data != 0 {
4310 TransportParams::encode_param(
4311 &mut b,
4312 0x0004,
4313 octets::varint_len(tp.initial_max_data),
4314 )?;
4315 b.put_varint(tp.initial_max_data)?;
4316 }
4317
4318 if tp.initial_max_stream_data_bidi_local != 0 {
4319 TransportParams::encode_param(
4320 &mut b,
4321 0x0005,
4322 octets::varint_len(tp.initial_max_stream_data_bidi_local),
4323 )?;
4324 b.put_varint(tp.initial_max_stream_data_bidi_local)?;
4325 }
4326
4327 if tp.initial_max_stream_data_bidi_remote != 0 {
4328 TransportParams::encode_param(
4329 &mut b,
4330 0x0006,
4331 octets::varint_len(tp.initial_max_stream_data_bidi_remote),
4332 )?;
4333 b.put_varint(tp.initial_max_stream_data_bidi_remote)?;
4334 }
4335
4336 if tp.initial_max_stream_data_uni != 0 {
4337 TransportParams::encode_param(
4338 &mut b,
4339 0x0007,
4340 octets::varint_len(tp.initial_max_stream_data_uni),
4341 )?;
4342 b.put_varint(tp.initial_max_stream_data_uni)?;
4343 }
4344
4345 if tp.initial_max_streams_bidi != 0 {
4346 TransportParams::encode_param(
4347 &mut b,
4348 0x0008,
4349 octets::varint_len(tp.initial_max_streams_bidi),
4350 )?;
4351 b.put_varint(tp.initial_max_streams_bidi)?;
4352 }
4353
4354 if tp.initial_max_streams_uni != 0 {
4355 TransportParams::encode_param(
4356 &mut b,
4357 0x0009,
4358 octets::varint_len(tp.initial_max_streams_uni),
4359 )?;
4360 b.put_varint(tp.initial_max_streams_uni)?;
4361 }
4362
4363 if tp.ack_delay_exponent != 0 {
4364 TransportParams::encode_param(
4365 &mut b,
4366 0x000a,
4367 octets::varint_len(tp.ack_delay_exponent),
4368 )?;
4369 b.put_varint(tp.ack_delay_exponent)?;
4370 }
4371
4372 if tp.max_ack_delay != 0 {
4373 TransportParams::encode_param(
4374 &mut b,
4375 0x000b,
4376 octets::varint_len(tp.max_ack_delay),
4377 )?;
4378 b.put_varint(tp.max_ack_delay)?;
4379 }
4380
4381 if tp.disable_active_migration {
4382 TransportParams::encode_param(&mut b, 0x000c, 0)?;
4383 }
4384
4385 // TODO: encode preferred_address
4386
4387 if tp.active_conn_id_limit != 2 {
4388 TransportParams::encode_param(
4389 &mut b,
4390 0x000e,
4391 octets::varint_len(tp.active_conn_id_limit),
4392 )?;
4393 b.put_varint(tp.active_conn_id_limit)?;
4394 }
4395
4396 if let Some(scid) = &tp.initial_source_connection_id {
4397 TransportParams::encode_param(&mut b, 0x000f, scid.len())?;
4398 b.put_bytes(&scid)?;
4399 }
4400
4401 if is_server {
4402 if let Some(scid) = &tp.retry_source_connection_id {
4403 TransportParams::encode_param(&mut b, 0x0010, scid.len())?;
4404 b.put_bytes(&scid)?;
4405 }
4406 }
4407
4408 if let Some(max_datagram_frame_size) = tp.max_datagram_frame_size {
4409 TransportParams::encode_param(
4410 &mut b,
4411 0x0020,
4412 octets::varint_len(max_datagram_frame_size),
4413 )?;
4414 b.put_varint(max_datagram_frame_size)?;
4415 }
4416
4417 let out_len = b.off();
4418
4419 Ok(&mut out[..out_len])
4420 }
4421
4422 /// Creates a qlog event for connection transport parameters and TLS fields
4423 #[cfg(feature = "qlog")]
to_qlog( &self, owner: qlog::TransportOwner, version: u32, alpn: &[u8], cipher: Option<crypto::Algorithm>, ) -> qlog::event::Event4424 pub fn to_qlog(
4425 &self, owner: qlog::TransportOwner, version: u32, alpn: &[u8],
4426 cipher: Option<crypto::Algorithm>,
4427 ) -> qlog::event::Event {
4428 let ocid = qlog::HexSlice::maybe_string(
4429 self.original_destination_connection_id.as_ref(),
4430 );
4431 let stateless_reset_token =
4432 qlog::HexSlice::maybe_string(self.stateless_reset_token.as_ref());
4433
4434 qlog::event::Event::transport_parameters_set(
4435 Some(owner),
4436 None, // resumption
4437 None, // early data
4438 String::from_utf8(alpn.to_vec()).ok(),
4439 Some(format!("{:x?}", version)),
4440 Some(format!("{:?}", cipher)),
4441 ocid,
4442 stateless_reset_token,
4443 Some(self.disable_active_migration),
4444 Some(self.max_idle_timeout),
4445 Some(self.max_udp_payload_size),
4446 Some(self.ack_delay_exponent),
4447 Some(self.max_ack_delay),
4448 Some(self.active_conn_id_limit),
4449 Some(self.initial_max_data.to_string()),
4450 Some(self.initial_max_stream_data_bidi_local.to_string()),
4451 Some(self.initial_max_stream_data_bidi_remote.to_string()),
4452 Some(self.initial_max_stream_data_uni.to_string()),
4453 Some(self.initial_max_streams_bidi.to_string()),
4454 Some(self.initial_max_streams_uni.to_string()),
4455 None, // preferred address
4456 )
4457 }
4458 }
4459
4460 #[doc(hidden)]
4461 pub mod testing {
4462 use super::*;
4463
4464 pub struct Pipe {
4465 pub client: Pin<Box<Connection>>,
4466 pub server: Pin<Box<Connection>>,
4467 }
4468
4469 impl Pipe {
default() -> Result<Pipe>4470 pub fn default() -> Result<Pipe> {
4471 let mut config = Config::new(crate::PROTOCOL_VERSION)?;
4472 config.load_cert_chain_from_pem_file("examples/cert.crt")?;
4473 config.load_priv_key_from_pem_file("examples/cert.key")?;
4474 config.set_application_protos(b"\x06proto1\x06proto2")?;
4475 config.set_initial_max_data(30);
4476 config.set_initial_max_stream_data_bidi_local(15);
4477 config.set_initial_max_stream_data_bidi_remote(15);
4478 config.set_initial_max_stream_data_uni(10);
4479 config.set_initial_max_streams_bidi(3);
4480 config.set_initial_max_streams_uni(3);
4481 config.set_max_idle_timeout(180_000);
4482 config.verify_peer(false);
4483
4484 Pipe::with_config(&mut config)
4485 }
4486
with_config(config: &mut Config) -> Result<Pipe>4487 pub fn with_config(config: &mut Config) -> Result<Pipe> {
4488 let mut client_scid = [0; 16];
4489 rand::rand_bytes(&mut client_scid[..]);
4490
4491 let mut server_scid = [0; 16];
4492 rand::rand_bytes(&mut server_scid[..]);
4493
4494 Ok(Pipe {
4495 client: connect(Some("quic.tech"), &client_scid, config)?,
4496 server: accept(&server_scid, None, config)?,
4497 })
4498 }
4499
with_client_config(client_config: &mut Config) -> Result<Pipe>4500 pub fn with_client_config(client_config: &mut Config) -> Result<Pipe> {
4501 let mut client_scid = [0; 16];
4502 rand::rand_bytes(&mut client_scid[..]);
4503
4504 let mut server_scid = [0; 16];
4505 rand::rand_bytes(&mut server_scid[..]);
4506
4507 let mut config = Config::new(crate::PROTOCOL_VERSION)?;
4508 config.load_cert_chain_from_pem_file("examples/cert.crt")?;
4509 config.load_priv_key_from_pem_file("examples/cert.key")?;
4510 config.set_application_protos(b"\x06proto1\x06proto2")?;
4511 config.set_initial_max_data(30);
4512 config.set_initial_max_stream_data_bidi_local(15);
4513 config.set_initial_max_stream_data_bidi_remote(15);
4514 config.set_initial_max_streams_bidi(3);
4515 config.set_initial_max_streams_uni(3);
4516
4517 Ok(Pipe {
4518 client: connect(Some("quic.tech"), &client_scid, client_config)?,
4519 server: accept(&server_scid, None, &mut config)?,
4520 })
4521 }
4522
with_server_config(server_config: &mut Config) -> Result<Pipe>4523 pub fn with_server_config(server_config: &mut Config) -> Result<Pipe> {
4524 let mut client_scid = [0; 16];
4525 rand::rand_bytes(&mut client_scid[..]);
4526
4527 let mut server_scid = [0; 16];
4528 rand::rand_bytes(&mut server_scid[..]);
4529
4530 let mut config = Config::new(crate::PROTOCOL_VERSION)?;
4531 config.set_application_protos(b"\x06proto1\x06proto2")?;
4532 config.set_initial_max_data(30);
4533 config.set_initial_max_stream_data_bidi_local(15);
4534 config.set_initial_max_stream_data_bidi_remote(15);
4535 config.set_initial_max_streams_bidi(3);
4536 config.set_initial_max_streams_uni(3);
4537
4538 Ok(Pipe {
4539 client: connect(Some("quic.tech"), &client_scid, &mut config)?,
4540 server: accept(&server_scid, None, server_config)?,
4541 })
4542 }
4543
handshake(&mut self, buf: &mut [u8]) -> Result<()>4544 pub fn handshake(&mut self, buf: &mut [u8]) -> Result<()> {
4545 let mut len = self.client.send(buf)?;
4546
4547 while !self.client.is_established() && !self.server.is_established() {
4548 len = recv_send(&mut self.server, buf, len)?;
4549 len = recv_send(&mut self.client, buf, len)?;
4550 }
4551
4552 recv_send(&mut self.server, buf, len)?;
4553
4554 Ok(())
4555 }
4556
flush_client(&mut self, buf: &mut [u8]) -> Result<()>4557 pub fn flush_client(&mut self, buf: &mut [u8]) -> Result<()> {
4558 loop {
4559 let len = match self.client.send(buf) {
4560 Ok(v) => v,
4561
4562 Err(Error::Done) => break,
4563
4564 Err(e) => return Err(e),
4565 };
4566
4567 match self.server.recv(&mut buf[..len]) {
4568 Ok(_) => (),
4569
4570 Err(Error::Done) => (),
4571
4572 Err(e) => return Err(e),
4573 }
4574 }
4575
4576 Ok(())
4577 }
4578
flush_server(&mut self, buf: &mut [u8]) -> Result<()>4579 pub fn flush_server(&mut self, buf: &mut [u8]) -> Result<()> {
4580 loop {
4581 let len = match self.server.send(buf) {
4582 Ok(v) => v,
4583
4584 Err(Error::Done) => break,
4585
4586 Err(e) => return Err(e),
4587 };
4588
4589 match self.client.recv(&mut buf[..len]) {
4590 Ok(_) => (),
4591
4592 Err(Error::Done) => (),
4593
4594 Err(e) => return Err(e),
4595 }
4596 }
4597
4598 Ok(())
4599 }
4600
advance(&mut self, buf: &mut [u8]) -> Result<()>4601 pub fn advance(&mut self, buf: &mut [u8]) -> Result<()> {
4602 let mut client_done = false;
4603 let mut server_done = false;
4604
4605 let mut len = 0;
4606
4607 while !client_done || !server_done {
4608 len = recv_send(&mut self.client, buf, len)?;
4609 client_done = len == 0;
4610
4611 len = recv_send(&mut self.server, buf, len)?;
4612 server_done = len == 0;
4613 }
4614
4615 Ok(())
4616 }
4617
send_pkt_to_server( &mut self, pkt_type: packet::Type, frames: &[frame::Frame], buf: &mut [u8], ) -> Result<usize>4618 pub fn send_pkt_to_server(
4619 &mut self, pkt_type: packet::Type, frames: &[frame::Frame],
4620 buf: &mut [u8],
4621 ) -> Result<usize> {
4622 let written = encode_pkt(&mut self.client, pkt_type, frames, buf)?;
4623 recv_send(&mut self.server, buf, written)
4624 }
4625 }
4626
recv_send( conn: &mut Connection, buf: &mut [u8], len: usize, ) -> Result<usize>4627 pub fn recv_send(
4628 conn: &mut Connection, buf: &mut [u8], len: usize,
4629 ) -> Result<usize> {
4630 let mut left = len;
4631
4632 while left > 0 {
4633 match conn.recv(&mut buf[len - left..len]) {
4634 Ok(read) => left -= read,
4635
4636 Err(Error::Done) => break,
4637
4638 Err(e) => return Err(e),
4639 }
4640 }
4641
4642 assert_eq!(left, 0);
4643
4644 let mut off = 0;
4645
4646 while off < buf.len() {
4647 match conn.send(&mut buf[off..]) {
4648 Ok(write) => off += write,
4649
4650 Err(Error::Done) => break,
4651
4652 Err(e) => return Err(e),
4653 }
4654 }
4655
4656 Ok(off)
4657 }
4658
encode_pkt( conn: &mut Connection, pkt_type: packet::Type, frames: &[frame::Frame], buf: &mut [u8], ) -> Result<usize>4659 pub fn encode_pkt(
4660 conn: &mut Connection, pkt_type: packet::Type, frames: &[frame::Frame],
4661 buf: &mut [u8],
4662 ) -> Result<usize> {
4663 let mut b = octets::OctetsMut::with_slice(buf);
4664
4665 let epoch = pkt_type.to_epoch()?;
4666
4667 let space = &mut conn.pkt_num_spaces[epoch];
4668
4669 let pn = space.next_pkt_num;
4670 let pn_len = packet::pkt_num_len(pn)?;
4671
4672 let hdr = Header {
4673 ty: pkt_type,
4674 version: conn.version,
4675 dcid: conn.dcid.clone(),
4676 scid: conn.scid.clone(),
4677 pkt_num: 0,
4678 pkt_num_len: pn_len,
4679 token: conn.token.clone(),
4680 versions: None,
4681 key_phase: false,
4682 };
4683
4684 hdr.to_bytes(&mut b)?;
4685
4686 let payload_len = frames.iter().fold(0, |acc, x| acc + x.wire_len()) +
4687 space.crypto_overhead().unwrap();
4688
4689 if pkt_type != packet::Type::Short {
4690 let len = pn_len + payload_len;
4691 b.put_varint(len as u64)?;
4692 }
4693
4694 packet::encode_pkt_num(pn, &mut b)?;
4695
4696 let payload_offset = b.off();
4697
4698 for frame in frames {
4699 frame.to_bytes(&mut b)?;
4700 }
4701
4702 let aead = match space.crypto_seal {
4703 Some(ref v) => v,
4704 None => return Err(Error::InvalidState),
4705 };
4706
4707 let written = packet::encrypt_pkt(
4708 &mut b,
4709 pn,
4710 pn_len,
4711 payload_len,
4712 payload_offset,
4713 aead,
4714 )?;
4715
4716 space.next_pkt_num += 1;
4717
4718 Ok(written)
4719 }
4720
decode_pkt( conn: &mut Connection, buf: &mut [u8], len: usize, ) -> Result<Vec<frame::Frame>>4721 pub fn decode_pkt(
4722 conn: &mut Connection, buf: &mut [u8], len: usize,
4723 ) -> Result<Vec<frame::Frame>> {
4724 let mut b = octets::OctetsMut::with_slice(&mut buf[..len]);
4725
4726 let mut hdr = Header::from_bytes(&mut b, conn.scid.len()).unwrap();
4727
4728 let epoch = hdr.ty.to_epoch()?;
4729
4730 let aead = conn.pkt_num_spaces[epoch].crypto_open.as_ref().unwrap();
4731
4732 let payload_len = b.cap();
4733
4734 packet::decrypt_hdr(&mut b, &mut hdr, &aead).unwrap();
4735
4736 let pn = packet::decode_pkt_num(
4737 conn.pkt_num_spaces[epoch].largest_rx_pkt_num,
4738 hdr.pkt_num,
4739 hdr.pkt_num_len,
4740 );
4741
4742 let mut payload =
4743 packet::decrypt_pkt(&mut b, pn, hdr.pkt_num_len, payload_len, aead)
4744 .unwrap();
4745
4746 let mut frames = Vec::new();
4747
4748 while payload.cap() > 0 {
4749 let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
4750 frames.push(frame);
4751 }
4752
4753 Ok(frames)
4754 }
4755 }
4756
4757 #[cfg(test)]
4758 mod tests {
4759 use super::*;
4760
4761 #[test]
transport_params()4762 fn transport_params() {
4763 // Server encodes, client decodes.
4764 let tp = TransportParams {
4765 original_destination_connection_id: None,
4766 max_idle_timeout: 30,
4767 stateless_reset_token: Some(vec![0xba; 16]),
4768 max_udp_payload_size: 23_421,
4769 initial_max_data: 424_645_563,
4770 initial_max_stream_data_bidi_local: 154_323_123,
4771 initial_max_stream_data_bidi_remote: 6_587_456,
4772 initial_max_stream_data_uni: 2_461_234,
4773 initial_max_streams_bidi: 12_231,
4774 initial_max_streams_uni: 18_473,
4775 ack_delay_exponent: 20,
4776 max_ack_delay: 2_u64.pow(14) - 1,
4777 disable_active_migration: true,
4778 active_conn_id_limit: 8,
4779 initial_source_connection_id: Some(b"woot woot".to_vec()),
4780 retry_source_connection_id: Some(b"retry".to_vec()),
4781 max_datagram_frame_size: Some(32),
4782 };
4783
4784 let mut raw_params = [42; 256];
4785 let raw_params =
4786 TransportParams::encode(&tp, true, &mut raw_params).unwrap();
4787 assert_eq!(raw_params.len(), 94);
4788
4789 let new_tp = TransportParams::decode(&raw_params, false).unwrap();
4790
4791 assert_eq!(new_tp, tp);
4792
4793 // Client encodes, server decodes.
4794 let tp = TransportParams {
4795 original_destination_connection_id: None,
4796 max_idle_timeout: 30,
4797 stateless_reset_token: None,
4798 max_udp_payload_size: 23_421,
4799 initial_max_data: 424_645_563,
4800 initial_max_stream_data_bidi_local: 154_323_123,
4801 initial_max_stream_data_bidi_remote: 6_587_456,
4802 initial_max_stream_data_uni: 2_461_234,
4803 initial_max_streams_bidi: 12_231,
4804 initial_max_streams_uni: 18_473,
4805 ack_delay_exponent: 20,
4806 max_ack_delay: 2_u64.pow(14) - 1,
4807 disable_active_migration: true,
4808 active_conn_id_limit: 8,
4809 initial_source_connection_id: Some(b"woot woot".to_vec()),
4810 retry_source_connection_id: None,
4811 max_datagram_frame_size: Some(32),
4812 };
4813
4814 let mut raw_params = [42; 256];
4815 let raw_params =
4816 TransportParams::encode(&tp, false, &mut raw_params).unwrap();
4817 assert_eq!(raw_params.len(), 69);
4818
4819 let new_tp = TransportParams::decode(&raw_params, true).unwrap();
4820
4821 assert_eq!(new_tp, tp);
4822 }
4823
4824 #[test]
4825 #[ignore = "Android: failure reason unkown."]
unknown_version()4826 fn unknown_version() {
4827 let mut buf = [0; 65535];
4828
4829 let mut config = Config::new(0xbabababa).unwrap();
4830 config.verify_peer(false);
4831
4832 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
4833
4834 assert_eq!(pipe.handshake(&mut buf), Err(Error::UnknownVersion));
4835 }
4836
4837 #[test]
version_negotiation()4838 fn version_negotiation() {
4839 let mut buf = [0; 65535];
4840
4841 let mut config = Config::new(0xbabababa).unwrap();
4842 config
4843 .set_application_protos(b"\x06proto1\x06proto2")
4844 .unwrap();
4845 config.verify_peer(false);
4846
4847 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
4848
4849 let mut len = pipe.client.send(&mut buf).unwrap();
4850
4851 let hdr = packet::Header::from_slice(&mut buf[..len], 0).unwrap();
4852 len = crate::negotiate_version(&hdr.scid, &hdr.dcid, &mut buf).unwrap();
4853
4854 assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
4855
4856 assert_eq!(pipe.handshake(&mut buf), Ok(()));
4857
4858 assert_eq!(pipe.client.version, PROTOCOL_VERSION);
4859 assert_eq!(pipe.server.version, PROTOCOL_VERSION);
4860 }
4861
4862 #[test]
verify_custom_root()4863 fn verify_custom_root() {
4864 let mut buf = [0; 65535];
4865
4866 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
4867 config.verify_peer(true);
4868 config
4869 .load_verify_locations_from_file("examples/rootca.crt")
4870 .unwrap();
4871 config
4872 .set_application_protos(b"\x06proto1\x06proto2")
4873 .unwrap();
4874
4875 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
4876 assert_eq!(pipe.handshake(&mut buf), Ok(()));
4877 }
4878
4879 #[test]
missing_initial_source_connection_id()4880 fn missing_initial_source_connection_id() {
4881 let mut buf = [0; 65535];
4882
4883 let mut pipe = testing::Pipe::default().unwrap();
4884
4885 // Reset initial_source_connection_id.
4886 pipe.client
4887 .local_transport_params
4888 .initial_source_connection_id = None;
4889 assert_eq!(pipe.client.encode_transport_params(), Ok(()));
4890
4891 // Client sends initial flight.
4892 let len = pipe.client.send(&mut buf).unwrap();
4893
4894 // Server rejects transport parameters.
4895 assert_eq!(
4896 testing::recv_send(&mut pipe.server, &mut buf, len),
4897 Err(Error::InvalidTransportParam)
4898 );
4899 }
4900
4901 #[test]
invalid_initial_source_connection_id()4902 fn invalid_initial_source_connection_id() {
4903 let mut buf = [0; 65535];
4904
4905 let mut pipe = testing::Pipe::default().unwrap();
4906
4907 // Scramble initial_source_connection_id.
4908 pipe.client
4909 .local_transport_params
4910 .initial_source_connection_id = Some(b"bogus value".to_vec());
4911 assert_eq!(pipe.client.encode_transport_params(), Ok(()));
4912
4913 // Client sends initial flight.
4914 let len = pipe.client.send(&mut buf).unwrap();
4915
4916 // Server rejects transport parameters.
4917 assert_eq!(
4918 testing::recv_send(&mut pipe.server, &mut buf, len),
4919 Err(Error::InvalidTransportParam)
4920 );
4921 }
4922
4923 #[test]
handshake()4924 fn handshake() {
4925 let mut buf = [0; 65535];
4926
4927 let mut pipe = testing::Pipe::default().unwrap();
4928
4929 assert_eq!(pipe.handshake(&mut buf), Ok(()));
4930
4931 assert_eq!(
4932 pipe.client.application_proto(),
4933 pipe.server.application_proto()
4934 );
4935 }
4936
4937 #[test]
handshake_confirmation()4938 fn handshake_confirmation() {
4939 let mut buf = [0; 65535];
4940
4941 let mut pipe = testing::Pipe::default().unwrap();
4942
4943 // Client sends initial flight.
4944 let mut len = pipe.client.send(&mut buf).unwrap();
4945
4946 // Server sends initial flight.
4947 len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
4948
4949 assert!(!pipe.client.is_established());
4950 assert!(!pipe.client.handshake_confirmed);
4951
4952 assert!(!pipe.server.is_established());
4953 assert!(!pipe.server.handshake_confirmed);
4954
4955 // Client sends Handshake packet and completes handshake.
4956 len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
4957
4958 assert!(pipe.client.is_established());
4959 assert!(!pipe.client.handshake_confirmed);
4960
4961 assert!(!pipe.server.is_established());
4962 assert!(!pipe.server.handshake_confirmed);
4963
4964 // Server completes handshake and sends HANDSHAKE_DONE.
4965 len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
4966
4967 assert!(pipe.client.is_established());
4968 assert!(!pipe.client.handshake_confirmed);
4969
4970 assert!(pipe.server.is_established());
4971 assert!(!pipe.server.handshake_confirmed);
4972
4973 // Client acks 1-RTT packet, and confirms handshake.
4974 len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
4975
4976 assert!(pipe.client.is_established());
4977 assert!(pipe.client.handshake_confirmed);
4978
4979 assert!(pipe.server.is_established());
4980 assert!(!pipe.server.handshake_confirmed);
4981
4982 // Server handshake is confirmed.
4983 testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
4984
4985 assert!(pipe.client.is_established());
4986 assert!(pipe.client.handshake_confirmed);
4987
4988 assert!(pipe.server.is_established());
4989 assert!(pipe.server.handshake_confirmed);
4990 }
4991
4992 #[test]
handshake_alpn_mismatch()4993 fn handshake_alpn_mismatch() {
4994 let mut buf = [0; 65535];
4995
4996 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
4997 config
4998 .set_application_protos(b"\x06proto3\x06proto4")
4999 .unwrap();
5000 config.verify_peer(false);
5001
5002 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
5003
5004 assert_eq!(pipe.handshake(&mut buf), Err(Error::TlsFail));
5005
5006 assert_eq!(pipe.client.application_proto(), b"");
5007 assert_eq!(pipe.server.application_proto(), b"");
5008 }
5009
5010 #[test]
limit_handshake_data()5011 fn limit_handshake_data() {
5012 let mut buf = [0; 65535];
5013
5014 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
5015 config
5016 .load_cert_chain_from_pem_file("examples/cert-big.crt")
5017 .unwrap();
5018 config
5019 .load_priv_key_from_pem_file("examples/cert.key")
5020 .unwrap();
5021 config
5022 .set_application_protos(b"\x06proto1\06proto2")
5023 .unwrap();
5024
5025 let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
5026
5027 let client_sent = pipe.client.send(&mut buf).unwrap();
5028 let server_sent =
5029 testing::recv_send(&mut pipe.server, &mut buf, client_sent).unwrap();
5030
5031 assert_eq!(server_sent, (client_sent - 1) * MAX_AMPLIFICATION_FACTOR);
5032 }
5033
5034 #[test]
stream()5035 fn stream() {
5036 let mut buf = [0; 65535];
5037
5038 let mut pipe = testing::Pipe::default().unwrap();
5039
5040 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5041
5042 assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
5043
5044 assert_eq!(pipe.advance(&mut buf), Ok(()));
5045
5046 assert!(!pipe.server.stream_finished(4));
5047
5048 let mut r = pipe.server.readable();
5049 assert_eq!(r.next(), Some(4));
5050 assert_eq!(r.next(), None);
5051
5052 let mut b = [0; 15];
5053 assert_eq!(pipe.server.stream_recv(4, &mut b), Ok((12, true)));
5054 assert_eq!(&b[..12], b"hello, world");
5055
5056 assert!(pipe.server.stream_finished(4));
5057 }
5058
5059 #[test]
stream_send_on_32bit_arch()5060 fn stream_send_on_32bit_arch() {
5061 let mut buf = [0; 65535];
5062
5063 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
5064 config
5065 .load_cert_chain_from_pem_file("examples/cert.crt")
5066 .unwrap();
5067 config
5068 .load_priv_key_from_pem_file("examples/cert.key")
5069 .unwrap();
5070 config
5071 .set_application_protos(b"\x06proto1\x06proto2")
5072 .unwrap();
5073 config.set_initial_max_data(2_u64.pow(32) + 5);
5074 config.set_initial_max_stream_data_bidi_local(15);
5075 config.set_initial_max_stream_data_bidi_remote(15);
5076 config.set_initial_max_stream_data_uni(10);
5077 config.set_initial_max_streams_bidi(3);
5078 config.set_initial_max_streams_uni(0);
5079 config.verify_peer(false);
5080
5081 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
5082
5083 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5084
5085 // In 32bit arch, send_capacity() should be min(2^32+5, cwnd),
5086 // not min(5, cwnd)
5087 assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
5088
5089 assert_eq!(pipe.advance(&mut buf), Ok(()));
5090
5091 assert!(!pipe.server.stream_finished(4));
5092 }
5093
5094 #[test]
empty_stream_frame()5095 fn empty_stream_frame() {
5096 let mut buf = [0; 65535];
5097
5098 let mut pipe = testing::Pipe::default().unwrap();
5099
5100 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5101
5102 let frames = [frame::Frame::Stream {
5103 stream_id: 4,
5104 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5105 }];
5106
5107 let pkt_type = packet::Type::Short;
5108 assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
5109
5110 let mut readable = pipe.server.readable();
5111 assert_eq!(readable.next(), Some(4));
5112
5113 assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((5, false)));
5114
5115 let frames = [frame::Frame::Stream {
5116 stream_id: 4,
5117 data: stream::RangeBuf::from(b"", 5, true),
5118 }];
5119
5120 let pkt_type = packet::Type::Short;
5121 assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
5122
5123 let mut readable = pipe.server.readable();
5124 assert_eq!(readable.next(), Some(4));
5125
5126 assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((0, true)));
5127
5128 let frames = [frame::Frame::Stream {
5129 stream_id: 4,
5130 data: stream::RangeBuf::from(b"", 15, true),
5131 }];
5132
5133 let pkt_type = packet::Type::Short;
5134 assert_eq!(
5135 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5136 Err(Error::FinalSize)
5137 );
5138 }
5139
5140 #[test]
flow_control_limit()5141 fn flow_control_limit() {
5142 let mut buf = [0; 65535];
5143
5144 let mut pipe = testing::Pipe::default().unwrap();
5145
5146 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5147
5148 let frames = [
5149 frame::Frame::Stream {
5150 stream_id: 4,
5151 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5152 },
5153 frame::Frame::Stream {
5154 stream_id: 8,
5155 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5156 },
5157 frame::Frame::Stream {
5158 stream_id: 12,
5159 data: stream::RangeBuf::from(b"a", 0, false),
5160 },
5161 ];
5162
5163 let pkt_type = packet::Type::Short;
5164 assert_eq!(
5165 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5166 Err(Error::FlowControl),
5167 );
5168 }
5169
5170 #[test]
flow_control_limit_dup()5171 fn flow_control_limit_dup() {
5172 let mut buf = [0; 65535];
5173
5174 let mut pipe = testing::Pipe::default().unwrap();
5175
5176 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5177
5178 let frames = [
5179 // One byte less than stream limit.
5180 frame::Frame::Stream {
5181 stream_id: 4,
5182 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaa", 0, false),
5183 },
5184 // Same stream, but one byte more.
5185 frame::Frame::Stream {
5186 stream_id: 4,
5187 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5188 },
5189 frame::Frame::Stream {
5190 stream_id: 12,
5191 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5192 },
5193 ];
5194
5195 let pkt_type = packet::Type::Short;
5196 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5197 }
5198
5199 #[test]
flow_control_update()5200 fn flow_control_update() {
5201 let mut buf = [0; 65535];
5202
5203 let mut pipe = testing::Pipe::default().unwrap();
5204
5205 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5206
5207 let frames = [
5208 frame::Frame::Stream {
5209 stream_id: 4,
5210 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5211 },
5212 frame::Frame::Stream {
5213 stream_id: 8,
5214 data: stream::RangeBuf::from(b"a", 0, false),
5215 },
5216 ];
5217
5218 let pkt_type = packet::Type::Short;
5219
5220 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5221
5222 pipe.server.stream_recv(4, &mut buf).unwrap();
5223 pipe.server.stream_recv(8, &mut buf).unwrap();
5224
5225 let frames = [frame::Frame::Stream {
5226 stream_id: 8,
5227 data: stream::RangeBuf::from(b"a", 1, false),
5228 }];
5229
5230 let len = pipe
5231 .send_pkt_to_server(pkt_type, &frames, &mut buf)
5232 .unwrap();
5233
5234 assert!(len > 0);
5235
5236 let frames =
5237 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
5238 let mut iter = frames.iter();
5239
5240 // Ignore ACK.
5241 iter.next().unwrap();
5242
5243 assert_eq!(iter.next(), Some(&frame::Frame::MaxData { max: 46 }));
5244 }
5245
5246 #[test]
stream_flow_control_limit_bidi()5247 fn stream_flow_control_limit_bidi() {
5248 let mut buf = [0; 65535];
5249
5250 let mut pipe = testing::Pipe::default().unwrap();
5251
5252 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5253
5254 let frames = [frame::Frame::Stream {
5255 stream_id: 4,
5256 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaaa", 0, true),
5257 }];
5258
5259 let pkt_type = packet::Type::Short;
5260 assert_eq!(
5261 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5262 Err(Error::FlowControl),
5263 );
5264 }
5265
5266 #[test]
stream_flow_control_limit_uni()5267 fn stream_flow_control_limit_uni() {
5268 let mut buf = [0; 65535];
5269
5270 let mut pipe = testing::Pipe::default().unwrap();
5271
5272 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5273
5274 let frames = [frame::Frame::Stream {
5275 stream_id: 2,
5276 data: stream::RangeBuf::from(b"aaaaaaaaaaa", 0, true),
5277 }];
5278
5279 let pkt_type = packet::Type::Short;
5280 assert_eq!(
5281 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5282 Err(Error::FlowControl),
5283 );
5284 }
5285
5286 #[test]
stream_flow_control_update()5287 fn stream_flow_control_update() {
5288 let mut buf = [0; 65535];
5289
5290 let mut pipe = testing::Pipe::default().unwrap();
5291
5292 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5293
5294 let frames = [frame::Frame::Stream {
5295 stream_id: 4,
5296 data: stream::RangeBuf::from(b"aaaaaaa", 0, false),
5297 }];
5298
5299 let pkt_type = packet::Type::Short;
5300
5301 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5302
5303 pipe.server.stream_recv(4, &mut buf).unwrap();
5304
5305 let frames = [frame::Frame::Stream {
5306 stream_id: 4,
5307 data: stream::RangeBuf::from(b"a", 7, false),
5308 }];
5309
5310 let len = pipe
5311 .send_pkt_to_server(pkt_type, &frames, &mut buf)
5312 .unwrap();
5313
5314 assert!(len > 0);
5315
5316 let frames =
5317 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
5318 let mut iter = frames.iter();
5319
5320 // Ignore ACK.
5321 iter.next().unwrap();
5322
5323 assert_eq!(
5324 iter.next(),
5325 Some(&frame::Frame::MaxStreamData {
5326 stream_id: 4,
5327 max: 22,
5328 })
5329 );
5330 }
5331
5332 #[test]
stream_limit_bidi()5333 fn stream_limit_bidi() {
5334 let mut buf = [0; 65535];
5335
5336 let mut pipe = testing::Pipe::default().unwrap();
5337
5338 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5339
5340 let frames = [
5341 frame::Frame::Stream {
5342 stream_id: 4,
5343 data: stream::RangeBuf::from(b"a", 0, false),
5344 },
5345 frame::Frame::Stream {
5346 stream_id: 8,
5347 data: stream::RangeBuf::from(b"a", 0, false),
5348 },
5349 frame::Frame::Stream {
5350 stream_id: 12,
5351 data: stream::RangeBuf::from(b"a", 0, false),
5352 },
5353 frame::Frame::Stream {
5354 stream_id: 16,
5355 data: stream::RangeBuf::from(b"a", 0, false),
5356 },
5357 frame::Frame::Stream {
5358 stream_id: 20,
5359 data: stream::RangeBuf::from(b"a", 0, false),
5360 },
5361 frame::Frame::Stream {
5362 stream_id: 24,
5363 data: stream::RangeBuf::from(b"a", 0, false),
5364 },
5365 frame::Frame::Stream {
5366 stream_id: 28,
5367 data: stream::RangeBuf::from(b"a", 0, false),
5368 },
5369 ];
5370
5371 let pkt_type = packet::Type::Short;
5372 assert_eq!(
5373 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5374 Err(Error::StreamLimit),
5375 );
5376 }
5377
5378 #[test]
stream_limit_max_bidi()5379 fn stream_limit_max_bidi() {
5380 let mut buf = [0; 65535];
5381
5382 let mut pipe = testing::Pipe::default().unwrap();
5383
5384 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5385
5386 let frames = [frame::Frame::MaxStreamsBidi { max: MAX_STREAM_ID }];
5387
5388 let pkt_type = packet::Type::Short;
5389 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5390
5391 let frames = [frame::Frame::MaxStreamsBidi {
5392 max: MAX_STREAM_ID + 1,
5393 }];
5394
5395 let pkt_type = packet::Type::Short;
5396 assert_eq!(
5397 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5398 Err(Error::InvalidFrame),
5399 );
5400 }
5401
5402 #[test]
stream_limit_uni()5403 fn stream_limit_uni() {
5404 let mut buf = [0; 65535];
5405
5406 let mut pipe = testing::Pipe::default().unwrap();
5407
5408 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5409
5410 let frames = [
5411 frame::Frame::Stream {
5412 stream_id: 2,
5413 data: stream::RangeBuf::from(b"a", 0, false),
5414 },
5415 frame::Frame::Stream {
5416 stream_id: 6,
5417 data: stream::RangeBuf::from(b"a", 0, false),
5418 },
5419 frame::Frame::Stream {
5420 stream_id: 10,
5421 data: stream::RangeBuf::from(b"a", 0, false),
5422 },
5423 frame::Frame::Stream {
5424 stream_id: 14,
5425 data: stream::RangeBuf::from(b"a", 0, false),
5426 },
5427 frame::Frame::Stream {
5428 stream_id: 18,
5429 data: stream::RangeBuf::from(b"a", 0, false),
5430 },
5431 frame::Frame::Stream {
5432 stream_id: 22,
5433 data: stream::RangeBuf::from(b"a", 0, false),
5434 },
5435 frame::Frame::Stream {
5436 stream_id: 26,
5437 data: stream::RangeBuf::from(b"a", 0, false),
5438 },
5439 ];
5440
5441 let pkt_type = packet::Type::Short;
5442 assert_eq!(
5443 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5444 Err(Error::StreamLimit),
5445 );
5446 }
5447
5448 #[test]
stream_limit_max_uni()5449 fn stream_limit_max_uni() {
5450 let mut buf = [0; 65535];
5451
5452 let mut pipe = testing::Pipe::default().unwrap();
5453
5454 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5455
5456 let frames = [frame::Frame::MaxStreamsUni { max: MAX_STREAM_ID }];
5457
5458 let pkt_type = packet::Type::Short;
5459 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5460
5461 let frames = [frame::Frame::MaxStreamsUni {
5462 max: MAX_STREAM_ID + 1,
5463 }];
5464
5465 let pkt_type = packet::Type::Short;
5466 assert_eq!(
5467 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5468 Err(Error::InvalidFrame),
5469 );
5470 }
5471
5472 #[test]
streams_blocked_max_bidi()5473 fn streams_blocked_max_bidi() {
5474 let mut buf = [0; 65535];
5475
5476 let mut pipe = testing::Pipe::default().unwrap();
5477
5478 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5479
5480 let frames = [frame::Frame::StreamsBlockedBidi {
5481 limit: MAX_STREAM_ID,
5482 }];
5483
5484 let pkt_type = packet::Type::Short;
5485 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5486
5487 let frames = [frame::Frame::StreamsBlockedBidi {
5488 limit: MAX_STREAM_ID + 1,
5489 }];
5490
5491 let pkt_type = packet::Type::Short;
5492 assert_eq!(
5493 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5494 Err(Error::InvalidFrame),
5495 );
5496 }
5497
5498 #[test]
streams_blocked_max_uni()5499 fn streams_blocked_max_uni() {
5500 let mut buf = [0; 65535];
5501
5502 let mut pipe = testing::Pipe::default().unwrap();
5503
5504 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5505
5506 let frames = [frame::Frame::StreamsBlockedUni {
5507 limit: MAX_STREAM_ID,
5508 }];
5509
5510 let pkt_type = packet::Type::Short;
5511 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5512
5513 let frames = [frame::Frame::StreamsBlockedUni {
5514 limit: MAX_STREAM_ID + 1,
5515 }];
5516
5517 let pkt_type = packet::Type::Short;
5518 assert_eq!(
5519 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5520 Err(Error::InvalidFrame),
5521 );
5522 }
5523
5524 #[test]
stream_data_overlap()5525 fn stream_data_overlap() {
5526 let mut buf = [0; 65535];
5527
5528 let mut pipe = testing::Pipe::default().unwrap();
5529
5530 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5531
5532 let frames = [
5533 frame::Frame::Stream {
5534 stream_id: 0,
5535 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5536 },
5537 frame::Frame::Stream {
5538 stream_id: 0,
5539 data: stream::RangeBuf::from(b"bbbbb", 3, false),
5540 },
5541 frame::Frame::Stream {
5542 stream_id: 0,
5543 data: stream::RangeBuf::from(b"ccccc", 6, false),
5544 },
5545 ];
5546
5547 let pkt_type = packet::Type::Short;
5548 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5549
5550 let mut b = [0; 15];
5551 assert_eq!(pipe.server.stream_recv(0, &mut b), Ok((11, false)));
5552 assert_eq!(&b[..11], b"aaaaabbbccc");
5553 }
5554
5555 #[test]
stream_data_overlap_with_reordering()5556 fn stream_data_overlap_with_reordering() {
5557 let mut buf = [0; 65535];
5558
5559 let mut pipe = testing::Pipe::default().unwrap();
5560
5561 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5562
5563 let frames = [
5564 frame::Frame::Stream {
5565 stream_id: 0,
5566 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5567 },
5568 frame::Frame::Stream {
5569 stream_id: 0,
5570 data: stream::RangeBuf::from(b"ccccc", 6, false),
5571 },
5572 frame::Frame::Stream {
5573 stream_id: 0,
5574 data: stream::RangeBuf::from(b"bbbbb", 3, false),
5575 },
5576 ];
5577
5578 let pkt_type = packet::Type::Short;
5579 assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5580
5581 let mut b = [0; 15];
5582 assert_eq!(pipe.server.stream_recv(0, &mut b), Ok((11, false)));
5583 assert_eq!(&b[..11], b"aaaaabccccc");
5584 }
5585
5586 #[test]
reset_stream_flow_control()5587 fn reset_stream_flow_control() {
5588 let mut buf = [0; 65535];
5589
5590 let mut pipe = testing::Pipe::default().unwrap();
5591
5592 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5593
5594 let frames = [
5595 frame::Frame::Stream {
5596 stream_id: 4,
5597 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5598 },
5599 frame::Frame::Stream {
5600 stream_id: 8,
5601 data: stream::RangeBuf::from(b"a", 0, false),
5602 },
5603 frame::Frame::ResetStream {
5604 stream_id: 8,
5605 error_code: 0,
5606 final_size: 15,
5607 },
5608 frame::Frame::Stream {
5609 stream_id: 12,
5610 data: stream::RangeBuf::from(b"a", 0, false),
5611 },
5612 ];
5613
5614 let pkt_type = packet::Type::Short;
5615 assert_eq!(
5616 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5617 Err(Error::FlowControl),
5618 );
5619 }
5620
5621 #[test]
path_challenge()5622 fn path_challenge() {
5623 let mut buf = [0; 65535];
5624
5625 let mut pipe = testing::Pipe::default().unwrap();
5626
5627 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5628
5629 let frames = [frame::Frame::PathChallenge {
5630 data: vec![0xba; 8],
5631 }];
5632
5633 let pkt_type = packet::Type::Short;
5634
5635 let len = pipe
5636 .send_pkt_to_server(pkt_type, &frames, &mut buf)
5637 .unwrap();
5638
5639 assert!(len > 0);
5640
5641 let frames =
5642 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
5643 let mut iter = frames.iter();
5644
5645 // Ignore ACK.
5646 iter.next().unwrap();
5647
5648 assert_eq!(
5649 iter.next(),
5650 Some(&frame::Frame::PathResponse {
5651 data: vec![0xba; 8],
5652 })
5653 );
5654 }
5655
5656 #[test]
5657 /// Simulates reception of an early 1-RTT packet on the server, by
5658 /// delaying the client's Handshake packet that completes the handshake.
early_1rtt_packet()5659 fn early_1rtt_packet() {
5660 let mut buf = [0; 65535];
5661
5662 let mut pipe = testing::Pipe::default().unwrap();
5663
5664 // Client sends initial flight
5665 let mut len = pipe.client.send(&mut buf).unwrap();
5666
5667 // Server sends initial flight..
5668 len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
5669
5670 // Client sends Handshake packet.
5671 len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
5672
5673 // Emulate handshake packet delay by not making server process client
5674 // packet.
5675 let mut delayed = (&buf[..len]).to_vec();
5676 testing::recv_send(&mut pipe.server, &mut buf, 0).unwrap();
5677
5678 assert!(pipe.client.is_established());
5679
5680 // Send 1-RTT packet #0.
5681 let frames = [frame::Frame::Stream {
5682 stream_id: 0,
5683 data: stream::RangeBuf::from(b"hello, world", 0, true),
5684 }];
5685
5686 let pkt_type = packet::Type::Short;
5687 let written =
5688 testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
5689 .unwrap();
5690 assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
5691
5692 // Send 1-RTT packet #1.
5693 let frames = [frame::Frame::Stream {
5694 stream_id: 4,
5695 data: stream::RangeBuf::from(b"hello, world", 0, true),
5696 }];
5697
5698 let written =
5699 testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
5700 .unwrap();
5701 assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
5702
5703 assert!(!pipe.server.is_established());
5704
5705 // Client sent 1-RTT packets 0 and 1, but server hasn't received them.
5706 //
5707 // Note that `largest_rx_pkt_num` is initialized to 0, so we need to
5708 // send another 1-RTT packet to make this check meaningful.
5709 assert_eq!(
5710 pipe.server.pkt_num_spaces[packet::EPOCH_APPLICATION]
5711 .largest_rx_pkt_num,
5712 0
5713 );
5714
5715 // Process delayed packet.
5716 pipe.server.recv(&mut delayed).unwrap();
5717
5718 assert!(pipe.server.is_established());
5719
5720 assert_eq!(
5721 pipe.server.pkt_num_spaces[packet::EPOCH_APPLICATION]
5722 .largest_rx_pkt_num,
5723 0
5724 );
5725 }
5726
5727 #[test]
stream_shutdown_read()5728 fn stream_shutdown_read() {
5729 let mut buf = [0; 65535];
5730
5731 let mut pipe = testing::Pipe::default().unwrap();
5732
5733 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5734
5735 assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
5736 assert_eq!(pipe.advance(&mut buf), Ok(()));
5737
5738 let mut r = pipe.server.readable();
5739 assert_eq!(r.next(), Some(4));
5740 assert_eq!(r.next(), None);
5741
5742 assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));
5743
5744 let mut r = pipe.server.readable();
5745 assert_eq!(r.next(), None);
5746
5747 assert_eq!(pipe.client.stream_send(4, b"bye", false), Ok(3));
5748 assert_eq!(pipe.advance(&mut buf), Ok(()));
5749
5750 let mut r = pipe.server.readable();
5751 assert_eq!(r.next(), None);
5752
5753 assert_eq!(
5754 pipe.server.stream_shutdown(4, Shutdown::Read, 0),
5755 Err(Error::Done)
5756 );
5757 }
5758
5759 #[test]
stream_shutdown_write()5760 fn stream_shutdown_write() {
5761 let mut buf = [0; 65535];
5762
5763 let mut pipe = testing::Pipe::default().unwrap();
5764
5765 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5766
5767 assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
5768 assert_eq!(pipe.advance(&mut buf), Ok(()));
5769
5770 let mut r = pipe.server.readable();
5771 assert_eq!(r.next(), Some(4));
5772 assert_eq!(r.next(), None);
5773
5774 let mut b = [0; 15];
5775 pipe.server.stream_recv(4, &mut b).unwrap();
5776
5777 assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
5778 assert_eq!(pipe.client.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
5779 assert_eq!(pipe.advance(&mut buf), Ok(()));
5780
5781 let mut r = pipe.server.readable();
5782 assert_eq!(r.next(), None);
5783
5784 assert_eq!(pipe.client.stream_send(4, b"bye", false), Ok(3));
5785 assert_eq!(pipe.advance(&mut buf), Ok(()));
5786
5787 let mut r = pipe.server.readable();
5788 assert_eq!(r.next(), None);
5789
5790 assert_eq!(
5791 pipe.client.stream_shutdown(4, Shutdown::Write, 0),
5792 Err(Error::Done)
5793 );
5794 }
5795
5796 #[test]
5797 /// Tests that the order of flushable streams scheduled on the wire is the
5798 /// same as the order of `stream_send()` calls done by the application.
stream_round_robin()5799 fn stream_round_robin() {
5800 let mut buf = [0; 65535];
5801
5802 let mut pipe = testing::Pipe::default().unwrap();
5803
5804 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5805
5806 assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
5807 assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
5808 assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
5809
5810 let len = pipe.client.send(&mut buf).unwrap();
5811
5812 let frames =
5813 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
5814
5815 assert_eq!(
5816 frames.iter().next(),
5817 Some(&frame::Frame::Stream {
5818 stream_id: 8,
5819 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5820 })
5821 );
5822
5823 let len = pipe.client.send(&mut buf).unwrap();
5824
5825 let frames =
5826 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
5827
5828 assert_eq!(
5829 frames.iter().next(),
5830 Some(&frame::Frame::Stream {
5831 stream_id: 0,
5832 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5833 })
5834 );
5835
5836 let len = pipe.client.send(&mut buf).unwrap();
5837
5838 let frames =
5839 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
5840
5841 assert_eq!(
5842 frames.iter().next(),
5843 Some(&frame::Frame::Stream {
5844 stream_id: 4,
5845 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5846 })
5847 );
5848 }
5849
5850 #[test]
5851 /// Tests the readable iterator.
stream_readable()5852 fn stream_readable() {
5853 let mut buf = [0; 65535];
5854
5855 let mut pipe = testing::Pipe::default().unwrap();
5856
5857 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5858
5859 // No readable streams.
5860 let mut r = pipe.client.readable();
5861 assert_eq!(r.next(), None);
5862
5863 assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
5864
5865 let mut r = pipe.client.readable();
5866 assert_eq!(r.next(), None);
5867
5868 let mut r = pipe.server.readable();
5869 assert_eq!(r.next(), None);
5870
5871 assert_eq!(pipe.advance(&mut buf), Ok(()));
5872
5873 // Server received stream.
5874 let mut r = pipe.server.readable();
5875 assert_eq!(r.next(), Some(4));
5876 assert_eq!(r.next(), None);
5877
5878 assert_eq!(
5879 pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
5880 Ok(15)
5881 );
5882 assert_eq!(pipe.advance(&mut buf), Ok(()));
5883
5884 let mut r = pipe.client.readable();
5885 assert_eq!(r.next(), Some(4));
5886 assert_eq!(r.next(), None);
5887
5888 // Client drains stream.
5889 let mut b = [0; 15];
5890 pipe.client.stream_recv(4, &mut b).unwrap();
5891 assert_eq!(pipe.advance(&mut buf), Ok(()));
5892
5893 let mut r = pipe.client.readable();
5894 assert_eq!(r.next(), None);
5895
5896 // Server shuts down stream.
5897 let mut r = pipe.server.readable();
5898 assert_eq!(r.next(), Some(4));
5899 assert_eq!(r.next(), None);
5900
5901 assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));
5902
5903 let mut r = pipe.server.readable();
5904 assert_eq!(r.next(), None);
5905
5906 // Client creates multiple streams.
5907 assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
5908 assert_eq!(pipe.advance(&mut buf), Ok(()));
5909
5910 assert_eq!(pipe.client.stream_send(12, b"aaaaa", false), Ok(5));
5911 assert_eq!(pipe.advance(&mut buf), Ok(()));
5912
5913 let mut r = pipe.server.readable();
5914 assert_eq!(r.len(), 2);
5915
5916 assert!(r.next().is_some());
5917 assert!(r.next().is_some());
5918 assert!(r.next().is_none());
5919
5920 assert_eq!(r.len(), 0);
5921 }
5922
5923 #[test]
5924 /// Tests the writable iterator.
stream_writable()5925 fn stream_writable() {
5926 let mut buf = [0; 65535];
5927
5928 let mut pipe = testing::Pipe::default().unwrap();
5929
5930 assert_eq!(pipe.handshake(&mut buf), Ok(()));
5931
5932 // No writable streams.
5933 let mut w = pipe.client.writable();
5934 assert_eq!(w.next(), None);
5935
5936 assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
5937
5938 // Client created stream.
5939 let mut w = pipe.client.writable();
5940 assert_eq!(w.next(), Some(4));
5941 assert_eq!(w.next(), None);
5942
5943 assert_eq!(pipe.advance(&mut buf), Ok(()));
5944
5945 // Server created stream.
5946 let mut w = pipe.server.writable();
5947 assert_eq!(w.next(), Some(4));
5948 assert_eq!(w.next(), None);
5949
5950 assert_eq!(
5951 pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
5952 Ok(15)
5953 );
5954
5955 // Server stream is full.
5956 let mut w = pipe.server.writable();
5957 assert_eq!(w.next(), None);
5958
5959 assert_eq!(pipe.advance(&mut buf), Ok(()));
5960
5961 // Client drains stream.
5962 let mut b = [0; 15];
5963 pipe.client.stream_recv(4, &mut b).unwrap();
5964 assert_eq!(pipe.advance(&mut buf), Ok(()));
5965
5966 // Server stream is writable again.
5967 let mut w = pipe.server.writable();
5968 assert_eq!(w.next(), Some(4));
5969 assert_eq!(w.next(), None);
5970
5971 // Server suts down stream.
5972 assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
5973
5974 let mut w = pipe.server.writable();
5975 assert_eq!(w.next(), None);
5976
5977 // Client creates multiple streams.
5978 assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
5979 assert_eq!(pipe.advance(&mut buf), Ok(()));
5980
5981 assert_eq!(pipe.client.stream_send(12, b"aaaaa", false), Ok(5));
5982 assert_eq!(pipe.advance(&mut buf), Ok(()));
5983
5984 let mut w = pipe.server.writable();
5985 assert_eq!(w.len(), 2);
5986
5987 assert!(w.next().is_some());
5988 assert!(w.next().is_some());
5989 assert!(w.next().is_none());
5990
5991 assert_eq!(w.len(), 0);
5992
5993 // Server finishes stream.
5994 assert_eq!(pipe.server.stream_send(12, b"aaaaa", true), Ok(5));
5995
5996 let mut w = pipe.server.writable();
5997 assert_eq!(w.next(), Some(8));
5998 assert_eq!(w.next(), None);
5999 }
6000
6001 #[test]
6002 /// Tests that we don't exceed the per-connection flow control limit set by
6003 /// the peer.
flow_control_limit_send()6004 fn flow_control_limit_send() {
6005 let mut buf = [0; 65535];
6006
6007 let mut pipe = testing::Pipe::default().unwrap();
6008
6009 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6010
6011 assert_eq!(
6012 pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
6013 Ok(15)
6014 );
6015 assert_eq!(pipe.advance(&mut buf), Ok(()));
6016 assert_eq!(
6017 pipe.client.stream_send(4, b"aaaaaaaaaaaaaaa", false),
6018 Ok(15)
6019 );
6020 assert_eq!(pipe.advance(&mut buf), Ok(()));
6021 assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(0));
6022 assert_eq!(pipe.advance(&mut buf), Ok(()));
6023
6024 let mut r = pipe.server.readable();
6025 assert!(r.next().is_some());
6026 assert!(r.next().is_some());
6027 assert!(r.next().is_none());
6028 }
6029
6030 #[test]
6031 /// Tests that invalid packets received before any other valid ones cause
6032 /// the server to close the connection immediately.
invalid_initial_server()6033 fn invalid_initial_server() {
6034 let mut buf = [0; 65535];
6035 let mut pipe = testing::Pipe::default().unwrap();
6036
6037 let frames = [frame::Frame::Padding { len: 10 }];
6038
6039 let written = testing::encode_pkt(
6040 &mut pipe.client,
6041 packet::Type::Initial,
6042 &frames,
6043 &mut buf,
6044 )
6045 .unwrap();
6046
6047 // Corrupt the packets's last byte to make decryption fail (the last
6048 // byte is part of the AEAD tag, so changing it means that the packet
6049 // cannot be authenticated during decryption).
6050 buf[written - 1] = !buf[written - 1];
6051
6052 assert_eq!(pipe.server.timeout(), None);
6053
6054 assert_eq!(
6055 pipe.server.recv(&mut buf[..written]),
6056 Err(Error::CryptoFail)
6057 );
6058
6059 assert!(pipe.server.is_closed());
6060 }
6061
6062 #[test]
6063 /// Tests that invalid Initial packets received to cause
6064 /// the client to close the connection immediately.
invalid_initial_client()6065 fn invalid_initial_client() {
6066 let mut buf = [0; 65535];
6067 let mut pipe = testing::Pipe::default().unwrap();
6068
6069 // Client sends initial flight.
6070 let len = pipe.client.send(&mut buf).unwrap();
6071
6072 // Server sends initial flight.
6073 assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(1200));
6074
6075 let frames = [frame::Frame::Padding { len: 10 }];
6076
6077 let written = testing::encode_pkt(
6078 &mut pipe.server,
6079 packet::Type::Initial,
6080 &frames,
6081 &mut buf,
6082 )
6083 .unwrap();
6084
6085 // Corrupt the packets's last byte to make decryption fail (the last
6086 // byte is part of the AEAD tag, so changing it means that the packet
6087 // cannot be authenticated during decryption).
6088 buf[written - 1] = !buf[written - 1];
6089
6090 // Client will ignore invalid packet.
6091 assert_eq!(pipe.client.recv(&mut buf[..written]), Ok(68));
6092
6093 // The connection should be alive...
6094 assert_eq!(pipe.client.is_closed(), false);
6095
6096 // ...and the idle timeout should be armed.
6097 assert!(pipe.client.idle_timer.is_some());
6098 }
6099
6100 #[test]
6101 /// Tests that packets with invalid payload length received before any other
6102 /// valid packet cause the server to close the connection immediately.
invalid_initial_payload()6103 fn invalid_initial_payload() {
6104 let mut buf = [0; 65535];
6105 let mut pipe = testing::Pipe::default().unwrap();
6106
6107 let mut b = octets::OctetsMut::with_slice(&mut buf);
6108
6109 let epoch = packet::Type::Initial.to_epoch().unwrap();
6110
6111 let pn = 0;
6112 let pn_len = packet::pkt_num_len(pn).unwrap();
6113
6114 let hdr = Header {
6115 ty: packet::Type::Initial,
6116 version: pipe.client.version,
6117 dcid: pipe.client.dcid.clone(),
6118 scid: pipe.client.scid.clone(),
6119 pkt_num: 0,
6120 pkt_num_len: pn_len,
6121 token: pipe.client.token.clone(),
6122 versions: None,
6123 key_phase: false,
6124 };
6125
6126 hdr.to_bytes(&mut b).unwrap();
6127
6128 // Payload length is invalid!!!
6129 let payload_len = 4096;
6130
6131 let len = pn_len + payload_len;
6132 b.put_varint(len as u64).unwrap();
6133
6134 packet::encode_pkt_num(pn, &mut b).unwrap();
6135
6136 let payload_offset = b.off();
6137
6138 let frames = [frame::Frame::Padding { len: 10 }];
6139
6140 for frame in &frames {
6141 frame.to_bytes(&mut b).unwrap();
6142 }
6143
6144 let space = &mut pipe.client.pkt_num_spaces[epoch];
6145
6146 // Use correct payload length when encrypting the packet.
6147 let payload_len = frames.iter().fold(0, |acc, x| acc + x.wire_len()) +
6148 space.crypto_overhead().unwrap();
6149
6150 let aead = space.crypto_seal.as_ref().unwrap();
6151
6152 let written = packet::encrypt_pkt(
6153 &mut b,
6154 pn,
6155 pn_len,
6156 payload_len,
6157 payload_offset,
6158 aead,
6159 )
6160 .unwrap();
6161
6162 assert_eq!(pipe.server.timeout(), None);
6163
6164 assert_eq!(
6165 pipe.server.recv(&mut buf[..written]),
6166 Err(Error::BufferTooShort)
6167 );
6168
6169 assert!(pipe.server.is_closed());
6170 }
6171
6172 #[test]
6173 /// Tests that invalid packets don't cause the connection to be closed.
invalid_packet()6174 fn invalid_packet() {
6175 let mut buf = [0; 65535];
6176 let mut pipe = testing::Pipe::default().unwrap();
6177
6178 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6179
6180 let frames = [frame::Frame::Padding { len: 10 }];
6181
6182 let written = testing::encode_pkt(
6183 &mut pipe.client,
6184 packet::Type::Short,
6185 &frames,
6186 &mut buf,
6187 )
6188 .unwrap();
6189
6190 // Corrupt the packets's last byte to make decryption fail (the last
6191 // byte is part of the AEAD tag, so changing it means that the packet
6192 // cannot be authenticated during decryption).
6193 buf[written - 1] = !buf[written - 1];
6194
6195 assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
6196
6197 // Corrupt the packets's first byte to make the header fail decoding.
6198 buf[0] = 255;
6199
6200 assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
6201 }
6202
6203 #[test]
6204 /// Tests that the MAX_STREAMS frame is sent for bidirectional streams.
stream_limit_update_bidi()6205 fn stream_limit_update_bidi() {
6206 let mut buf = [0; 65535];
6207
6208 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
6209 config
6210 .load_cert_chain_from_pem_file("examples/cert.crt")
6211 .unwrap();
6212 config
6213 .load_priv_key_from_pem_file("examples/cert.key")
6214 .unwrap();
6215 config
6216 .set_application_protos(b"\x06proto1\x06proto2")
6217 .unwrap();
6218 config.set_initial_max_data(30);
6219 config.set_initial_max_stream_data_bidi_local(15);
6220 config.set_initial_max_stream_data_bidi_remote(15);
6221 config.set_initial_max_stream_data_uni(10);
6222 config.set_initial_max_streams_bidi(3);
6223 config.set_initial_max_streams_uni(0);
6224 config.verify_peer(false);
6225
6226 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
6227 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6228
6229 // Client sends stream data.
6230 assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
6231 assert_eq!(pipe.advance(&mut buf), Ok(()));
6232
6233 assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
6234 assert_eq!(pipe.advance(&mut buf), Ok(()));
6235
6236 assert_eq!(pipe.client.stream_send(4, b"b", true), Ok(1));
6237 assert_eq!(pipe.advance(&mut buf), Ok(()));
6238
6239 assert_eq!(pipe.client.stream_send(0, b"b", true), Ok(1));
6240 assert_eq!(pipe.advance(&mut buf), Ok(()));
6241
6242 // Server reads stream data.
6243 let mut b = [0; 15];
6244 pipe.server.stream_recv(0, &mut b).unwrap();
6245 pipe.server.stream_recv(4, &mut b).unwrap();
6246 assert_eq!(pipe.advance(&mut buf), Ok(()));
6247
6248 // Server sends stream data, with fin.
6249 assert_eq!(pipe.server.stream_send(0, b"a", false), Ok(1));
6250 assert_eq!(pipe.advance(&mut buf), Ok(()));
6251
6252 assert_eq!(pipe.server.stream_send(4, b"a", false), Ok(1));
6253 assert_eq!(pipe.advance(&mut buf), Ok(()));
6254
6255 assert_eq!(pipe.server.stream_send(4, b"b", true), Ok(1));
6256 assert_eq!(pipe.advance(&mut buf), Ok(()));
6257
6258 assert_eq!(pipe.server.stream_send(0, b"b", true), Ok(1));
6259
6260 // Server sends MAX_STREAMS.
6261 assert_eq!(pipe.advance(&mut buf), Ok(()));
6262
6263 // Client tries to create new streams.
6264 assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
6265 assert_eq!(pipe.advance(&mut buf), Ok(()));
6266
6267 assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
6268 assert_eq!(pipe.advance(&mut buf), Ok(()));
6269
6270 assert_eq!(pipe.client.stream_send(16, b"a", false), Ok(1));
6271 assert_eq!(pipe.advance(&mut buf), Ok(()));
6272
6273 assert_eq!(
6274 pipe.client.stream_send(20, b"a", false),
6275 Err(Error::StreamLimit)
6276 );
6277
6278 assert_eq!(pipe.server.readable().len(), 3);
6279 }
6280
6281 #[test]
6282 /// Tests that the MAX_STREAMS frame is sent for unirectional streams.
stream_limit_update_uni()6283 fn stream_limit_update_uni() {
6284 let mut buf = [0; 65535];
6285
6286 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
6287 config
6288 .load_cert_chain_from_pem_file("examples/cert.crt")
6289 .unwrap();
6290 config
6291 .load_priv_key_from_pem_file("examples/cert.key")
6292 .unwrap();
6293 config
6294 .set_application_protos(b"\x06proto1\x06proto2")
6295 .unwrap();
6296 config.set_initial_max_data(30);
6297 config.set_initial_max_stream_data_bidi_local(15);
6298 config.set_initial_max_stream_data_bidi_remote(15);
6299 config.set_initial_max_stream_data_uni(10);
6300 config.set_initial_max_streams_bidi(0);
6301 config.set_initial_max_streams_uni(3);
6302 config.verify_peer(false);
6303
6304 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
6305 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6306
6307 // Client sends stream data.
6308 assert_eq!(pipe.client.stream_send(2, b"a", false), Ok(1));
6309 assert_eq!(pipe.advance(&mut buf), Ok(()));
6310
6311 assert_eq!(pipe.client.stream_send(6, b"a", false), Ok(1));
6312 assert_eq!(pipe.advance(&mut buf), Ok(()));
6313
6314 assert_eq!(pipe.client.stream_send(6, b"b", true), Ok(1));
6315 assert_eq!(pipe.advance(&mut buf), Ok(()));
6316
6317 assert_eq!(pipe.client.stream_send(2, b"b", true), Ok(1));
6318 assert_eq!(pipe.advance(&mut buf), Ok(()));
6319
6320 // Server reads stream data.
6321 let mut b = [0; 15];
6322 pipe.server.stream_recv(2, &mut b).unwrap();
6323 pipe.server.stream_recv(6, &mut b).unwrap();
6324
6325 // Server sends MAX_STREAMS.
6326 assert_eq!(pipe.advance(&mut buf), Ok(()));
6327
6328 // Client tries to create new streams.
6329 assert_eq!(pipe.client.stream_send(10, b"a", false), Ok(1));
6330 assert_eq!(pipe.advance(&mut buf), Ok(()));
6331
6332 assert_eq!(pipe.client.stream_send(14, b"a", false), Ok(1));
6333 assert_eq!(pipe.advance(&mut buf), Ok(()));
6334
6335 assert_eq!(pipe.client.stream_send(18, b"a", false), Ok(1));
6336 assert_eq!(pipe.advance(&mut buf), Ok(()));
6337
6338 assert_eq!(
6339 pipe.client.stream_send(22, b"a", false),
6340 Err(Error::StreamLimit)
6341 );
6342
6343 assert_eq!(pipe.server.readable().len(), 3);
6344 }
6345
6346 #[test]
6347 /// Tests that the stream's fin flag is properly flushed even if there's no
6348 /// data in the buffer, and that the buffer becomes readable on the other
6349 /// side.
stream_zero_length_fin()6350 fn stream_zero_length_fin() {
6351 let mut buf = [0; 65535];
6352
6353 let mut pipe = testing::Pipe::default().unwrap();
6354
6355 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6356
6357 assert_eq!(
6358 pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
6359 Ok(15)
6360 );
6361 assert_eq!(pipe.advance(&mut buf), Ok(()));
6362
6363 let mut r = pipe.server.readable();
6364 assert_eq!(r.next(), Some(0));
6365 assert!(r.next().is_none());
6366
6367 let mut b = [0; 15];
6368 pipe.server.stream_recv(0, &mut b).unwrap();
6369 assert_eq!(pipe.advance(&mut buf), Ok(()));
6370
6371 // Client sends zero-length frame.
6372 assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
6373 assert_eq!(pipe.advance(&mut buf), Ok(()));
6374
6375 // Stream should be readable on the server after receiving empty fin.
6376 let mut r = pipe.server.readable();
6377 assert_eq!(r.next(), Some(0));
6378 assert!(r.next().is_none());
6379
6380 let mut b = [0; 15];
6381 pipe.server.stream_recv(0, &mut b).unwrap();
6382 assert_eq!(pipe.advance(&mut buf), Ok(()));
6383
6384 // Client sends zero-length frame (again).
6385 assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
6386 assert_eq!(pipe.advance(&mut buf), Ok(()));
6387
6388 // Stream should _not_ be readable on the server after receiving empty
6389 // fin, because it was already finished.
6390 let mut r = pipe.server.readable();
6391 assert_eq!(r.next(), None);
6392 }
6393
6394 #[test]
6395 /// Tests that completed streams are garbage collected.
collect_streams()6396 fn collect_streams() {
6397 let mut buf = [0; 65535];
6398
6399 let mut pipe = testing::Pipe::default().unwrap();
6400
6401 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6402
6403 assert_eq!(pipe.client.streams.len(), 0);
6404 assert_eq!(pipe.server.streams.len(), 0);
6405
6406 assert_eq!(pipe.client.stream_send(0, b"aaaaa", true), Ok(5));
6407 assert_eq!(pipe.advance(&mut buf), Ok(()));
6408
6409 assert!(!pipe.client.stream_finished(0));
6410 assert!(!pipe.server.stream_finished(0));
6411
6412 assert_eq!(pipe.client.streams.len(), 1);
6413 assert_eq!(pipe.server.streams.len(), 1);
6414
6415 let mut b = [0; 5];
6416 pipe.server.stream_recv(0, &mut b).unwrap();
6417 assert_eq!(pipe.advance(&mut buf), Ok(()));
6418
6419 assert_eq!(pipe.server.stream_send(0, b"aaaaa", true), Ok(5));
6420 assert_eq!(pipe.advance(&mut buf), Ok(()));
6421
6422 assert!(!pipe.client.stream_finished(0));
6423 assert!(pipe.server.stream_finished(0));
6424
6425 assert_eq!(pipe.client.streams.len(), 1);
6426 assert_eq!(pipe.server.streams.len(), 0);
6427
6428 let mut b = [0; 5];
6429 pipe.client.stream_recv(0, &mut b).unwrap();
6430 assert_eq!(pipe.advance(&mut buf), Ok(()));
6431
6432 assert_eq!(pipe.client.streams.len(), 0);
6433 assert_eq!(pipe.server.streams.len(), 0);
6434
6435 assert!(pipe.client.stream_finished(0));
6436 assert!(pipe.server.stream_finished(0));
6437
6438 assert_eq!(pipe.client.stream_send(0, b"", true), Err(Error::Done));
6439
6440 let frames = [frame::Frame::Stream {
6441 stream_id: 0,
6442 data: stream::RangeBuf::from(b"aa", 0, false),
6443 }];
6444
6445 let pkt_type = packet::Type::Short;
6446 assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
6447 }
6448
6449 #[test]
config_set_cc_algorithm_name()6450 fn config_set_cc_algorithm_name() {
6451 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6452
6453 assert_eq!(config.set_cc_algorithm_name("reno"), Ok(()));
6454
6455 // Unknown name.
6456 assert_eq!(
6457 config.set_cc_algorithm_name("???"),
6458 Err(Error::CongestionControl)
6459 );
6460 }
6461
6462 #[test]
peer_cert()6463 fn peer_cert() {
6464 let mut buf = [0; 65535];
6465
6466 let mut pipe = testing::Pipe::default().unwrap();
6467
6468 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6469
6470 match pipe.client.peer_cert() {
6471 Some(c) => assert_eq!(c.len(), 753),
6472
6473 None => panic!("missing server certificate"),
6474 }
6475 }
6476
6477 #[test]
retry()6478 fn retry() {
6479 let mut buf = [0; 65535];
6480
6481 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6482 config
6483 .load_cert_chain_from_pem_file("examples/cert.crt")
6484 .unwrap();
6485 config
6486 .load_priv_key_from_pem_file("examples/cert.key")
6487 .unwrap();
6488 config
6489 .set_application_protos(b"\x06proto1\06proto2")
6490 .unwrap();
6491
6492 let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
6493
6494 // Client sends initial flight.
6495 let mut len = pipe.client.send(&mut buf).unwrap();
6496
6497 // Server sends Retry packet.
6498 let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6499
6500 let odcid = hdr.dcid.to_vec();
6501
6502 let mut scid = [0; MAX_CONN_ID_LEN];
6503 rand::rand_bytes(&mut scid[..]);
6504
6505 let token = b"quiche test retry token";
6506
6507 len = packet::retry(
6508 &hdr.scid,
6509 &hdr.dcid,
6510 &scid,
6511 token,
6512 hdr.version,
6513 &mut buf,
6514 )
6515 .unwrap();
6516
6517 // Client receives Retry and sends new Initial.
6518 assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
6519
6520 len = pipe.client.send(&mut buf).unwrap();
6521
6522 let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6523 assert_eq!(&hdr.token.unwrap(), token);
6524
6525 // Server accepts connection and send first flight.
6526 pipe.server = accept(&scid, Some(&odcid), &mut config).unwrap();
6527
6528 len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6529 len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
6530 testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6531
6532 assert!(pipe.client.is_established());
6533 assert!(pipe.server.is_established());
6534 }
6535
6536 #[test]
missing_retry_source_connection_id()6537 fn missing_retry_source_connection_id() {
6538 let mut buf = [0; 65535];
6539
6540 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6541 config
6542 .load_cert_chain_from_pem_file("examples/cert.crt")
6543 .unwrap();
6544 config
6545 .load_priv_key_from_pem_file("examples/cert.key")
6546 .unwrap();
6547 config
6548 .set_application_protos(b"\x06proto1\06proto2")
6549 .unwrap();
6550
6551 let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
6552
6553 // Client sends initial flight.
6554 let mut len = pipe.client.send(&mut buf).unwrap();
6555
6556 // Server sends Retry packet.
6557 let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6558
6559 let mut scid = [0; MAX_CONN_ID_LEN];
6560 rand::rand_bytes(&mut scid[..]);
6561
6562 let token = b"quiche test retry token";
6563
6564 len = packet::retry(
6565 &hdr.scid,
6566 &hdr.dcid,
6567 &scid,
6568 token,
6569 hdr.version,
6570 &mut buf,
6571 )
6572 .unwrap();
6573
6574 // Client receives Retry and sends new Initial.
6575 assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
6576
6577 len = pipe.client.send(&mut buf).unwrap();
6578
6579 // Server accepts connection and send first flight. But original
6580 // destination connection ID is ignored.
6581 pipe.server = accept(&scid, None, &mut config).unwrap();
6582
6583 len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6584
6585 assert_eq!(
6586 pipe.client.recv(&mut buf[..len]),
6587 Err(Error::InvalidTransportParam)
6588 );
6589 }
6590
6591 #[test]
invalid_retry_source_connection_id()6592 fn invalid_retry_source_connection_id() {
6593 let mut buf = [0; 65535];
6594
6595 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6596 config
6597 .load_cert_chain_from_pem_file("examples/cert.crt")
6598 .unwrap();
6599 config
6600 .load_priv_key_from_pem_file("examples/cert.key")
6601 .unwrap();
6602 config
6603 .set_application_protos(b"\x06proto1\06proto2")
6604 .unwrap();
6605
6606 let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
6607
6608 // Client sends initial flight.
6609 let mut len = pipe.client.send(&mut buf).unwrap();
6610
6611 // Server sends Retry packet.
6612 let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6613
6614 let mut scid = [0; MAX_CONN_ID_LEN];
6615 rand::rand_bytes(&mut scid[..]);
6616
6617 let token = b"quiche test retry token";
6618
6619 len = packet::retry(
6620 &hdr.scid,
6621 &hdr.dcid,
6622 &scid,
6623 token,
6624 hdr.version,
6625 &mut buf,
6626 )
6627 .unwrap();
6628
6629 // Client receives Retry and sends new Initial.
6630 assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
6631
6632 len = pipe.client.send(&mut buf).unwrap();
6633
6634 // Server accepts connection and send first flight. But original
6635 // destination connection ID is invalid.
6636 pipe.server = accept(&scid, Some(b"bogus value"), &mut config).unwrap();
6637
6638 len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6639
6640 assert_eq!(
6641 pipe.client.recv(&mut buf[..len]),
6642 Err(Error::InvalidTransportParam)
6643 );
6644 }
6645
check_send(_: &mut impl Send)6646 fn check_send(_: &mut impl Send) {}
6647
6648 #[test]
connection_must_be_send()6649 fn connection_must_be_send() {
6650 let mut pipe = testing::Pipe::default().unwrap();
6651 check_send(&mut pipe.client);
6652 }
6653
6654 #[test]
data_blocked()6655 fn data_blocked() {
6656 let mut buf = [0; 65535];
6657
6658 let mut pipe = testing::Pipe::default().unwrap();
6659
6660 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6661
6662 assert_eq!(pipe.client.stream_send(0, b"aaaaaaaaaa", false), Ok(10));
6663 assert_eq!(pipe.client.blocked_limit, None);
6664 assert_eq!(pipe.advance(&mut buf), Ok(()));
6665
6666 assert_eq!(pipe.client.stream_send(4, b"aaaaaaaaaa", false), Ok(10));
6667 assert_eq!(pipe.client.blocked_limit, None);
6668 assert_eq!(pipe.advance(&mut buf), Ok(()));
6669
6670 assert_eq!(pipe.client.stream_send(8, b"aaaaaaaaaaa", false), Ok(10));
6671 assert_eq!(pipe.client.blocked_limit, Some(30));
6672
6673 let len = pipe.client.send(&mut buf).unwrap();
6674 assert_eq!(pipe.client.blocked_limit, None);
6675
6676 let frames =
6677 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6678
6679 let mut iter = frames.iter();
6680
6681 assert_eq!(iter.next(), Some(&frame::Frame::DataBlocked { limit: 30 }));
6682
6683 assert_eq!(
6684 iter.next(),
6685 Some(&frame::Frame::Stream {
6686 stream_id: 8,
6687 data: stream::RangeBuf::from(b"aaaaaaaaaa", 0, false),
6688 })
6689 );
6690
6691 assert_eq!(iter.next(), None);
6692 }
6693
6694 #[test]
stream_data_blocked()6695 fn stream_data_blocked() {
6696 let mut buf = [0; 65535];
6697
6698 let mut pipe = testing::Pipe::default().unwrap();
6699
6700 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6701
6702 assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
6703 assert_eq!(pipe.client.streams.blocked().len(), 0);
6704
6705 assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
6706 assert_eq!(pipe.client.streams.blocked().len(), 0);
6707
6708 assert_eq!(pipe.client.stream_send(0, b"aaaaaa", false), Ok(5));
6709 assert_eq!(pipe.client.streams.blocked().len(), 1);
6710
6711 let len = pipe.client.send(&mut buf).unwrap();
6712 assert_eq!(pipe.client.streams.blocked().len(), 0);
6713
6714 let frames =
6715 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6716
6717 let mut iter = frames.iter();
6718
6719 assert_eq!(
6720 iter.next(),
6721 Some(&frame::Frame::StreamDataBlocked {
6722 stream_id: 0,
6723 limit: 15,
6724 })
6725 );
6726
6727 assert_eq!(
6728 iter.next(),
6729 Some(&frame::Frame::Stream {
6730 stream_id: 0,
6731 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
6732 })
6733 );
6734
6735 assert_eq!(iter.next(), None);
6736
6737 // Send from another stream, make sure we don't send STREAM_DATA_BLOCKED
6738 // again.
6739 assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
6740
6741 let len = pipe.client.send(&mut buf).unwrap();
6742 assert_eq!(pipe.client.streams.blocked().len(), 0);
6743
6744 let frames =
6745 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6746
6747 let mut iter = frames.iter();
6748
6749 assert_eq!(
6750 iter.next(),
6751 Some(&frame::Frame::Stream {
6752 stream_id: 4,
6753 data: stream::RangeBuf::from(b"a", 0, false),
6754 })
6755 );
6756
6757 assert_eq!(iter.next(), None);
6758
6759 // Send again from blocked stream and make sure it is marked as blocked
6760 // again.
6761 assert_eq!(pipe.client.stream_send(0, b"aaaaaa", false), Ok(0));
6762 assert_eq!(pipe.client.streams.blocked().len(), 1);
6763
6764 let len = pipe.client.send(&mut buf).unwrap();
6765 assert_eq!(pipe.client.streams.blocked().len(), 0);
6766
6767 let frames =
6768 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6769
6770 let mut iter = frames.iter();
6771
6772 assert_eq!(
6773 iter.next(),
6774 Some(&frame::Frame::StreamDataBlocked {
6775 stream_id: 0,
6776 limit: 15,
6777 })
6778 );
6779
6780 assert_eq!(iter.next(), Some(&frame::Frame::Padding { len: 1 }));
6781
6782 assert_eq!(iter.next(), None);
6783 }
6784
6785 #[test]
app_limited_true()6786 fn app_limited_true() {
6787 let mut buf = [0; 65535];
6788
6789 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6790 config
6791 .set_application_protos(b"\x06proto1\x06proto2")
6792 .unwrap();
6793 config.set_initial_max_data(50000);
6794 config.set_initial_max_stream_data_bidi_local(50000);
6795 config.set_initial_max_stream_data_bidi_remote(50000);
6796 config.set_max_udp_payload_size(1200);
6797 config.verify_peer(false);
6798
6799 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6800
6801 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6802
6803 // Client sends stream data.
6804 assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6805 assert_eq!(pipe.advance(&mut buf), Ok(()));
6806
6807 // Server reads stream data.
6808 let mut b = [0; 15];
6809 pipe.server.stream_recv(0, &mut b).unwrap();
6810 assert_eq!(pipe.advance(&mut buf), Ok(()));
6811
6812 // Server sends stream data smaller than cwnd.
6813 let send_buf = [0; 10000];
6814 assert_eq!(pipe.server.stream_send(0, &send_buf, false), Ok(10000));
6815 assert_eq!(pipe.advance(&mut buf), Ok(()));
6816
6817 // app_limited should be true because we send less than cwnd.
6818 assert_eq!(pipe.server.recovery.app_limited(), true);
6819 }
6820
6821 #[test]
app_limited_false()6822 fn app_limited_false() {
6823 let mut buf = [0; 65535];
6824
6825 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6826 config
6827 .set_application_protos(b"\x06proto1\x06proto2")
6828 .unwrap();
6829 config.set_initial_max_data(50000);
6830 config.set_initial_max_stream_data_bidi_local(50000);
6831 config.set_initial_max_stream_data_bidi_remote(50000);
6832 config.set_max_udp_payload_size(1200);
6833 config.verify_peer(false);
6834
6835 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6836
6837 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6838
6839 // Client sends stream data.
6840 assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6841 assert_eq!(pipe.advance(&mut buf), Ok(()));
6842
6843 // Server reads stream data.
6844 let mut b = [0; 15];
6845 pipe.server.stream_recv(0, &mut b).unwrap();
6846 assert_eq!(pipe.advance(&mut buf), Ok(()));
6847
6848 // Server sends stream data bigger than cwnd.
6849 let send_buf1 = [0; 20000];
6850 assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(14085));
6851 assert_eq!(pipe.advance(&mut buf), Ok(()));
6852
6853 // We can't create a new packet header because there is no room by cwnd.
6854 // app_limited should be false because we can't send more by cwnd.
6855 assert_eq!(pipe.server.recovery.app_limited(), false);
6856 }
6857
6858 #[test]
app_limited_false_no_frame()6859 fn app_limited_false_no_frame() {
6860 let mut buf = [0; 65535];
6861
6862 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6863 config
6864 .set_application_protos(b"\x06proto1\x06proto2")
6865 .unwrap();
6866 config.set_initial_max_data(50000);
6867 config.set_initial_max_stream_data_bidi_local(50000);
6868 config.set_initial_max_stream_data_bidi_remote(50000);
6869 config.set_max_udp_payload_size(1405);
6870 config.verify_peer(false);
6871
6872 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6873
6874 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6875
6876 // Client sends stream data.
6877 assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6878 assert_eq!(pipe.advance(&mut buf), Ok(()));
6879
6880 // Server reads stream data.
6881 let mut b = [0; 15];
6882 pipe.server.stream_recv(0, &mut b).unwrap();
6883 assert_eq!(pipe.advance(&mut buf), Ok(()));
6884
6885 // Server sends stream data bigger than cwnd.
6886 let send_buf1 = [0; 20000];
6887 assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(14085));
6888 assert_eq!(pipe.advance(&mut buf), Ok(()));
6889
6890 // We can't create a new packet header because there is no room by cwnd.
6891 // app_limited should be false because we can't send more by cwnd.
6892 assert_eq!(pipe.server.recovery.app_limited(), false);
6893 }
6894
6895 #[test]
app_limited_false_no_header()6896 fn app_limited_false_no_header() {
6897 let mut buf = [0; 65535];
6898
6899 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6900 config
6901 .set_application_protos(b"\x06proto1\x06proto2")
6902 .unwrap();
6903 config.set_initial_max_data(50000);
6904 config.set_initial_max_stream_data_bidi_local(50000);
6905 config.set_initial_max_stream_data_bidi_remote(50000);
6906 config.set_max_udp_payload_size(1406);
6907 config.verify_peer(false);
6908
6909 let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6910
6911 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6912
6913 // Client sends stream data.
6914 assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6915 assert_eq!(pipe.advance(&mut buf), Ok(()));
6916
6917 // Server reads stream data.
6918 let mut b = [0; 15];
6919 pipe.server.stream_recv(0, &mut b).unwrap();
6920 assert_eq!(pipe.advance(&mut buf), Ok(()));
6921
6922 // Server sends stream data bigger than cwnd.
6923 let send_buf1 = [0; 20000];
6924 assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(14085));
6925 assert_eq!(pipe.advance(&mut buf), Ok(()));
6926
6927 // We can't create a new frame because there is no room by cwnd.
6928 // app_limited should be false because we can't send more by cwnd.
6929 assert_eq!(pipe.server.recovery.app_limited(), false);
6930 }
6931
6932 #[test]
limit_ack_ranges()6933 fn limit_ack_ranges() {
6934 let mut buf = [0; 65535];
6935
6936 let mut pipe = testing::Pipe::default().unwrap();
6937
6938 assert_eq!(pipe.handshake(&mut buf), Ok(()));
6939
6940 let epoch = packet::EPOCH_APPLICATION;
6941
6942 assert_eq!(pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.len(), 0);
6943
6944 let frames = [frame::Frame::Ping, frame::Frame::Padding { len: 3 }];
6945
6946 let pkt_type = packet::Type::Short;
6947
6948 let mut last_packet_sent = 0;
6949
6950 for _ in 0..512 {
6951 let recv_count = pipe.server.recv_count;
6952
6953 last_packet_sent = pipe.client.pkt_num_spaces[epoch].next_pkt_num;
6954
6955 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf)
6956 .unwrap();
6957
6958 assert_eq!(pipe.server.recv_count, recv_count + 1);
6959
6960 // Skip packet number.
6961 pipe.client.pkt_num_spaces[epoch].next_pkt_num += 1;
6962 }
6963
6964 assert_eq!(
6965 pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.len(),
6966 MAX_ACK_RANGES
6967 );
6968
6969 assert_eq!(
6970 pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.first(),
6971 Some(last_packet_sent - ((MAX_ACK_RANGES as u64) - 1) * 2)
6972 );
6973
6974 assert_eq!(
6975 pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.last(),
6976 Some(last_packet_sent)
6977 );
6978 }
6979
6980 #[test]
6981 /// Tests that streams are correctly scheduled based on their priority.
stream_priority()6982 fn stream_priority() {
6983 // Limit 1-RTT packet size to avoid congestion control interference.
6984 const MAX_TEST_PACKET_SIZE: usize = 540;
6985
6986 let mut buf = [0; 65535];
6987
6988 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
6989 config
6990 .load_cert_chain_from_pem_file("examples/cert.crt")
6991 .unwrap();
6992 config
6993 .load_priv_key_from_pem_file("examples/cert.key")
6994 .unwrap();
6995 config
6996 .set_application_protos(b"\x06proto1\x06proto2")
6997 .unwrap();
6998 config.set_initial_max_data(1_000_000);
6999 config.set_initial_max_stream_data_bidi_local(1_000_000);
7000 config.set_initial_max_stream_data_bidi_remote(1_000_000);
7001 config.set_initial_max_stream_data_uni(0);
7002 config.set_initial_max_streams_bidi(100);
7003 config.set_initial_max_streams_uni(0);
7004 config.verify_peer(false);
7005
7006 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7007 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7008
7009 assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
7010 assert_eq!(pipe.advance(&mut buf), Ok(()));
7011
7012 assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
7013 assert_eq!(pipe.advance(&mut buf), Ok(()));
7014
7015 assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
7016 assert_eq!(pipe.advance(&mut buf), Ok(()));
7017
7018 assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
7019 assert_eq!(pipe.advance(&mut buf), Ok(()));
7020
7021 assert_eq!(pipe.client.stream_send(16, b"a", false), Ok(1));
7022 assert_eq!(pipe.advance(&mut buf), Ok(()));
7023
7024 assert_eq!(pipe.client.stream_send(20, b"a", false), Ok(1));
7025 assert_eq!(pipe.advance(&mut buf), Ok(()));
7026
7027 let mut b = [0; 1];
7028
7029 let out = [b'b'; 500];
7030
7031 // Server prioritizes streams as follows:
7032 // * Stream 8 and 16 have the same priority but are non-incremental.
7033 // * Stream 4, 12 and 20 have the same priority but 20 is non-incremental
7034 // and 4 and 12 are incremental.
7035 // * Stream 0 is on its own.
7036
7037 pipe.server.stream_recv(0, &mut b).unwrap();
7038 assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
7039 pipe.server.stream_send(0, &out, false).unwrap();
7040 pipe.server.stream_send(0, &out, false).unwrap();
7041 pipe.server.stream_send(0, &out, false).unwrap();
7042
7043 pipe.server.stream_recv(12, &mut b).unwrap();
7044 assert_eq!(pipe.server.stream_priority(12, 42, true), Ok(()));
7045 pipe.server.stream_send(12, &out, false).unwrap();
7046 pipe.server.stream_send(12, &out, false).unwrap();
7047 pipe.server.stream_send(12, &out, false).unwrap();
7048
7049 pipe.server.stream_recv(16, &mut b).unwrap();
7050 assert_eq!(pipe.server.stream_priority(16, 10, false), Ok(()));
7051 pipe.server.stream_send(16, &out, false).unwrap();
7052 pipe.server.stream_send(16, &out, false).unwrap();
7053 pipe.server.stream_send(16, &out, false).unwrap();
7054
7055 pipe.server.stream_recv(4, &mut b).unwrap();
7056 assert_eq!(pipe.server.stream_priority(4, 42, true), Ok(()));
7057 pipe.server.stream_send(4, &out, false).unwrap();
7058 pipe.server.stream_send(4, &out, false).unwrap();
7059 pipe.server.stream_send(4, &out, false).unwrap();
7060
7061 pipe.server.stream_recv(8, &mut b).unwrap();
7062 assert_eq!(pipe.server.stream_priority(8, 10, false), Ok(()));
7063 pipe.server.stream_send(8, &out, false).unwrap();
7064 pipe.server.stream_send(8, &out, false).unwrap();
7065 pipe.server.stream_send(8, &out, false).unwrap();
7066
7067 pipe.server.stream_recv(20, &mut b).unwrap();
7068 assert_eq!(pipe.server.stream_priority(20, 42, false), Ok(()));
7069 pipe.server.stream_send(20, &out, false).unwrap();
7070 pipe.server.stream_send(20, &out, false).unwrap();
7071 pipe.server.stream_send(20, &out, false).unwrap();
7072
7073 // First is stream 8.
7074 let mut off = 0;
7075
7076 for _ in 1..=3 {
7077 let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7078
7079 let frames =
7080 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7081 let stream = frames.iter().next().unwrap();
7082
7083 assert_eq!(stream, &frame::Frame::Stream {
7084 stream_id: 8,
7085 data: stream::RangeBuf::from(&out, off, false),
7086 });
7087
7088 off = match stream {
7089 frame::Frame::Stream { data, .. } => data.max_off(),
7090
7091 _ => unreachable!(),
7092 };
7093 }
7094
7095 // Then is stream 16.
7096 let mut off = 0;
7097
7098 for _ in 1..=3 {
7099 let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7100
7101 let frames =
7102 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7103 let stream = frames.iter().next().unwrap();
7104
7105 assert_eq!(stream, &frame::Frame::Stream {
7106 stream_id: 16,
7107 data: stream::RangeBuf::from(&out, off, false),
7108 });
7109
7110 off = match stream {
7111 frame::Frame::Stream { data, .. } => data.max_off(),
7112
7113 _ => unreachable!(),
7114 };
7115 }
7116
7117 // Then is stream 20.
7118 let mut off = 0;
7119
7120 for _ in 1..=3 {
7121 let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7122
7123 let frames =
7124 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7125 let stream = frames.iter().next().unwrap();
7126
7127 assert_eq!(stream, &frame::Frame::Stream {
7128 stream_id: 20,
7129 data: stream::RangeBuf::from(&out, off, false),
7130 });
7131
7132 off = match stream {
7133 frame::Frame::Stream { data, .. } => data.max_off(),
7134
7135 _ => unreachable!(),
7136 };
7137 }
7138
7139 // Then are stream 12 and 4, with the same priority, incrementally.
7140 let mut off = 0;
7141
7142 for _ in 1..=3 {
7143 let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7144
7145 let frames =
7146 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7147
7148 assert_eq!(
7149 frames.iter().next(),
7150 Some(&frame::Frame::Stream {
7151 stream_id: 12,
7152 data: stream::RangeBuf::from(&out, off, false),
7153 })
7154 );
7155
7156 let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7157
7158 let frames =
7159 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7160
7161 let stream = frames.iter().next().unwrap();
7162
7163 assert_eq!(stream, &frame::Frame::Stream {
7164 stream_id: 4,
7165 data: stream::RangeBuf::from(&out, off, false),
7166 });
7167
7168 off = match stream {
7169 frame::Frame::Stream { data, .. } => data.max_off(),
7170
7171 _ => unreachable!(),
7172 };
7173 }
7174
7175 // Final is stream 0.
7176 let mut off = 0;
7177
7178 for _ in 1..=3 {
7179 let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7180
7181 let frames =
7182 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7183 let stream = frames.iter().next().unwrap();
7184
7185 assert_eq!(stream, &frame::Frame::Stream {
7186 stream_id: 0,
7187 data: stream::RangeBuf::from(&out, off, false),
7188 });
7189
7190 off = match stream {
7191 frame::Frame::Stream { data, .. } => data.max_off(),
7192
7193 _ => unreachable!(),
7194 };
7195 }
7196
7197 assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
7198 }
7199
7200 #[test]
7201 /// Tests that changing a stream's priority is correctly propagated.
7202 ///
7203 /// Re-prioritization is not supported, so this should fail.
7204 #[should_panic]
stream_reprioritize()7205 fn stream_reprioritize() {
7206 let mut buf = [0; 65535];
7207
7208 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7209 config
7210 .load_cert_chain_from_pem_file("examples/cert.crt")
7211 .unwrap();
7212 config
7213 .load_priv_key_from_pem_file("examples/cert.key")
7214 .unwrap();
7215 config
7216 .set_application_protos(b"\x06proto1\x06proto2")
7217 .unwrap();
7218 config.set_initial_max_data(30);
7219 config.set_initial_max_stream_data_bidi_local(15);
7220 config.set_initial_max_stream_data_bidi_remote(15);
7221 config.set_initial_max_stream_data_uni(0);
7222 config.set_initial_max_streams_bidi(5);
7223 config.set_initial_max_streams_uni(0);
7224 config.verify_peer(false);
7225
7226 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7227 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7228
7229 assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
7230 assert_eq!(pipe.advance(&mut buf), Ok(()));
7231
7232 assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
7233 assert_eq!(pipe.advance(&mut buf), Ok(()));
7234
7235 assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
7236 assert_eq!(pipe.advance(&mut buf), Ok(()));
7237
7238 assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
7239 assert_eq!(pipe.advance(&mut buf), Ok(()));
7240
7241 let mut b = [0; 1];
7242
7243 pipe.server.stream_recv(0, &mut b).unwrap();
7244 assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
7245 pipe.server.stream_send(0, b"b", false).unwrap();
7246
7247 pipe.server.stream_recv(12, &mut b).unwrap();
7248 assert_eq!(pipe.server.stream_priority(12, 42, true), Ok(()));
7249 pipe.server.stream_send(12, b"b", false).unwrap();
7250
7251 pipe.server.stream_recv(8, &mut b).unwrap();
7252 assert_eq!(pipe.server.stream_priority(8, 10, true), Ok(()));
7253 pipe.server.stream_send(8, b"b", false).unwrap();
7254
7255 pipe.server.stream_recv(4, &mut b).unwrap();
7256 assert_eq!(pipe.server.stream_priority(4, 42, true), Ok(()));
7257 pipe.server.stream_send(4, b"b", false).unwrap();
7258
7259 // Stream 0 is re-prioritized!!!
7260 assert_eq!(pipe.server.stream_priority(0, 20, true), Ok(()));
7261
7262 // First is stream 8.
7263 let len = pipe.server.send(&mut buf).unwrap();
7264
7265 let frames =
7266 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7267
7268 assert_eq!(
7269 frames.iter().next(),
7270 Some(&frame::Frame::Stream {
7271 stream_id: 8,
7272 data: stream::RangeBuf::from(b"b", 0, false),
7273 })
7274 );
7275
7276 // Then is stream 0.
7277 let len = pipe.server.send(&mut buf).unwrap();
7278
7279 let frames =
7280 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7281
7282 assert_eq!(
7283 frames.iter().next(),
7284 Some(&frame::Frame::Stream {
7285 stream_id: 0,
7286 data: stream::RangeBuf::from(b"b", 0, false),
7287 })
7288 );
7289
7290 // Then are stream 12 and 4, with the same priority.
7291 let len = pipe.server.send(&mut buf).unwrap();
7292
7293 let frames =
7294 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7295
7296 assert_eq!(
7297 frames.iter().next(),
7298 Some(&frame::Frame::Stream {
7299 stream_id: 12,
7300 data: stream::RangeBuf::from(b"b", 0, false),
7301 })
7302 );
7303
7304 let len = pipe.server.send(&mut buf).unwrap();
7305
7306 let frames =
7307 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7308
7309 assert_eq!(
7310 frames.iter().next(),
7311 Some(&frame::Frame::Stream {
7312 stream_id: 4,
7313 data: stream::RangeBuf::from(b"b", 0, false),
7314 })
7315 );
7316
7317 assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
7318 }
7319
7320 #[test]
7321 /// Tests that old data is retransmitted on PTO.
early_retransmit()7322 fn early_retransmit() {
7323 let mut buf = [0; 65535];
7324
7325 let mut pipe = testing::Pipe::default().unwrap();
7326 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7327
7328 // Client sends stream data.
7329 assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
7330 assert_eq!(pipe.advance(&mut buf), Ok(()));
7331
7332 // Client sends more stream data, but packet is lost
7333 assert_eq!(pipe.client.stream_send(4, b"b", false), Ok(1));
7334 assert!(pipe.client.send(&mut buf).is_ok());
7335
7336 // Wait until PTO expires. Since the RTT is very low, wait a bit more.
7337 let timer = pipe.client.timeout().unwrap();
7338 std::thread::sleep(timer + time::Duration::from_millis(1));
7339
7340 pipe.client.on_timeout();
7341
7342 let epoch = packet::EPOCH_APPLICATION;
7343 assert_eq!(pipe.client.recovery.loss_probes[epoch], 1);
7344
7345 // Client retransmits stream data in PTO probe.
7346 let len = pipe.client.send(&mut buf).unwrap();
7347 assert_eq!(pipe.client.recovery.loss_probes[epoch], 0);
7348
7349 let frames =
7350 testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
7351
7352 let mut iter = frames.iter();
7353
7354 // Skip ACK frame.
7355 iter.next();
7356
7357 assert_eq!(
7358 iter.next(),
7359 Some(&frame::Frame::Stream {
7360 stream_id: 4,
7361 data: stream::RangeBuf::from(b"b", 0, false),
7362 })
7363 );
7364 }
7365
7366 #[test]
7367 /// Tests that client avoids handshake deadlock by arming PTO.
handshake_anti_deadlock()7368 fn handshake_anti_deadlock() {
7369 let mut buf = [0; 65535];
7370
7371 let mut config = Config::new(PROTOCOL_VERSION).unwrap();
7372 config
7373 .load_cert_chain_from_pem_file("examples/cert-big.crt")
7374 .unwrap();
7375 config
7376 .load_priv_key_from_pem_file("examples/cert.key")
7377 .unwrap();
7378 config
7379 .set_application_protos(b"\x06proto1\06proto2")
7380 .unwrap();
7381
7382 let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
7383
7384 assert_eq!(pipe.client.handshake_status().has_handshake_keys, false);
7385 assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
7386 assert_eq!(pipe.server.handshake_status().has_handshake_keys, false);
7387 assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
7388
7389 // Client sends padded Initial.
7390 let len = pipe.client.send(&mut buf).unwrap();
7391 assert_eq!(len, 1200);
7392
7393 // Server receives client's Initial and sends own Initial and Handshake
7394 // until it's blocked by the anti-amplification limit.
7395 let len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
7396 assert_eq!(pipe.server.send(&mut buf[len..]), Err(Error::Done));
7397
7398 assert_eq!(pipe.client.handshake_status().has_handshake_keys, false);
7399 assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
7400 assert_eq!(pipe.server.handshake_status().has_handshake_keys, true);
7401 assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
7402
7403 // Client receives the server flight and sends Handshake ACK, but it is
7404 // lost.
7405 assert!(testing::recv_send(&mut pipe.client, &mut buf, len).is_ok());
7406
7407 assert_eq!(pipe.client.handshake_status().has_handshake_keys, true);
7408 assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
7409 assert_eq!(pipe.server.handshake_status().has_handshake_keys, true);
7410 assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
7411
7412 // Make sure client's PTO timer is armed.
7413 assert!(pipe.client.timeout().is_some());
7414 }
7415
7416 #[test]
7417 /// Tests that packets with corrupted type (from Handshake to Initial) are
7418 /// properly ignored.
handshake_packet_type_corruption()7419 fn handshake_packet_type_corruption() {
7420 let mut buf = [0; 65535];
7421
7422 let mut pipe = testing::Pipe::default().unwrap();
7423
7424 // Client sends padded Initial.
7425 let len = pipe.client.send(&mut buf).unwrap();
7426 assert_eq!(len, 1200);
7427
7428 // Server receives client's Initial and sends own Initial and Handshake.
7429 let len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
7430 assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
7431
7432 // Client sends Initial packet with ACK.
7433 let len = pipe.client.send(&mut buf).unwrap();
7434
7435 let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
7436 assert_eq!(hdr.ty, Type::Initial);
7437
7438 assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
7439
7440 // Client sends Handshake packet.
7441 let len = pipe.client.send(&mut buf).unwrap();
7442
7443 let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
7444 assert_eq!(hdr.ty, Type::Handshake);
7445
7446 // Packet type is corrupted to Initial..
7447 buf[0] &= !(0x20);
7448
7449 let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
7450 assert_eq!(hdr.ty, Type::Initial);
7451
7452 // Server receives corrupted packet without returning an error.
7453 assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
7454 }
7455
7456 #[test]
dgram_send_fails_invalidstate()7457 fn dgram_send_fails_invalidstate() {
7458 let mut buf = [0; 65535];
7459
7460 let mut pipe = testing::Pipe::default().unwrap();
7461
7462 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7463
7464 assert_eq!(
7465 pipe.client.dgram_send(b"hello, world"),
7466 Err(Error::InvalidState)
7467 );
7468 }
7469
7470 #[test]
dgram_send_app_limited()7471 fn dgram_send_app_limited() {
7472 let mut buf = [0; 65535];
7473 let send_buf = [0xcf; 1000];
7474
7475 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7476 config
7477 .load_cert_chain_from_pem_file("examples/cert.crt")
7478 .unwrap();
7479 config
7480 .load_priv_key_from_pem_file("examples/cert.key")
7481 .unwrap();
7482 config
7483 .set_application_protos(b"\x06proto1\x06proto2")
7484 .unwrap();
7485 config.set_initial_max_data(30);
7486 config.set_initial_max_stream_data_bidi_local(15);
7487 config.set_initial_max_stream_data_bidi_remote(15);
7488 config.set_initial_max_stream_data_uni(10);
7489 config.set_initial_max_streams_bidi(3);
7490 config.set_initial_max_streams_uni(3);
7491 config.enable_dgram(true, 1000, 1000);
7492 config.set_max_udp_payload_size(1200);
7493 config.verify_peer(false);
7494
7495 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7496
7497 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7498 assert_eq!(pipe.advance(&mut buf), Ok(()));
7499
7500 for _ in 0..1000 {
7501 assert_eq!(pipe.client.dgram_send(&send_buf), Ok(()));
7502 }
7503
7504 assert!(!pipe.client.recovery.app_limited());
7505 assert_eq!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
7506
7507 let len = pipe.client.send(&mut buf).unwrap();
7508
7509 assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
7510 assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
7511 assert!(!pipe.client.recovery.app_limited());
7512
7513 testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
7514 testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
7515
7516 assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
7517 assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
7518
7519 assert!(!pipe.client.recovery.app_limited());
7520 }
7521
7522 #[test]
dgram_single_datagram()7523 fn dgram_single_datagram() {
7524 let mut buf = [0; 65535];
7525
7526 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7527 config
7528 .load_cert_chain_from_pem_file("examples/cert.crt")
7529 .unwrap();
7530 config
7531 .load_priv_key_from_pem_file("examples/cert.key")
7532 .unwrap();
7533 config
7534 .set_application_protos(b"\x06proto1\x06proto2")
7535 .unwrap();
7536 config.set_initial_max_data(30);
7537 config.set_initial_max_stream_data_bidi_local(15);
7538 config.set_initial_max_stream_data_bidi_remote(15);
7539 config.set_initial_max_stream_data_uni(10);
7540 config.set_initial_max_streams_bidi(3);
7541 config.set_initial_max_streams_uni(3);
7542 config.enable_dgram(true, 10, 10);
7543 config.verify_peer(false);
7544
7545 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7546
7547 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7548
7549 assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7550
7551 assert_eq!(pipe.advance(&mut buf), Ok(()));
7552
7553 let result1 = pipe.server.dgram_recv(&mut buf);
7554 assert_eq!(result1, Ok(12));
7555
7556 let result2 = pipe.server.dgram_recv(&mut buf);
7557 assert_eq!(result2, Err(Error::Done));
7558 }
7559
7560 #[test]
dgram_multiple_datagrams()7561 fn dgram_multiple_datagrams() {
7562 let mut buf = [0; 65535];
7563
7564 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7565 config
7566 .load_cert_chain_from_pem_file("examples/cert.crt")
7567 .unwrap();
7568 config
7569 .load_priv_key_from_pem_file("examples/cert.key")
7570 .unwrap();
7571 config
7572 .set_application_protos(b"\x06proto1\x06proto2")
7573 .unwrap();
7574 config.set_initial_max_data(30);
7575 config.set_initial_max_stream_data_bidi_local(15);
7576 config.set_initial_max_stream_data_bidi_remote(15);
7577 config.set_initial_max_stream_data_uni(10);
7578 config.set_initial_max_streams_bidi(3);
7579 config.set_initial_max_streams_uni(3);
7580 config.enable_dgram(true, 10, 10);
7581 config.verify_peer(false);
7582
7583 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7584
7585 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7586
7587 assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7588 assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
7589 assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Ok(()));
7590
7591 pipe.client
7592 .dgram_purge_outgoing(|d: &[u8]| -> bool { d[0] == b'c' });
7593
7594 assert_eq!(pipe.advance(&mut buf), Ok(()));
7595
7596 let result1 = pipe.server.dgram_recv(&mut buf);
7597 assert_eq!(result1, Ok(12));
7598 assert_eq!(buf[0], b'h');
7599 assert_eq!(buf[1], b'e');
7600
7601 let result2 = pipe.server.dgram_recv(&mut buf);
7602 assert_eq!(result2, Ok(11));
7603 assert_eq!(buf[0], b'h');
7604 assert_eq!(buf[1], b'o');
7605
7606 let result3 = pipe.server.dgram_recv(&mut buf);
7607 assert_eq!(result3, Err(Error::Done));
7608 }
7609
7610 #[test]
dgram_send_queue_overflow()7611 fn dgram_send_queue_overflow() {
7612 let mut buf = [0; 65535];
7613
7614 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7615 config
7616 .load_cert_chain_from_pem_file("examples/cert.crt")
7617 .unwrap();
7618 config
7619 .load_priv_key_from_pem_file("examples/cert.key")
7620 .unwrap();
7621 config
7622 .set_application_protos(b"\x06proto1\x06proto2")
7623 .unwrap();
7624 config.set_initial_max_data(30);
7625 config.set_initial_max_stream_data_bidi_local(15);
7626 config.set_initial_max_stream_data_bidi_remote(15);
7627 config.set_initial_max_stream_data_uni(10);
7628 config.set_initial_max_streams_bidi(3);
7629 config.set_initial_max_streams_uni(3);
7630 config.enable_dgram(true, 10, 2);
7631 config.verify_peer(false);
7632
7633 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7634
7635 assert_eq!(pipe.advance(&mut buf), Ok(()));
7636
7637 assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7638 assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
7639 assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Err(Error::Done));
7640
7641 assert_eq!(pipe.advance(&mut buf), Ok(()));
7642
7643 let result1 = pipe.server.dgram_recv(&mut buf);
7644 assert_eq!(result1, Ok(12));
7645 assert_eq!(buf[0], b'h');
7646 assert_eq!(buf[1], b'e');
7647
7648 let result2 = pipe.server.dgram_recv(&mut buf);
7649 assert_eq!(result2, Ok(11));
7650 assert_eq!(buf[0], b'c');
7651 assert_eq!(buf[1], b'i');
7652
7653 let result3 = pipe.server.dgram_recv(&mut buf);
7654 assert_eq!(result3, Err(Error::Done));
7655 }
7656
7657 #[test]
dgram_recv_queue_overflow()7658 fn dgram_recv_queue_overflow() {
7659 let mut buf = [0; 65535];
7660
7661 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7662 config
7663 .load_cert_chain_from_pem_file("examples/cert.crt")
7664 .unwrap();
7665 config
7666 .load_priv_key_from_pem_file("examples/cert.key")
7667 .unwrap();
7668 config
7669 .set_application_protos(b"\x06proto1\x06proto2")
7670 .unwrap();
7671 config.set_initial_max_data(30);
7672 config.set_initial_max_stream_data_bidi_local(15);
7673 config.set_initial_max_stream_data_bidi_remote(15);
7674 config.set_initial_max_stream_data_uni(10);
7675 config.set_initial_max_streams_bidi(3);
7676 config.set_initial_max_streams_uni(3);
7677 config.enable_dgram(true, 2, 10);
7678 config.set_max_udp_payload_size(1200);
7679 config.verify_peer(false);
7680
7681 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7682
7683 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7684
7685 assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7686 assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
7687 assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Ok(()));
7688
7689 assert_eq!(pipe.advance(&mut buf), Ok(()));
7690
7691 let result1 = pipe.server.dgram_recv(&mut buf);
7692 assert_eq!(result1, Ok(11));
7693 assert_eq!(buf[0], b'c');
7694 assert_eq!(buf[1], b'i');
7695
7696 let result2 = pipe.server.dgram_recv(&mut buf);
7697 assert_eq!(result2, Ok(11));
7698 assert_eq!(buf[0], b'h');
7699 assert_eq!(buf[1], b'o');
7700
7701 let result3 = pipe.server.dgram_recv(&mut buf);
7702 assert_eq!(result3, Err(Error::Done));
7703 }
7704
7705 #[test]
dgram_send_max_size()7706 fn dgram_send_max_size() {
7707 let mut buf = [0; MAX_DGRAM_FRAME_SIZE as usize];
7708
7709 let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7710 config
7711 .load_cert_chain_from_pem_file("examples/cert.crt")
7712 .unwrap();
7713 config
7714 .load_priv_key_from_pem_file("examples/cert.key")
7715 .unwrap();
7716 config
7717 .set_application_protos(b"\x06proto1\x06proto2")
7718 .unwrap();
7719 config.set_initial_max_data(30);
7720 config.set_initial_max_stream_data_bidi_local(15);
7721 config.set_initial_max_stream_data_bidi_remote(15);
7722 config.set_initial_max_stream_data_uni(10);
7723 config.set_initial_max_streams_bidi(3);
7724 config.set_initial_max_streams_uni(3);
7725 config.enable_dgram(true, 10, 10);
7726 config.set_max_udp_payload_size(1452);
7727 config.verify_peer(false);
7728
7729 let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7730
7731 // Before handshake (before peer settings) we don't know max dgram size
7732 assert_eq!(pipe.client.dgram_max_writable_len(), None);
7733
7734 assert_eq!(pipe.handshake(&mut buf), Ok(()));
7735
7736 let max_dgram_size = pipe.client.dgram_max_writable_len().unwrap();
7737
7738 let dgram_packet: Vec<u8> = vec![42; max_dgram_size];
7739
7740 assert_eq!(pipe.client.dgram_send(&dgram_packet), Ok(()));
7741
7742 assert_eq!(pipe.advance(&mut buf), Ok(()));
7743
7744 let result1 = pipe.server.dgram_recv(&mut buf);
7745 assert_eq!(result1, Ok(max_dgram_size));
7746
7747 let result2 = pipe.server.dgram_recv(&mut buf);
7748 assert_eq!(result2, Err(Error::Done));
7749 }
7750 }
7751
7752 pub use crate::packet::Header;
7753 pub use crate::packet::Type;
7754 pub use crate::recovery::CongestionControlAlgorithm;
7755 pub use crate::stream::StreamIter;
7756
7757 mod crypto;
7758 mod dgram;
7759 mod ffi;
7760 mod frame;
7761 pub mod h3;
7762 mod minmax;
7763 mod octets;
7764 mod packet;
7765 mod rand;
7766 mod ranges;
7767 mod recovery;
7768 mod stream;
7769 mod tls;
7770