• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 //! HTTP/3 wire protocol and QPACK implementation.
28 //!
29 //! This module provides a high level API for sending and receiving HTTP/3
30 //! requests and responses on top of the QUIC transport protocol.
31 //!
32 //! ## Connection setup
33 //!
34 //! HTTP/3 connections require a QUIC transport-layer connection, see
35 //! [Connection setup] for a full description of the setup process.
36 //!
37 //! To use HTTP/3, the QUIC connection must be configured with a suitable
38 //! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39 //!
40 //! ```
41 //! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42 //! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43 //! # Ok::<(), quiche::Error>(())
44 //! ```
45 //!
46 //! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47 //!
48 //! Once the handshake has completed, the first step in establishing an HTTP/3
49 //! connection is creating its configuration object:
50 //!
51 //! ```
52 //! let h3_config = quiche::h3::Config::new()?;
53 //! # Ok::<(), quiche::h3::Error>(())
54 //! ```
55 //!
56 //! HTTP/3 client and server connections are both created using the
57 //! [`with_transport()`] function, the role is inferred from the type of QUIC
58 //! connection:
59 //!
60 //! ```no_run
61 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63 //! # let peer = "127.0.0.1:1234".parse().unwrap();
64 //! # let local = "127.0.0.1:4321".parse().unwrap();
65 //! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66 //! # let h3_config = quiche::h3::Config::new()?;
67 //! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68 //! # Ok::<(), quiche::h3::Error>(())
69 //! ```
70 //!
71 //! ## Sending a request
72 //!
73 //! An HTTP/3 client can send a request by using the connection's
74 //! [`send_request()`] method to queue request headers; [sending] QUIC packets
75 //! causes the requests to get sent to the peer:
76 //!
77 //! ```no_run
78 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80 //! # let peer = "127.0.0.1:1234".parse().unwrap();
81 //! # let local = "127.0.0.1:4321".parse().unwrap();
82 //! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83 //! # let h3_config = quiche::h3::Config::new()?;
84 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85 //! let req = vec![
86 //!     quiche::h3::Header::new(b":method", b"GET"),
87 //!     quiche::h3::Header::new(b":scheme", b"https"),
88 //!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89 //!     quiche::h3::Header::new(b":path", b"/"),
90 //!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91 //! ];
92 //!
93 //! h3_conn.send_request(&mut conn, &req, true)?;
94 //! # Ok::<(), quiche::h3::Error>(())
95 //! ```
96 //!
97 //! An HTTP/3 client can send a request with additional body data by using
98 //! the connection's [`send_body()`] method:
99 //!
100 //! ```no_run
101 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103 //! # let peer = "127.0.0.1:1234".parse().unwrap();
104 //! # let local = "127.0.0.1:4321".parse().unwrap();
105 //! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106 //! # let h3_config = quiche::h3::Config::new()?;
107 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108 //! let req = vec![
109 //!     quiche::h3::Header::new(b":method", b"GET"),
110 //!     quiche::h3::Header::new(b":scheme", b"https"),
111 //!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112 //!     quiche::h3::Header::new(b":path", b"/"),
113 //!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114 //! ];
115 //!
116 //! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117 //! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118 //! # Ok::<(), quiche::h3::Error>(())
119 //! ```
120 //!
121 //! ## Handling requests and responses
122 //!
123 //! After [receiving] QUIC packets, HTTP/3 data is processed using the
124 //! connection's [`poll()`] method. On success, this returns an [`Event`] object
125 //! and an ID corresponding to the stream where the `Event` originated.
126 //!
127 //! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128 //! [`send_response()`] and [`send_body()`]:
129 //!
130 //! ```no_run
131 //! use quiche::h3::NameValue;
132 //!
133 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135 //! # let peer = "127.0.0.1:1234".parse().unwrap();
136 //! # let local = "127.0.0.1:1234".parse().unwrap();
137 //! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138 //! # let h3_config = quiche::h3::Config::new()?;
139 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140 //! loop {
141 //!     match h3_conn.poll(&mut conn) {
142 //!         Ok((stream_id, quiche::h3::Event::Headers{list, has_body})) => {
143 //!             let mut headers = list.into_iter();
144 //!
145 //!             // Look for the request's method.
146 //!             let method = headers.find(|h| h.name() == b":method").unwrap();
147 //!
148 //!             // Look for the request's path.
149 //!             let path = headers.find(|h| h.name() == b":path").unwrap();
150 //!
151 //!             if method.value() == b"GET" && path.value() == b"/" {
152 //!                 let resp = vec![
153 //!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154 //!                     quiche::h3::Header::new(b"server", b"quiche"),
155 //!                 ];
156 //!
157 //!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158 //!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159 //!             }
160 //!         },
161 //!
162 //!         Ok((stream_id, quiche::h3::Event::Data)) => {
163 //!             // Request body data, handle it.
164 //!             # return Ok(());
165 //!         },
166 //!
167 //!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168 //!             // Peer terminated stream, handle it.
169 //!         },
170 //!
171 //!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172 //!             // Peer reset the stream, handle it.
173 //!         },
174 //!
175 //!         Ok((_flow_id, quiche::h3::Event::Datagram)) => (),
176 //!
177 //!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
178 //!
179 //!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
180 //!              // Peer signalled it is going away, handle it.
181 //!         },
182 //!
183 //!         Err(quiche::h3::Error::Done) => {
184 //!             // Done reading.
185 //!             break;
186 //!         },
187 //!
188 //!         Err(e) => {
189 //!             // An error occurred, handle it.
190 //!             break;
191 //!         },
192 //!     }
193 //! }
194 //! # Ok::<(), quiche::h3::Error>(())
195 //! ```
196 //!
197 //! An HTTP/3 client uses [`poll()`] to read responses:
198 //!
199 //! ```no_run
200 //! use quiche::h3::NameValue;
201 //!
202 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
203 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
204 //! # let peer = "127.0.0.1:1234".parse().unwrap();
205 //! # let local = "127.0.0.1:1234".parse().unwrap();
206 //! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
207 //! # let h3_config = quiche::h3::Config::new()?;
208 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
209 //! loop {
210 //!     match h3_conn.poll(&mut conn) {
211 //!         Ok((stream_id, quiche::h3::Event::Headers{list, has_body})) => {
212 //!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
213 //!             println!("Received {} response on stream {}",
214 //!                      std::str::from_utf8(status.value()).unwrap(),
215 //!                      stream_id);
216 //!         },
217 //!
218 //!         Ok((stream_id, quiche::h3::Event::Data)) => {
219 //!             let mut body = vec![0; 4096];
220 //!
221 //!             // Consume all body data received on the stream.
222 //!             while let Ok(read) =
223 //!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
224 //!             {
225 //!                 println!("Received {} bytes of payload on stream {}",
226 //!                          read, stream_id);
227 //!             }
228 //!         },
229 //!
230 //!         Ok((stream_id, quiche::h3::Event::Finished)) => {
231 //!             // Peer terminated stream, handle it.
232 //!         },
233 //!
234 //!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
235 //!             // Peer reset the stream, handle it.
236 //!         },
237 //!
238 //!         Ok((_flow_id, quiche::h3::Event::Datagram)) => (),
239 //!
240 //!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
241 //!
242 //!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
243 //!              // Peer signalled it is going away, handle it.
244 //!         },
245 //!
246 //!         Err(quiche::h3::Error::Done) => {
247 //!             // Done reading.
248 //!             break;
249 //!         },
250 //!
251 //!         Err(e) => {
252 //!             // An error occurred, handle it.
253 //!             break;
254 //!         },
255 //!     }
256 //! }
257 //! # Ok::<(), quiche::h3::Error>(())
258 //! ```
259 //!
260 //! ## Detecting end of request or response
261 //!
262 //! A single HTTP/3 request or response may consist of several HEADERS and DATA
263 //! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
264 //! repeatedly will generate an [`Event`] for each of these. The application may
265 //! use these events to do additional HTTP semantic validation.
266 //!
267 //! ## HTTP/3 protocol errors
268 //!
269 //! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
270 //! a correct state and validating all messages received by a peer. This mainly
271 //! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
272 //! close the connection and send an appropriate CONNECTION_CLOSE frame to the
273 //! peer. An [`Error`] is returned to the application so that it can perform any
274 //! required tidy up such as closing sockets.
275 //!
276 //! [`application_proto()`]: ../struct.Connection.html#method.application_proto
277 //! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
278 //! [Connection setup]: ../index.html#connection-setup
279 //! [sending]: ../index.html#generating-outgoing-packets
280 //! [receiving]: ../index.html#handling-incoming-packets
281 //! [`with_transport()`]: struct.Connection.html#method.with_transport
282 //! [`poll()`]: struct.Connection.html#method.poll
283 //! [`Event`]: enum.Event.html
284 //! [`Error`]: enum.Error.html
285 //! [`send_request()`]: struct.Connection.html#method.send_request
286 //! [`send_response()`]: struct.Connection.html#method.send_response
287 //! [`send_body()`]: struct.Connection.html#method.send_body
288 
289 use std::collections::VecDeque;
290 
291 #[cfg(feature = "sfv")]
292 use std::convert::TryFrom;
293 use std::fmt;
294 use std::fmt::Write;
295 
296 #[cfg(feature = "qlog")]
297 use qlog::events::h3::H3FrameCreated;
298 #[cfg(feature = "qlog")]
299 use qlog::events::h3::H3FrameParsed;
300 #[cfg(feature = "qlog")]
301 use qlog::events::h3::H3Owner;
302 #[cfg(feature = "qlog")]
303 use qlog::events::h3::H3PriorityTargetStreamType;
304 #[cfg(feature = "qlog")]
305 use qlog::events::h3::H3StreamType;
306 #[cfg(feature = "qlog")]
307 use qlog::events::h3::H3StreamTypeSet;
308 #[cfg(feature = "qlog")]
309 use qlog::events::h3::Http3EventType;
310 #[cfg(feature = "qlog")]
311 use qlog::events::h3::Http3Frame;
312 #[cfg(feature = "qlog")]
313 use qlog::events::EventData;
314 #[cfg(feature = "qlog")]
315 use qlog::events::EventImportance;
316 #[cfg(feature = "qlog")]
317 use qlog::events::EventType;
318 
/// List of ALPN tokens of supported HTTP/3 versions.
///
/// The list holds the `h3` token followed by the `h3-29`, `h3-28` and
/// `h3-27` draft tokens.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
/// method when implementing HTTP/3 applications.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3", b"h3-29", b"h3-28", b"h3-27"];
327 
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Parameter values as specified in [Extensible Priorities].
//
// Urgency is an integer in the inclusive range 0 through 7 and defaults to 3
// when omitted; the incremental flag defaults to false when omitted.
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
338 
// qlog event types for HTTP/3 events, only compiled in when the `qlog`
// feature is enabled.

// Event type wrapping `Http3EventType::FrameCreated`.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
// Event type wrapping `Http3EventType::FrameParsed`.
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
// Event type wrapping `Http3EventType::StreamTypeSet`.
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
348 
/// A specialized [`Result`] type for quiche HTTP/3 operations.
///
/// This type is used throughout quiche's HTTP/3 public API for any operation
/// that can produce an error. The error type is always the HTTP/3 [`Error`]
/// enum defined in this module.
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
/// [`Error`]: enum.Error.html
pub type Result<T> = std::result::Result<T, Error>;
356 
/// An HTTP/3 error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
    /// There is no error or no work to do.
    Done,

    /// The provided buffer is too short.
    BufferTooShort,

    /// Internal error in the HTTP/3 stack.
    InternalError,

    /// Endpoint detected that the peer is exhibiting behavior that causes
    /// excessive load.
    ExcessiveLoad,

    /// A Stream ID or Push ID was used incorrectly, such as exceeding a
    /// limit, reducing a limit, or being reused.
    IdError,

    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError,

    /// A required critical stream was closed.
    ClosedCriticalStream,

    /// No SETTINGS frame at beginning of control stream.
    MissingSettings,

    /// A frame was received which is not permitted in the current state.
    FrameUnexpected,

    /// Frame violated layout or size rules.
    FrameError,

    /// QPACK Header block decompression failure.
    QpackDecompressionFailed,

    /// Error originated from the transport layer.
    ///
    /// The associated data is the underlying QUIC error.
    TransportError(crate::Error),

    /// The underlying QUIC stream (or connection) doesn't have enough capacity
    /// for the operation to complete. The application should retry later on.
    StreamBlocked,

    /// Error in the payload of a SETTINGS frame.
    SettingsError,

    /// Server rejected request.
    RequestRejected,

    /// Request or its response cancelled.
    RequestCancelled,

    /// Client's request stream terminated without containing a full-formed
    /// request.
    RequestIncomplete,

    /// An HTTP message was malformed and cannot be processed.
    MessageError,

    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError,

    /// The requested operation cannot be served over HTTP/3. Peer should retry
    /// over HTTP/1.1.
    VersionFallback,
}
428 
429 impl Error {
to_wire(self) -> u64430     fn to_wire(self) -> u64 {
431         match self {
432             Error::Done => 0x100,
433             Error::InternalError => 0x102,
434             Error::StreamCreationError => 0x103,
435             Error::ClosedCriticalStream => 0x104,
436             Error::FrameUnexpected => 0x105,
437             Error::FrameError => 0x106,
438             Error::ExcessiveLoad => 0x107,
439             Error::IdError => 0x108,
440             Error::MissingSettings => 0x10A,
441             Error::QpackDecompressionFailed => 0x200,
442             Error::BufferTooShort => 0x999,
443             Error::TransportError { .. } => 0xFF,
444             Error::StreamBlocked => 0xFF,
445             Error::SettingsError => 0x109,
446             Error::RequestRejected => 0x10B,
447             Error::RequestCancelled => 0x10C,
448             Error::RequestIncomplete => 0x10D,
449             Error::MessageError => 0x10E,
450             Error::ConnectError => 0x10F,
451             Error::VersionFallback => 0x110,
452         }
453     }
454 
455     #[cfg(feature = "ffi")]
to_c(self) -> libc::ssize_t456     fn to_c(self) -> libc::ssize_t {
457         match self {
458             Error::Done => -1,
459             Error::BufferTooShort => -2,
460             Error::InternalError => -3,
461             Error::ExcessiveLoad => -4,
462             Error::IdError => -5,
463             Error::StreamCreationError => -6,
464             Error::ClosedCriticalStream => -7,
465             Error::MissingSettings => -8,
466             Error::FrameUnexpected => -9,
467             Error::FrameError => -10,
468             Error::QpackDecompressionFailed => -11,
469             // -12 was previously used for TransportError, skip it
470             Error::StreamBlocked => -13,
471             Error::SettingsError => -14,
472             Error::RequestRejected => -15,
473             Error::RequestCancelled => -16,
474             Error::RequestIncomplete => -17,
475             Error::MessageError => -18,
476             Error::ConnectError => -19,
477             Error::VersionFallback => -20,
478 
479             Error::TransportError(quic_error) => quic_error.to_c() - 1000,
480         }
481     }
482 }
483 
484 impl std::fmt::Display for Error {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result485     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
486         write!(f, "{self:?}")
487     }
488 }
489 
impl std::error::Error for Error {
    /// Always returns `None`: no underlying cause is exposed, not even for
    /// the `TransportError` variant.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}
495 
496 impl std::convert::From<super::Error> for Error {
from(err: super::Error) -> Self497     fn from(err: super::Error) -> Self {
498         match err {
499             super::Error::Done => Error::Done,
500 
501             _ => Error::TransportError(err),
502         }
503     }
504 }
505 
506 impl std::convert::From<octets::BufferTooShortError> for Error {
from(_err: octets::BufferTooShortError) -> Self507     fn from(_err: octets::BufferTooShortError) -> Self {
508         Error::BufferTooShort
509     }
510 }
511 
/// An HTTP/3 configuration.
///
/// Every field is `None` until the corresponding setter is called.
pub struct Config {
    // Value for `SETTINGS_MAX_FIELD_SECTION_SIZE`.
    max_field_section_size: Option<u64>,
    // Value for `SETTINGS_QPACK_MAX_TABLE_CAPACITY`.
    qpack_max_table_capacity: Option<u64>,
    // Value for `SETTINGS_QPACK_BLOCKED_STREAMS`.
    qpack_blocked_streams: Option<u64>,
    // `Some(1)` when `SETTINGS_ENABLE_CONNECT_PROTOCOL` is enabled, `None`
    // when the setting is omitted.
    connect_protocol_enabled: Option<u64>,
}
519 
520 impl Config {
521     /// Creates a new configuration object with default settings.
new() -> Result<Config>522     pub const fn new() -> Result<Config> {
523         Ok(Config {
524             max_field_section_size: None,
525             qpack_max_table_capacity: None,
526             qpack_blocked_streams: None,
527             connect_protocol_enabled: None,
528         })
529     }
530 
531     /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
532     ///
533     /// By default no limit is enforced. When a request whose headers exceed
534     /// the limit set by the application is received, the call to the [`poll()`]
535     /// method will return the [`Error::ExcessiveLoad`] error, and the
536     /// connection will be closed.
537     ///
538     /// [`poll()`]: struct.Connection.html#method.poll
539     /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
set_max_field_section_size(&mut self, v: u64)540     pub fn set_max_field_section_size(&mut self, v: u64) {
541         self.max_field_section_size = Some(v);
542     }
543 
544     /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
545     ///
546     /// The default value is `0`.
set_qpack_max_table_capacity(&mut self, v: u64)547     pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
548         self.qpack_max_table_capacity = Some(v);
549     }
550 
551     /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
552     ///
553     /// The default value is `0`.
set_qpack_blocked_streams(&mut self, v: u64)554     pub fn set_qpack_blocked_streams(&mut self, v: u64) {
555         self.qpack_blocked_streams = Some(v);
556     }
557 
558     /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
559     ///
560     /// The default value is `false`.
enable_extended_connect(&mut self, enabled: bool)561     pub fn enable_extended_connect(&mut self, enabled: bool) {
562         if enabled {
563             self.connect_protocol_enabled = Some(1);
564         } else {
565             self.connect_protocol_enabled = None;
566         }
567     }
568 }
569 
/// A trait for types with associated string name and value.
///
/// Both parts are exposed as raw byte slices.
pub trait NameValue {
    /// Returns the object's name.
    fn name(&self) -> &[u8];

    /// Returns the object's value.
    fn value(&self) -> &[u8];
}
578 
579 impl NameValue for (&[u8], &[u8]) {
name(&self) -> &[u8]580     fn name(&self) -> &[u8] {
581         self.0
582     }
583 
value(&self) -> &[u8]584     fn value(&self) -> &[u8] {
585         self.1
586     }
587 }
588 
/// An owned name-value pair representing a raw HTTP header.
///
/// The first element holds the name bytes and the second the value bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);
592 
/// Writes `hdr` to `f` in a human-readable form.
///
/// If `hdr` is valid UTF-8 it is written as text with
/// [`str::escape_default()`] escaping applied; otherwise the `Debug`
/// rendering of the raw byte slice is written.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // `EscapeDefault` implements `Display`, so the escaped text can be
        // streamed into the formatter without building a temporary `String`.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}
599 
600 impl fmt::Debug for Header {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result601     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602         f.write_char('"')?;
603         try_print_as_readable(&self.0, f)?;
604         f.write_str(": ")?;
605         try_print_as_readable(&self.1, f)?;
606         f.write_char('"')
607     }
608 }
609 
610 impl Header {
611     /// Creates a new header.
612     ///
613     /// Both `name` and `value` will be cloned.
new(name: &[u8], value: &[u8]) -> Self614     pub fn new(name: &[u8], value: &[u8]) -> Self {
615         Self(name.to_vec(), value.to_vec())
616     }
617 }
618 
619 impl NameValue for Header {
name(&self) -> &[u8]620     fn name(&self) -> &[u8] {
621         &self.0
622     }
623 
value(&self) -> &[u8]624     fn value(&self) -> &[u8] {
625         &self.1
626     }
627 }
628 
/// A non-owned name-value pair representing a raw HTTP header.
///
/// The first element borrows the name bytes and the second the value bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
632 
633 impl<'a> HeaderRef<'a> {
634     /// Creates a new header.
new(name: &'a [u8], value: &'a [u8]) -> Self635     pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
636         Self(name, value)
637     }
638 }
639 
640 impl<'a> NameValue for HeaderRef<'a> {
name(&self) -> &[u8]641     fn name(&self) -> &[u8] {
642         self.0
643     }
644 
value(&self) -> &[u8]645     fn value(&self) -> &[u8] {
646         self.1
647     }
648 }
649 
/// An HTTP/3 connection event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Request/response headers were received.
    Headers {
        /// The list of received header fields. The application should validate
        /// pseudo-headers and headers.
        list: Vec<Header>,

        /// Whether data will follow the headers on the stream.
        has_body: bool,
    },

    /// Data was received.
    ///
    /// This indicates that the application can use the [`recv_body()`] method
    /// to retrieve the data from the stream.
    ///
    /// Note that [`recv_body()`] will need to be called repeatedly until the
    /// [`Done`] value is returned, as the event will not be re-armed until all
    /// buffered data is read.
    ///
    /// [`recv_body()`]: struct.Connection.html#method.recv_body
    /// [`Done`]: enum.Error.html#variant.Done
    Data,

    /// Stream was closed.
    Finished,

    /// Stream was reset.
    ///
    /// The associated data represents the error code sent by the peer.
    Reset(u64),

    /// DATAGRAM was received.
    ///
    /// This indicates that the application can use the [`recv_dgram()`] method
    /// to retrieve the HTTP/3 DATAGRAM.
    ///
    /// Note that [`recv_dgram()`] will need to be called repeatedly until the
    /// [`Done`] value is returned, as the event will not be re-armed until all
    /// buffered DATAGRAMs with the same flow ID are read.
    ///
    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
    /// [`Done`]: enum.Error.html#variant.Done
    Datagram,

    /// PRIORITY_UPDATE was received.
    ///
    /// This indicates that the application can use the
    /// [`take_last_priority_update()`] method to take the last received
    /// PRIORITY_UPDATE for a specified stream.
    ///
    /// This event is triggered once per stream until the last PRIORITY_UPDATE
    /// is taken. It is recommended that applications defer taking the
    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
    ///
    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
    /// [`poll()`]: struct.Connection.html#method.poll
    /// [`Done`]: enum.Error.html#variant.Done
    PriorityUpdate,

    /// GOAWAY was received.
    GoAway,
}
715 
/// Extensible Priorities parameters.
///
/// The `TryFrom` trait supports constructing this object from the serialized
/// Structured Fields Dictionary field value. I.e., use `TryFrom` to parse the
/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
/// trait requires the `sfv` feature to be enabled.
#[derive(Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    // The urgency (`u`) parameter; defaults to `PRIORITY_URGENCY_DEFAULT`.
    urgency: u8,
    // The incremental (`i`) parameter; defaults to
    // `PRIORITY_INCREMENTAL_DEFAULT`.
    incremental: bool,
}
728 
729 impl Default for Priority {
default() -> Self730     fn default() -> Self {
731         Priority {
732             urgency: PRIORITY_URGENCY_DEFAULT,
733             incremental: PRIORITY_INCREMENTAL_DEFAULT,
734         }
735     }
736 }
737 
738 impl Priority {
739     /// Creates a new Priority.
new(urgency: u8, incremental: bool) -> Self740     pub const fn new(urgency: u8, incremental: bool) -> Self {
741         Priority {
742             urgency,
743             incremental,
744         }
745     }
746 }
747 
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = crate::h3::Error;

    /// Try to parse an Extensible Priority field value.
    ///
    /// The field value is expected to be a Structured Fields Dictionary; see
    /// [Extensible Priorities].
    ///
    /// If the `u` or `i` fields are contained with correct types, a constructed
    /// Priority object is returned. Note that urgency values outside of valid
    /// range (0 through 7) are clamped to 7.
    ///
    /// `Error::Done` is returned when the value cannot be parsed as a
    /// dictionary, when `u` is present but is not an integer item, or when
    /// `i` is present as an item that is not a boolean. An `i` member that is
    /// an inner list is treated as omitted.
    ///
    /// Omitted parameters will yield default values.
    ///
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // If there is a u parameter, try to read it as an Item of type
            // Integer. If the value is out of the spec's allowed range
            // (0 through 7), that's an error so set it to the upper
            // bound (lowest priority) to avoid interference with
            // other streams.
            Some(sfv::ListEntry::Item(item)) => {
                let v = item.bare_item.as_int().ok_or(Error::Done)?;

                let allowed = PRIORITY_URGENCY_LOWER_BOUND as i64..=
                    PRIORITY_URGENCY_UPPER_BOUND as i64;

                if allowed.contains(&v) {
                    // Lossless: `v` was just checked to be within 0..=7.
                    v as u8
                } else {
                    PRIORITY_URGENCY_UPPER_BOUND
                }
            },

            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // Omitted so use default value.
            None => PRIORITY_URGENCY_DEFAULT,
        };

        let incremental = match dict.get("i") {
            Some(sfv::ListEntry::Item(item)) =>
                item.bare_item.as_bool().ok_or(Error::Done)?,

            // Omitted (or an inner list) so use default value.
            _ => PRIORITY_INCREMENTAL_DEFAULT,
        };

        Ok(Priority::new(urgency, incremental))
    }
}
813 
/// HTTP/3 settings tracked for one side of a connection.
///
/// One instance holds the locally configured values and another holds the
/// values for the peer. A field that is `None` has no value recorded.
struct ConnectionSettings {
    /// Value for the maximum field section size setting.
    pub max_field_section_size: Option<u64>,

    /// Value for the QPACK maximum dynamic table capacity setting.
    pub qpack_max_table_capacity: Option<u64>,

    /// Value for the QPACK blocked streams setting.
    pub qpack_blocked_streams: Option<u64>,

    /// Extended CONNECT setting; `Some(1)` is treated as "enabled".
    pub connect_protocol_enabled: Option<u64>,

    /// HTTP/3 DATAGRAM setting; `Some(1)` is treated as "enabled".
    pub h3_datagram: Option<u64>,

    /// Raw list of `(u64, u64)` pairs (presumably setting identifier and
    /// value -- confirm against the code that populates it).
    pub raw: Option<Vec<(u64, u64)>>,
}
822 
/// Stream IDs of the QPACK encoder and decoder streams for one side of a
/// connection. An ID is `None` until it has been recorded.
struct QpackStreams {
    /// ID of the QPACK encoder stream.
    pub encoder_stream_id: Option<u64>,

    /// ID of the QPACK decoder stream.
    pub decoder_stream_id: Option<u64>,
}
827 
828 /// An HTTP/3 connection.
829 pub struct Connection {
830     is_server: bool,
831 
832     next_request_stream_id: u64,
833     next_uni_stream_id: u64,
834 
835     streams: crate::stream::StreamIdHashMap<stream::Stream>,
836 
837     local_settings: ConnectionSettings,
838     peer_settings: ConnectionSettings,
839 
840     control_stream_id: Option<u64>,
841     peer_control_stream_id: Option<u64>,
842 
843     qpack_encoder: qpack::Encoder,
844     qpack_decoder: qpack::Decoder,
845 
846     local_qpack_streams: QpackStreams,
847     peer_qpack_streams: QpackStreams,
848 
849     max_push_id: u64,
850 
851     finished_streams: VecDeque<u64>,
852 
853     frames_greased: bool,
854 
855     local_goaway_id: Option<u64>,
856     peer_goaway_id: Option<u64>,
857 
858     dgram_event_triggered: bool,
859 }
860 
861 impl Connection {
new( config: &Config, is_server: bool, enable_dgram: bool, ) -> Result<Connection>862     fn new(
863         config: &Config, is_server: bool, enable_dgram: bool,
864     ) -> Result<Connection> {
865         let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
866         let h3_datagram = if enable_dgram { Some(1) } else { None };
867 
868         Ok(Connection {
869             is_server,
870 
871             next_request_stream_id: 0,
872 
873             next_uni_stream_id: initial_uni_stream_id,
874 
875             streams: Default::default(),
876 
877             local_settings: ConnectionSettings {
878                 max_field_section_size: config.max_field_section_size,
879                 qpack_max_table_capacity: config.qpack_max_table_capacity,
880                 qpack_blocked_streams: config.qpack_blocked_streams,
881                 connect_protocol_enabled: config.connect_protocol_enabled,
882                 h3_datagram,
883                 raw: Default::default(),
884             },
885 
886             peer_settings: ConnectionSettings {
887                 max_field_section_size: None,
888                 qpack_max_table_capacity: None,
889                 qpack_blocked_streams: None,
890                 h3_datagram: None,
891                 connect_protocol_enabled: None,
892                 raw: Default::default(),
893             },
894 
895             control_stream_id: None,
896             peer_control_stream_id: None,
897 
898             qpack_encoder: qpack::Encoder::new(),
899             qpack_decoder: qpack::Decoder::new(),
900 
901             local_qpack_streams: QpackStreams {
902                 encoder_stream_id: None,
903                 decoder_stream_id: None,
904             },
905 
906             peer_qpack_streams: QpackStreams {
907                 encoder_stream_id: None,
908                 decoder_stream_id: None,
909             },
910 
911             max_push_id: 0,
912 
913             finished_streams: VecDeque::new(),
914 
915             frames_greased: false,
916 
917             local_goaway_id: None,
918             peer_goaway_id: None,
919 
920             dgram_event_triggered: false,
921         })
922     }
923 
924     /// Creates a new HTTP/3 connection using the provided QUIC connection.
925     ///
926     /// This will also initiate the HTTP/3 handshake with the peer by opening
927     /// all control streams (including QPACK) and sending the local settings.
928     ///
929     /// On success the new connection is returned.
930     ///
931     /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
932     /// cannot be created due to stream limits.
933     ///
934     /// The [`InternalError`] error is returned when either the underlying QUIC
935     /// connection is not in a suitable state, or the HTTP/3 control stream
936     /// cannot be created due to flow control limits.
937     ///
938     /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
939     /// [`InternalError`]: ../enum.Error.html#variant.InternalError
with_transport( conn: &mut super::Connection, config: &Config, ) -> Result<Connection>940     pub fn with_transport(
941         conn: &mut super::Connection, config: &Config,
942     ) -> Result<Connection> {
943         let is_client = !conn.is_server;
944         if is_client && !(conn.is_established() || conn.is_in_early_data()) {
945             trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
946             return Err(Error::InternalError);
947         }
948 
949         let mut http3_conn =
950             Connection::new(config, conn.is_server, conn.dgram_enabled())?;
951 
952         match http3_conn.send_settings(conn) {
953             Ok(_) => (),
954 
955             Err(e) => {
956                 conn.close(true, e.to_wire(), b"Error opening control stream")?;
957                 return Err(e);
958             },
959         };
960 
961         // Try opening QPACK streams, but ignore errors if it fails since we
962         // don't need them right now.
963         http3_conn.open_qpack_encoder_stream(conn).ok();
964         http3_conn.open_qpack_decoder_stream(conn).ok();
965 
966         if conn.grease {
967             // Try opening a GREASE stream, but ignore errors since it's not
968             // critical.
969             http3_conn.open_grease_stream(conn).ok();
970         }
971 
972         Ok(http3_conn)
973     }
974 
975     /// Sends an HTTP/3 request.
976     ///
977     /// The request is encoded from the provided list of headers without a
978     /// body, and sent on a newly allocated stream. To include a body,
979     /// set `fin` as `false` and subsequently call [`send_body()`] with the
980     /// same `conn` and the `stream_id` returned from this method.
981     ///
982     /// On success the newly allocated stream ID is returned.
983     ///
984     /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
985     /// doesn't have enough capacity for the operation to complete. When this
986     /// happens the application should retry the operation once the stream is
987     /// reported as writable again.
988     ///
989     /// [`send_body()`]: struct.Connection.html#method.send_body
990     /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
send_request<T: NameValue>( &mut self, conn: &mut super::Connection, headers: &[T], fin: bool, ) -> Result<u64>991     pub fn send_request<T: NameValue>(
992         &mut self, conn: &mut super::Connection, headers: &[T], fin: bool,
993     ) -> Result<u64> {
994         // If we received a GOAWAY from the peer, MUST NOT initiate new
995         // requests.
996         if self.peer_goaway_id.is_some() {
997             return Err(Error::FrameUnexpected);
998         }
999 
1000         let stream_id = self.next_request_stream_id;
1001 
1002         self.streams
1003             .insert(stream_id, stream::Stream::new(stream_id, true));
1004 
1005         // The underlying QUIC stream does not exist yet, so calls to e.g.
1006         // stream_capacity() will fail. By writing a 0-length buffer, we force
1007         // the creation of the QUIC stream state, without actually writing
1008         // anything.
1009         if let Err(e) = conn.stream_send(stream_id, b"", false) {
1010             self.streams.remove(&stream_id);
1011 
1012             if e == super::Error::Done {
1013                 return Err(Error::StreamBlocked);
1014             }
1015 
1016             return Err(e.into());
1017         };
1018 
1019         self.send_headers(conn, stream_id, headers, fin)?;
1020 
1021         // To avoid skipping stream IDs, we only calculate the next available
1022         // stream ID when a request has been successfully buffered.
1023         self.next_request_stream_id = self
1024             .next_request_stream_id
1025             .checked_add(4)
1026             .ok_or(Error::IdError)?;
1027 
1028         Ok(stream_id)
1029     }
1030 
1031     /// Sends an HTTP/3 response on the specified stream with default priority.
1032     ///
1033     /// This method sends the provided `headers` without a body. To include a
1034     /// body, set `fin` as `false` and subsequently call [`send_body()`] with
1035     /// the same `conn` and `stream_id`.
1036     ///
1037     /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1038     /// doesn't have enough capacity for the operation to complete. When this
1039     /// happens the application should retry the operation once the stream is
1040     /// reported as writable again.
1041     ///
1042     /// [`send_body()`]: struct.Connection.html#method.send_body
1043     /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
send_response<T: NameValue>( &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T], fin: bool, ) -> Result<()>1044     pub fn send_response<T: NameValue>(
1045         &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1046         fin: bool,
1047     ) -> Result<()> {
1048         let priority = Default::default();
1049 
1050         self.send_response_with_priority(
1051             conn, stream_id, headers, &priority, fin,
1052         )?;
1053 
1054         Ok(())
1055     }
1056 
1057     /// Sends an HTTP/3 response on the specified stream with specified
1058     /// priority.
1059     ///
1060     /// The `priority` parameter represents [Extensible Priority]
1061     /// parameters. If the urgency is outside the range 0-7, it will be clamped
1062     /// to 7.
1063     ///
1064     /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1065     /// doesn't have enough capacity for the operation to complete. When this
1066     /// happens the application should retry the operation once the stream is
1067     /// reported as writable again.
1068     ///
1069     /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1070     /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
send_response_with_priority<T: NameValue>( &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T], priority: &Priority, fin: bool, ) -> Result<()>1071     pub fn send_response_with_priority<T: NameValue>(
1072         &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1073         priority: &Priority, fin: bool,
1074     ) -> Result<()> {
1075         if !self.streams.contains_key(&stream_id) {
1076             return Err(Error::FrameUnexpected);
1077         }
1078 
1079         // Clamp and shift urgency into quiche-priority space
1080         let urgency = priority
1081             .urgency
1082             .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1083             PRIORITY_URGENCY_OFFSET;
1084 
1085         conn.stream_priority(stream_id, urgency, priority.incremental)?;
1086 
1087         self.send_headers(conn, stream_id, headers, fin)?;
1088 
1089         Ok(())
1090     }
1091 
encode_header_block<T: NameValue>( &mut self, headers: &[T], ) -> Result<Vec<u8>>1092     fn encode_header_block<T: NameValue>(
1093         &mut self, headers: &[T],
1094     ) -> Result<Vec<u8>> {
1095         let headers_len = headers
1096             .iter()
1097             .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1098 
1099         let mut header_block = vec![0; headers_len];
1100         let len = self
1101             .qpack_encoder
1102             .encode(headers, &mut header_block)
1103             .map_err(|_| Error::InternalError)?;
1104 
1105         header_block.truncate(len);
1106 
1107         Ok(header_block)
1108     }
1109 
send_headers<T: NameValue>( &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T], fin: bool, ) -> Result<()>1110     fn send_headers<T: NameValue>(
1111         &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1112         fin: bool,
1113     ) -> Result<()> {
1114         let mut d = [42; 10];
1115         let mut b = octets::OctetsMut::with_slice(&mut d);
1116 
1117         if !self.frames_greased && conn.grease {
1118             self.send_grease_frames(conn, stream_id)?;
1119             self.frames_greased = true;
1120         }
1121 
1122         let header_block = self.encode_header_block(headers)?;
1123 
1124         let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1125             octets::varint_len(header_block.len() as u64);
1126 
1127         // Headers need to be sent atomically, so make sure the stream has
1128         // enough capacity.
1129         match conn.stream_writable(stream_id, overhead + header_block.len()) {
1130             Ok(true) => (),
1131 
1132             Ok(false) => return Err(Error::StreamBlocked),
1133 
1134             Err(e) => {
1135                 if conn.stream_finished(stream_id) {
1136                     self.streams.remove(&stream_id);
1137                 }
1138 
1139                 return Err(e.into());
1140             },
1141         };
1142 
1143         b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1144         b.put_varint(header_block.len() as u64)?;
1145         let off = b.off();
1146         conn.stream_send(stream_id, &d[..off], false)?;
1147 
1148         // Sending header block separately avoids unnecessary copy.
1149         conn.stream_send(stream_id, &header_block, fin)?;
1150 
1151         trace!(
1152             "{} tx frm HEADERS stream={} len={} fin={}",
1153             conn.trace_id(),
1154             stream_id,
1155             header_block.len(),
1156             fin
1157         );
1158 
1159         qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1160             let qlog_headers = headers
1161                 .iter()
1162                 .map(|h| qlog::events::h3::HttpHeader {
1163                     name: String::from_utf8_lossy(h.name()).into_owned(),
1164                     value: String::from_utf8_lossy(h.value()).into_owned(),
1165                 })
1166                 .collect();
1167 
1168             let frame = Http3Frame::Headers {
1169                 headers: qlog_headers,
1170             };
1171             let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1172                 stream_id,
1173                 length: Some(header_block.len() as u64),
1174                 frame,
1175                 raw: None,
1176             });
1177 
1178             q.add_event_data_now(ev_data).ok();
1179         });
1180 
1181         if let Some(s) = self.streams.get_mut(&stream_id) {
1182             s.initialize_local();
1183         }
1184 
1185         if fin && conn.stream_finished(stream_id) {
1186             self.streams.remove(&stream_id);
1187         }
1188 
1189         Ok(())
1190     }
1191 
1192     /// Sends an HTTP/3 body chunk on the given stream.
1193     ///
1194     /// On success the number of bytes written is returned, or [`Done`] if no
1195     /// bytes could be written (e.g. because the stream is blocked).
1196     ///
1197     /// Note that the number of written bytes returned can be lower than the
1198     /// length of the input buffer when the underlying QUIC stream doesn't have
1199     /// enough capacity for the operation to complete.
1200     ///
1201     /// When a partial write happens (including when [`Done`] is returned) the
1202     /// application should retry the operation once the stream is reported as
1203     /// writable again.
1204     ///
1205     /// [`Done`]: enum.Error.html#variant.Done
send_body( &mut self, conn: &mut super::Connection, stream_id: u64, body: &[u8], fin: bool, ) -> Result<usize>1206     pub fn send_body(
1207         &mut self, conn: &mut super::Connection, stream_id: u64, body: &[u8],
1208         fin: bool,
1209     ) -> Result<usize> {
1210         let mut d = [42; 10];
1211         let mut b = octets::OctetsMut::with_slice(&mut d);
1212 
1213         // Validate that it is sane to send data on the stream.
1214         if stream_id % 4 != 0 {
1215             return Err(Error::FrameUnexpected);
1216         }
1217 
1218         match self.streams.get(&stream_id) {
1219             Some(s) =>
1220                 if !s.local_initialized() {
1221                     return Err(Error::FrameUnexpected);
1222                 },
1223 
1224             None => {
1225                 return Err(Error::FrameUnexpected);
1226             },
1227         };
1228 
1229         // Avoid sending 0-length DATA frames when the fin flag is false.
1230         if body.is_empty() && !fin {
1231             return Err(Error::Done);
1232         }
1233 
1234         let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1235             octets::varint_len(body.len() as u64);
1236 
1237         let stream_cap = match conn.stream_capacity(stream_id) {
1238             Ok(v) => v,
1239 
1240             Err(e) => {
1241                 if conn.stream_finished(stream_id) {
1242                     self.streams.remove(&stream_id);
1243                 }
1244 
1245                 return Err(e.into());
1246             },
1247         };
1248 
1249         // Make sure there is enough capacity to send the DATA frame header.
1250         if stream_cap < overhead {
1251             let _ = conn.stream_writable(stream_id, overhead + 1);
1252             return Err(Error::Done);
1253         }
1254 
1255         // Cap the frame payload length to the stream's capacity.
1256         let body_len = std::cmp::min(body.len(), stream_cap - overhead);
1257 
1258         // If we can't send the entire body, set the fin flag to false so the
1259         // application can try again later.
1260         let fin = if body_len != body.len() { false } else { fin };
1261 
1262         // Again, avoid sending 0-length DATA frames when the fin flag is false.
1263         if body_len == 0 && !fin {
1264             let _ = conn.stream_writable(stream_id, overhead + 1);
1265             return Err(Error::Done);
1266         }
1267 
1268         b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1269         b.put_varint(body_len as u64)?;
1270         let off = b.off();
1271         conn.stream_send(stream_id, &d[..off], false)?;
1272 
1273         // Return how many bytes were written, excluding the frame header.
1274         // Sending body separately avoids unnecessary copy.
1275         let written = conn.stream_send(stream_id, &body[..body_len], fin)?;
1276 
1277         trace!(
1278             "{} tx frm DATA stream={} len={} fin={}",
1279             conn.trace_id(),
1280             stream_id,
1281             written,
1282             fin
1283         );
1284 
1285         qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1286             let frame = Http3Frame::Data { raw: None };
1287             let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1288                 stream_id,
1289                 length: Some(written as u64),
1290                 frame,
1291                 raw: None,
1292             });
1293 
1294             q.add_event_data_now(ev_data).ok();
1295         });
1296 
1297         if written < body.len() {
1298             // Ensure the peer is notified that the connection or stream is
1299             // blocked when the stream's capacity is limited by flow control.
1300             //
1301             // We only need enough capacity to send a few bytes, to make sure
1302             // the stream doesn't hang due to congestion window not growing
1303             // enough.
1304             let _ = conn.stream_writable(stream_id, overhead + 1);
1305         }
1306 
1307         if fin && written == body.len() && conn.stream_finished(stream_id) {
1308             self.streams.remove(&stream_id);
1309         }
1310 
1311         Ok(written)
1312     }
1313 
1314     /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1315     ///
1316     /// Support is signalled by the peer's SETTINGS, so this method always
1317     /// returns false until they have been processed using the [`poll()`]
1318     /// method.
1319     ///
1320     /// [`poll()`]: struct.Connection.html#method.poll
dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool1321     pub fn dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool {
1322         self.peer_settings.h3_datagram == Some(1) &&
1323             conn.dgram_max_writable_len().is_some()
1324     }
1325 
1326     /// Returns whether the peer enabled extended CONNECT support.
1327     ///
1328     /// Support is signalled by the peer's SETTINGS, so this method always
1329     /// returns false until they have been processed using the [`poll()`]
1330     /// method.
1331     ///
1332     /// [`poll()`]: struct.Connection.html#method.poll
extended_connect_enabled_by_peer(&self) -> bool1333     pub fn extended_connect_enabled_by_peer(&self) -> bool {
1334         self.peer_settings.connect_protocol_enabled == Some(1)
1335     }
1336 
1337     /// Sends an HTTP/3 DATAGRAM with the specified flow ID.
send_dgram( &mut self, conn: &mut super::Connection, flow_id: u64, buf: &[u8], ) -> Result<()>1338     pub fn send_dgram(
1339         &mut self, conn: &mut super::Connection, flow_id: u64, buf: &[u8],
1340     ) -> Result<()> {
1341         let len = octets::varint_len(flow_id) + buf.len();
1342         let mut d = vec![0; len];
1343         let mut b = octets::OctetsMut::with_slice(&mut d);
1344 
1345         b.put_varint(flow_id)?;
1346         b.put_bytes(buf)?;
1347 
1348         conn.dgram_send_vec(d)?;
1349 
1350         Ok(())
1351     }
1352 
1353     /// Reads a DATAGRAM into the provided buffer.
1354     ///
1355     /// Applications should call this method whenever the [`poll()`] method
1356     /// returns a [`Datagram`] event.
1357     ///
1358     /// On success the DATAGRAM data is returned, with length and Flow ID and
1359     /// length of the Flow ID.
1360     ///
1361     /// [`Done`] is returned if there is no data to read.
1362     ///
1363     /// [`BufferTooShort`] is returned if the provided buffer is too small for
1364     /// the data.
1365     ///
1366     /// [`poll()`]: struct.Connection.html#method.poll
1367     /// [`Datagram`]: enum.Event.html#variant.Datagram
1368     /// [`Done`]: enum.Error.html#variant.Done
1369     /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
recv_dgram( &mut self, conn: &mut super::Connection, buf: &mut [u8], ) -> Result<(usize, u64, usize)>1370     pub fn recv_dgram(
1371         &mut self, conn: &mut super::Connection, buf: &mut [u8],
1372     ) -> Result<(usize, u64, usize)> {
1373         let len = conn.dgram_recv(buf)?;
1374         let mut b = octets::Octets::with_slice(buf);
1375         let flow_id = b.get_varint()?;
1376         Ok((len, flow_id, b.off()))
1377     }
1378 
1379     /// Returns the maximum HTTP/3 DATAGRAM payload that can be sent.
dgram_max_writable_len( &self, conn: &super::Connection, flow_id: u64, ) -> Option<usize>1380     pub fn dgram_max_writable_len(
1381         &self, conn: &super::Connection, flow_id: u64,
1382     ) -> Option<usize> {
1383         let flow_id_len = octets::varint_len(flow_id);
1384         match conn.dgram_max_writable_len() {
1385             None => None,
1386             Some(len) => len.checked_sub(flow_id_len),
1387         }
1388     }
1389 
1390     // A helper function for determining if there is a DATAGRAM event.
process_dgrams( &mut self, conn: &mut super::Connection, ) -> Result<(u64, Event)>1391     fn process_dgrams(
1392         &mut self, conn: &mut super::Connection,
1393     ) -> Result<(u64, Event)> {
1394         if conn.dgram_recv_queue_len() > 0 {
1395             if self.dgram_event_triggered {
1396                 return Err(Error::Done);
1397             }
1398 
1399             self.dgram_event_triggered = true;
1400 
1401             return Ok((0, Event::Datagram));
1402         }
1403 
1404         self.dgram_event_triggered = false;
1405 
1406         Err(Error::Done)
1407     }
1408 
1409     /// Reads request or response body data into the provided buffer.
1410     ///
1411     /// Applications should call this method whenever the [`poll()`] method
1412     /// returns a [`Data`] event.
1413     ///
1414     /// On success the amount of bytes read is returned, or [`Done`] if there
1415     /// is no data to read.
1416     ///
1417     /// [`poll()`]: struct.Connection.html#method.poll
1418     /// [`Data`]: enum.Event.html#variant.Data
1419     /// [`Done`]: enum.Error.html#variant.Done
recv_body( &mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8], ) -> Result<usize>1420     pub fn recv_body(
1421         &mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8],
1422     ) -> Result<usize> {
1423         let mut total = 0;
1424 
1425         // Try to consume all buffered data for the stream, even across multiple
1426         // DATA frames.
1427         while total < out.len() {
1428             let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1429 
1430             if stream.state() != stream::State::Data {
1431                 break;
1432             }
1433 
1434             let (read, fin) =
1435                 match stream.try_consume_data(conn, &mut out[total..]) {
1436                     Ok(v) => v,
1437 
1438                     Err(Error::Done) => break,
1439 
1440                     Err(e) => return Err(e),
1441                 };
1442 
1443             total += read;
1444 
1445             // No more data to read, we are done.
1446             if read == 0 || fin {
1447                 break;
1448             }
1449 
1450             // Process incoming data from the stream. For example, if a whole
1451             // DATA frame was consumed, and another one is queued behind it,
1452             // this will ensure the additional data will also be returned to
1453             // the application.
1454             match self.process_readable_stream(conn, stream_id, false) {
1455                 Ok(_) => unreachable!(),
1456 
1457                 Err(Error::Done) => (),
1458 
1459                 Err(e) => return Err(e),
1460             };
1461 
1462             if conn.stream_finished(stream_id) {
1463                 break;
1464             }
1465         }
1466 
1467         // While body is being received, the stream is marked as finished only
1468         // when all data is read by the application.
1469         if conn.stream_finished(stream_id) {
1470             self.process_finished_stream(stream_id);
1471         }
1472 
1473         if total == 0 {
1474             return Err(Error::Done);
1475         }
1476 
1477         Ok(total)
1478     }
1479 
1480     /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1481     /// request stream ID and priority.
1482     ///
1483     /// The `priority` parameter represents [Extensible Priority]
1484     /// parameters. If the urgency is outside the range 0-7, it will be clamped
1485     /// to 7.
1486     ///
1487     /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1488     /// doesn't have enough capacity for the operation to complete. When this
1489     /// happens the application should retry the operation once the stream is
1490     /// reported as writable again.
1491     ///
1492     /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1493     /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
send_priority_update_for_request( &mut self, conn: &mut super::Connection, stream_id: u64, priority: &Priority, ) -> Result<()>1494     pub fn send_priority_update_for_request(
1495         &mut self, conn: &mut super::Connection, stream_id: u64,
1496         priority: &Priority,
1497     ) -> Result<()> {
1498         let mut d = [42; 20];
1499         let mut b = octets::OctetsMut::with_slice(&mut d);
1500 
1501         // Validate that it is sane to send PRIORITY_UPDATE.
1502         if self.is_server {
1503             return Err(Error::FrameUnexpected);
1504         }
1505 
1506         if stream_id % 4 != 0 {
1507             return Err(Error::FrameUnexpected);
1508         }
1509 
1510         let control_stream_id =
1511             self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1512 
1513         let urgency = priority
1514             .urgency
1515             .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1516 
1517         let mut field_value = format!("u={urgency}");
1518 
1519         if priority.incremental {
1520             field_value.push_str(",i");
1521         }
1522 
1523         let priority_field_value = field_value.as_bytes();
1524         let frame_payload_len =
1525             octets::varint_len(stream_id) + priority_field_value.len();
1526 
1527         let overhead =
1528             octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1529                 octets::varint_len(stream_id) +
1530                 octets::varint_len(frame_payload_len as u64);
1531 
1532         // Make sure the control stream has enough capacity.
1533         match conn.stream_writable(
1534             control_stream_id,
1535             overhead + priority_field_value.len(),
1536         ) {
1537             Ok(true) => (),
1538 
1539             Ok(false) => return Err(Error::StreamBlocked),
1540 
1541             Err(e) => {
1542                 return Err(e.into());
1543             },
1544         }
1545 
1546         b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1547         b.put_varint(frame_payload_len as u64)?;
1548         b.put_varint(stream_id)?;
1549         let off = b.off();
1550         conn.stream_send(control_stream_id, &d[..off], false)?;
1551 
1552         // Sending field value separately avoids unnecessary copy.
1553         conn.stream_send(control_stream_id, priority_field_value, false)?;
1554 
1555         trace!(
1556             "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1557             conn.trace_id(),
1558             stream_id,
1559             field_value,
1560         );
1561 
1562         qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1563             let frame = Http3Frame::PriorityUpdate {
1564                 target_stream_type: H3PriorityTargetStreamType::Request,
1565                 prioritized_element_id: stream_id,
1566                 priority_field_value: field_value.clone(),
1567             };
1568 
1569             let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1570                 stream_id,
1571                 length: Some(priority_field_value.len() as u64),
1572                 frame,
1573                 raw: None,
1574             });
1575 
1576             q.add_event_data_now(ev_data).ok();
1577         });
1578 
1579         Ok(())
1580     }
1581 
1582     /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1583     ///
1584     /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1585     /// prioritized element, the event has triggered and will not rearm until
1586     /// applications call this method. It is recommended that applications defer
1587     /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1588     ///
1589     /// On success the Priority Field Value is returned, or [`Done`] if there is
1590     /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1591     /// because the prioritized element does not exist).
1592     ///
1593     /// [`poll()`]: struct.Connection.html#method.poll
1594     /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1595     /// [`Done`]: enum.Error.html#variant.Done
take_last_priority_update( &mut self, prioritized_element_id: u64, ) -> Result<Vec<u8>>1596     pub fn take_last_priority_update(
1597         &mut self, prioritized_element_id: u64,
1598     ) -> Result<Vec<u8>> {
1599         if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1600             return stream.take_last_priority_update().ok_or(Error::Done);
1601         }
1602 
1603         Err(Error::Done)
1604     }
1605 
1606     /// Processes HTTP/3 data received from the peer.
1607     ///
1608     /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1609     /// no events to report.
1610     ///
1611     /// Note that all events are edge-triggered, meaning that once reported they
1612     /// will not be reported again by calling this method again, until the event
1613     /// is re-armed.
1614     ///
1615     /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1616     /// which is used in methods [`recv_body()`], [`send_response()`] or
1617     /// [`send_body()`].
1618     ///
1619     /// The event [`Datagram`] returns a dummy value of `0`, this should be
1620     /// ignored by the application.
1621     ///
1622     /// The event [`GoAway`] returns an ID that depends on the connection role.
1623     /// A client receives the largest processed stream ID. A server receives the
1624     /// the largest permitted push ID.
1625     ///
1626     /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1627     /// prioritized element ID that is used in the method
1628     /// [`take_last_priority_update()`], which rearms the event for that ID.
1629     ///
1630     /// If an error occurs while processing data, the connection is closed with
1631     /// the appropriate error code, using the transport's [`close()`] method.
1632     ///
1633     /// [`Event`]: enum.Event.html
1634     /// [`Done`]: enum.Error.html#variant.Done
1635     /// [`Headers`]: enum.Event.html#variant.Headers
1636     /// [`Data`]: enum.Event.html#variant.Data
1637     /// [`Finished`]: enum.Event.html#variant.Finished
1638     /// [`Datagram`]: enum.Event.html#variant.Datagram
1639     /// [`GoAway`]: enum.Event.html#variant.GoAWay
1640     /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1641     /// [`recv_body()`]: struct.Connection.html#method.recv_body
1642     /// [`send_response()`]: struct.Connection.html#method.send_response
1643     /// [`send_body()`]: struct.Connection.html#method.send_body
1644     /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1645     /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1646     /// [`close()`]: ../struct.Connection.html#method.close
poll(&mut self, conn: &mut super::Connection) -> Result<(u64, Event)>1647     pub fn poll(&mut self, conn: &mut super::Connection) -> Result<(u64, Event)> {
1648         // When connection close is initiated by the local application (e.g. due
1649         // to a protocol error), the connection itself might be in a broken
1650         // state, so return early.
1651         if conn.local_error.is_some() {
1652             return Err(Error::Done);
1653         }
1654 
1655         // Process control streams first.
1656         if let Some(stream_id) = self.peer_control_stream_id {
1657             match self.process_control_stream(conn, stream_id) {
1658                 Ok(ev) => return Ok(ev),
1659 
1660                 Err(Error::Done) => (),
1661 
1662                 Err(e) => return Err(e),
1663             };
1664         }
1665 
1666         if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
1667             match self.process_control_stream(conn, stream_id) {
1668                 Ok(ev) => return Ok(ev),
1669 
1670                 Err(Error::Done) => (),
1671 
1672                 Err(e) => return Err(e),
1673             };
1674         }
1675 
1676         if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
1677             match self.process_control_stream(conn, stream_id) {
1678                 Ok(ev) => return Ok(ev),
1679 
1680                 Err(Error::Done) => (),
1681 
1682                 Err(e) => return Err(e),
1683             };
1684         }
1685 
1686         // Process finished streams list.
1687         if let Some(finished) = self.finished_streams.pop_front() {
1688             return Ok((finished, Event::Finished));
1689         }
1690 
1691         // Process queued DATAGRAMs if the poll threshold allows it.
1692         match self.process_dgrams(conn) {
1693             Ok(v) => return Ok(v),
1694 
1695             Err(Error::Done) => (),
1696 
1697             Err(e) => return Err(e),
1698         };
1699 
1700         // Process HTTP/3 data from readable streams.
1701         for s in conn.readable() {
1702             trace!("{} stream id {} is readable", conn.trace_id(), s);
1703 
1704             let ev = match self.process_readable_stream(conn, s, true) {
1705                 Ok(v) => Some(v),
1706 
1707                 Err(Error::Done) => None,
1708 
1709                 // Return early if the stream was reset, to avoid returning
1710                 // a Finished event later as well.
1711                 Err(Error::TransportError(crate::Error::StreamReset(e))) =>
1712                     return Ok((s, Event::Reset(e))),
1713 
1714                 Err(e) => return Err(e),
1715             };
1716 
1717             if conn.stream_finished(s) {
1718                 self.process_finished_stream(s);
1719             }
1720 
1721             // TODO: check if stream is completed so it can be freed
1722             if let Some(ev) = ev {
1723                 return Ok(ev);
1724             }
1725         }
1726 
1727         // Process finished streams list once again, to make sure `Finished`
1728         // events are returned when receiving empty stream frames with the fin
1729         // flag set.
1730         if let Some(finished) = self.finished_streams.pop_front() {
1731             return Ok((finished, Event::Finished));
1732         }
1733 
1734         Err(Error::Done)
1735     }
1736 
1737     /// Sends a GOAWAY frame to initiate graceful connection closure.
1738     ///
1739     /// When quiche is used in the server role, the `id` parameter is the stream
1740     /// ID of the highest processed request. This can be any valid ID between 0
1741     /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
1742     /// these conditions will return an error.
1743     ///
1744     /// This method does not close the QUIC connection. Applications are
1745     /// required to call [`close()`] themselves.
1746     ///
1747     /// [`close()`]: ../struct.Connection.html#method.close
send_goaway( &mut self, conn: &mut super::Connection, id: u64, ) -> Result<()>1748     pub fn send_goaway(
1749         &mut self, conn: &mut super::Connection, id: u64,
1750     ) -> Result<()> {
1751         let mut id = id;
1752 
1753         // TODO: server push
1754         //
1755         // In the meantime always send 0 from client.
1756         if !self.is_server {
1757             id = 0;
1758         }
1759 
1760         if self.is_server && id % 4 != 0 {
1761             return Err(Error::IdError);
1762         }
1763 
1764         if let Some(sent_id) = self.local_goaway_id {
1765             if id > sent_id {
1766                 return Err(Error::IdError);
1767             }
1768         }
1769 
1770         if let Some(stream_id) = self.control_stream_id {
1771             let mut d = [42; 10];
1772             let mut b = octets::OctetsMut::with_slice(&mut d);
1773 
1774             let frame = frame::Frame::GoAway { id };
1775 
1776             let wire_len = frame.to_bytes(&mut b)?;
1777             let stream_cap = conn.stream_capacity(stream_id)?;
1778 
1779             if stream_cap < wire_len {
1780                 return Err(Error::StreamBlocked);
1781             }
1782 
1783             trace!("{} tx frm {:?}", conn.trace_id(), frame);
1784 
1785             qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1786                 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1787                     stream_id,
1788                     length: Some(octets::varint_len(id) as u64),
1789                     frame: frame.to_qlog(),
1790                     raw: None,
1791                 });
1792 
1793                 q.add_event_data_now(ev_data).ok();
1794             });
1795 
1796             let off = b.off();
1797             conn.stream_send(stream_id, &d[..off], false)?;
1798 
1799             self.local_goaway_id = Some(id);
1800         }
1801 
1802         Ok(())
1803     }
1804 
1805     /// Gets the raw settings from peer including unknown and reserved types.
1806     ///
1807     /// The order of settings is the same as received in the SETTINGS frame.
peer_settings_raw(&self) -> Option<&[(u64, u64)]>1808     pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
1809         self.peer_settings.raw.as_deref()
1810     }
1811 
open_uni_stream( &mut self, conn: &mut super::Connection, ty: u64, ) -> Result<u64>1812     fn open_uni_stream(
1813         &mut self, conn: &mut super::Connection, ty: u64,
1814     ) -> Result<u64> {
1815         let stream_id = self.next_uni_stream_id;
1816 
1817         let mut d = [0; 8];
1818         let mut b = octets::OctetsMut::with_slice(&mut d);
1819 
1820         match ty {
1821             // Control and QPACK streams are the most important to schedule.
1822             stream::HTTP3_CONTROL_STREAM_TYPE_ID |
1823             stream::QPACK_ENCODER_STREAM_TYPE_ID |
1824             stream::QPACK_DECODER_STREAM_TYPE_ID => {
1825                 conn.stream_priority(stream_id, 0, true)?;
1826             },
1827 
1828             // TODO: Server push
1829             stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
1830 
1831             // Anything else is a GREASE stream, so make it the least important.
1832             _ => {
1833                 conn.stream_priority(stream_id, 255, true)?;
1834             },
1835         }
1836 
1837         conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
1838 
1839         // To avoid skipping stream IDs, we only calculate the next available
1840         // stream ID when data has been successfully buffered.
1841         self.next_uni_stream_id = self
1842             .next_uni_stream_id
1843             .checked_add(4)
1844             .ok_or(Error::IdError)?;
1845 
1846         Ok(stream_id)
1847     }
1848 
open_qpack_encoder_stream( &mut self, conn: &mut super::Connection, ) -> Result<()>1849     fn open_qpack_encoder_stream(
1850         &mut self, conn: &mut super::Connection,
1851     ) -> Result<()> {
1852         let stream_id =
1853             self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
1854 
1855         self.local_qpack_streams.encoder_stream_id = Some(stream_id);
1856 
1857         qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
1858             let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
1859                 stream_id,
1860                 owner: Some(H3Owner::Local),
1861                 stream_type: H3StreamType::QpackEncode,
1862                 associated_push_id: None,
1863             });
1864 
1865             q.add_event_data_now(ev_data).ok();
1866         });
1867 
1868         Ok(())
1869     }
1870 
open_qpack_decoder_stream( &mut self, conn: &mut super::Connection, ) -> Result<()>1871     fn open_qpack_decoder_stream(
1872         &mut self, conn: &mut super::Connection,
1873     ) -> Result<()> {
1874         let stream_id =
1875             self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
1876 
1877         self.local_qpack_streams.decoder_stream_id = Some(stream_id);
1878 
1879         qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
1880             let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
1881                 stream_id,
1882                 owner: Some(H3Owner::Local),
1883                 stream_type: H3StreamType::QpackDecode,
1884                 associated_push_id: None,
1885             });
1886 
1887             q.add_event_data_now(ev_data).ok();
1888         });
1889 
1890         Ok(())
1891     }
1892 
1893     /// Send GREASE frames on the provided stream ID.
send_grease_frames( &mut self, conn: &mut super::Connection, stream_id: u64, ) -> Result<()>1894     fn send_grease_frames(
1895         &mut self, conn: &mut super::Connection, stream_id: u64,
1896     ) -> Result<()> {
1897         let mut d = [0; 8];
1898 
1899         let stream_cap = match conn.stream_capacity(stream_id) {
1900             Ok(v) => v,
1901 
1902             Err(e) => {
1903                 if conn.stream_finished(stream_id) {
1904                     self.streams.remove(&stream_id);
1905                 }
1906 
1907                 return Err(e.into());
1908             },
1909         };
1910 
1911         let grease_frame1 = grease_value();
1912         let grease_frame2 = grease_value();
1913         let grease_payload = b"GREASE is the word";
1914 
1915         let overhead = octets::varint_len(grease_frame1) + // frame type
1916             1 + // payload len
1917             octets::varint_len(grease_frame2) + // frame type
1918             1 + // payload len
1919             grease_payload.len(); // payload
1920 
1921         // Don't send GREASE if there is not enough capacity for it. Greasing
1922         // will _not_ be attempted again later on.
1923         if stream_cap < overhead {
1924             return Ok(());
1925         }
1926 
1927         // Empty GREASE frame.
1928         let mut b = octets::OctetsMut::with_slice(&mut d);
1929         conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
1930 
1931         let mut b = octets::OctetsMut::with_slice(&mut d);
1932         conn.stream_send(stream_id, b.put_varint(0)?, false)?;
1933 
1934         trace!(
1935             "{} tx frm GREASE stream={} len=0",
1936             conn.trace_id(),
1937             stream_id
1938         );
1939 
1940         qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1941             let frame = Http3Frame::Reserved { length: Some(0) };
1942             let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1943                 stream_id,
1944                 length: Some(0),
1945                 frame,
1946                 raw: None,
1947             });
1948 
1949             q.add_event_data_now(ev_data).ok();
1950         });
1951 
1952         // GREASE frame with payload.
1953         let mut b = octets::OctetsMut::with_slice(&mut d);
1954         conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
1955 
1956         let mut b = octets::OctetsMut::with_slice(&mut d);
1957         conn.stream_send(stream_id, b.put_varint(18)?, false)?;
1958 
1959         conn.stream_send(stream_id, grease_payload, false)?;
1960 
1961         trace!(
1962             "{} tx frm GREASE stream={} len={}",
1963             conn.trace_id(),
1964             stream_id,
1965             grease_payload.len()
1966         );
1967 
1968         qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1969             let frame = Http3Frame::Reserved {
1970                 length: Some(grease_payload.len() as u64),
1971             };
1972             let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1973                 stream_id,
1974                 length: Some(grease_payload.len() as u64),
1975                 frame,
1976                 raw: None,
1977             });
1978 
1979             q.add_event_data_now(ev_data).ok();
1980         });
1981 
1982         Ok(())
1983     }
1984 
1985     /// Opens a new unidirectional stream with a GREASE type and sends some
1986     /// unframed payload.
open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()>1987     fn open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()> {
1988         match self.open_uni_stream(conn, grease_value()) {
1989             Ok(stream_id) => {
1990                 conn.stream_send(stream_id, b"GREASE is the word", true)?;
1991 
1992                 trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
1993 
1994                 qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
1995                     let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
1996                         stream_id,
1997                         owner: Some(H3Owner::Local),
1998                         stream_type: H3StreamType::Unknown,
1999                         associated_push_id: None,
2000                     });
2001 
2002                     q.add_event_data_now(ev_data).ok();
2003                 });
2004             },
2005 
2006             Err(Error::IdError) => {
2007                 trace!("{} GREASE stream blocked", conn.trace_id(),);
2008 
2009                 return Ok(());
2010             },
2011 
2012             Err(e) => return Err(e),
2013         };
2014 
2015         Ok(())
2016     }
2017 
2018     /// Sends SETTINGS frame based on HTTP/3 configuration.
send_settings(&mut self, conn: &mut super::Connection) -> Result<()>2019     fn send_settings(&mut self, conn: &mut super::Connection) -> Result<()> {
2020         let stream_id = match self
2021             .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2022         {
2023             Ok(v) => v,
2024 
2025             Err(e) => {
2026                 trace!("{} Control stream blocked", conn.trace_id(),);
2027 
2028                 if e == Error::Done {
2029                     return Err(Error::InternalError);
2030                 }
2031 
2032                 return Err(e);
2033             },
2034         };
2035 
2036         self.control_stream_id = Some(stream_id);
2037 
2038         qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2039             let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2040                 stream_id,
2041                 owner: Some(H3Owner::Local),
2042                 stream_type: H3StreamType::Control,
2043                 associated_push_id: None,
2044             });
2045 
2046             q.add_event_data_now(ev_data).ok();
2047         });
2048 
2049         let grease = if conn.grease {
2050             Some((grease_value(), grease_value()))
2051         } else {
2052             None
2053         };
2054 
2055         let frame = frame::Frame::Settings {
2056             max_field_section_size: self.local_settings.max_field_section_size,
2057             qpack_max_table_capacity: self
2058                 .local_settings
2059                 .qpack_max_table_capacity,
2060             qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2061             connect_protocol_enabled: self
2062                 .local_settings
2063                 .connect_protocol_enabled,
2064             h3_datagram: self.local_settings.h3_datagram,
2065             grease,
2066             raw: Default::default(),
2067         };
2068 
2069         let mut d = [42; 128];
2070         let mut b = octets::OctetsMut::with_slice(&mut d);
2071 
2072         frame.to_bytes(&mut b)?;
2073 
2074         let off = b.off();
2075 
2076         if let Some(id) = self.control_stream_id {
2077             conn.stream_send(id, &d[..off], false)?;
2078 
2079             trace!(
2080                 "{} tx frm SETTINGS stream={} len={}",
2081                 conn.trace_id(),
2082                 id,
2083                 off
2084             );
2085 
2086             qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2087                 let frame = frame.to_qlog();
2088                 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2089                     stream_id: id,
2090                     length: Some(off as u64),
2091                     frame,
2092                     raw: None,
2093                 });
2094 
2095                 q.add_event_data_now(ev_data).ok();
2096             });
2097         }
2098 
2099         Ok(())
2100     }
2101 
process_control_stream( &mut self, conn: &mut super::Connection, stream_id: u64, ) -> Result<(u64, Event)>2102     fn process_control_stream(
2103         &mut self, conn: &mut super::Connection, stream_id: u64,
2104     ) -> Result<(u64, Event)> {
2105         if conn.stream_finished(stream_id) {
2106             conn.close(
2107                 true,
2108                 Error::ClosedCriticalStream.to_wire(),
2109                 b"Critical stream closed.",
2110             )?;
2111 
2112             return Err(Error::ClosedCriticalStream);
2113         }
2114 
2115         match self.process_readable_stream(conn, stream_id, true) {
2116             Ok(ev) => return Ok(ev),
2117 
2118             Err(Error::Done) => (),
2119 
2120             Err(e) => return Err(e),
2121         };
2122 
2123         if conn.stream_finished(stream_id) {
2124             conn.close(
2125                 true,
2126                 Error::ClosedCriticalStream.to_wire(),
2127                 b"Critical stream closed.",
2128             )?;
2129 
2130             return Err(Error::ClosedCriticalStream);
2131         }
2132 
2133         Err(Error::Done)
2134     }
2135 
process_readable_stream( &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool, ) -> Result<(u64, Event)>2136     fn process_readable_stream(
2137         &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool,
2138     ) -> Result<(u64, Event)> {
2139         self.streams
2140             .entry(stream_id)
2141             .or_insert_with(|| stream::Stream::new(stream_id, false));
2142 
2143         // We need to get a fresh reference to the stream for each
2144         // iteration, to avoid borrowing `self` for the entire duration
2145         // of the loop, because we'll need to borrow it again in the
2146         // `State::FramePayload` case below.
2147         while let Some(stream) = self.streams.get_mut(&stream_id) {
2148             match stream.state() {
2149                 stream::State::StreamType => {
2150                     stream.try_fill_buffer(conn)?;
2151 
2152                     let varint = match stream.try_consume_varint() {
2153                         Ok(v) => v,
2154 
2155                         Err(_) => continue,
2156                     };
2157 
2158                     let ty = stream::Type::deserialize(varint)?;
2159 
2160                     if let Err(e) = stream.set_ty(ty) {
2161                         conn.close(true, e.to_wire(), b"")?;
2162                         return Err(e);
2163                     }
2164 
2165                     qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2166                         let ev_data =
2167                             EventData::H3StreamTypeSet(H3StreamTypeSet {
2168                                 stream_id,
2169                                 owner: Some(H3Owner::Remote),
2170                                 stream_type: ty.to_qlog(),
2171                                 associated_push_id: None,
2172                             });
2173 
2174                         q.add_event_data_now(ev_data).ok();
2175                     });
2176 
2177                     match &ty {
2178                         stream::Type::Control => {
2179                             // Only one control stream allowed.
2180                             if self.peer_control_stream_id.is_some() {
2181                                 conn.close(
2182                                     true,
2183                                     Error::StreamCreationError.to_wire(),
2184                                     b"Received multiple control streams",
2185                                 )?;
2186 
2187                                 return Err(Error::StreamCreationError);
2188                             }
2189 
2190                             trace!(
2191                                 "{} open peer's control stream {}",
2192                                 conn.trace_id(),
2193                                 stream_id
2194                             );
2195 
2196                             self.peer_control_stream_id = Some(stream_id);
2197                         },
2198 
2199                         stream::Type::Push => {
2200                             // Only clients can receive push stream.
2201                             if self.is_server {
2202                                 conn.close(
2203                                     true,
2204                                     Error::StreamCreationError.to_wire(),
2205                                     b"Server received push stream.",
2206                                 )?;
2207 
2208                                 return Err(Error::StreamCreationError);
2209                             }
2210                         },
2211 
2212                         stream::Type::QpackEncoder => {
2213                             // Only one qpack encoder stream allowed.
2214                             if self.peer_qpack_streams.encoder_stream_id.is_some()
2215                             {
2216                                 conn.close(
2217                                     true,
2218                                     Error::StreamCreationError.to_wire(),
2219                                     b"Received multiple QPACK encoder streams",
2220                                 )?;
2221 
2222                                 return Err(Error::StreamCreationError);
2223                             }
2224 
2225                             self.peer_qpack_streams.encoder_stream_id =
2226                                 Some(stream_id);
2227                         },
2228 
2229                         stream::Type::QpackDecoder => {
2230                             // Only one qpack decoder allowed.
2231                             if self.peer_qpack_streams.decoder_stream_id.is_some()
2232                             {
2233                                 conn.close(
2234                                     true,
2235                                     Error::StreamCreationError.to_wire(),
2236                                     b"Received multiple QPACK decoder streams",
2237                                 )?;
2238 
2239                                 return Err(Error::StreamCreationError);
2240                             }
2241 
2242                             self.peer_qpack_streams.decoder_stream_id =
2243                                 Some(stream_id);
2244                         },
2245 
2246                         stream::Type::Unknown => {
2247                             // Unknown stream types are ignored.
2248                             // TODO: we MAY send STOP_SENDING
2249                         },
2250 
2251                         stream::Type::Request => unreachable!(),
2252                     }
2253                 },
2254 
2255                 stream::State::PushId => {
2256                     stream.try_fill_buffer(conn)?;
2257 
2258                     let varint = match stream.try_consume_varint() {
2259                         Ok(v) => v,
2260 
2261                         Err(_) => continue,
2262                     };
2263 
2264                     if let Err(e) = stream.set_push_id(varint) {
2265                         conn.close(true, e.to_wire(), b"")?;
2266                         return Err(e);
2267                     }
2268                 },
2269 
2270                 stream::State::FrameType => {
2271                     stream.try_fill_buffer(conn)?;
2272 
2273                     let varint = match stream.try_consume_varint() {
2274                         Ok(v) => v,
2275 
2276                         Err(_) => continue,
2277                     };
2278 
2279                     match stream.set_frame_type(varint) {
2280                         Err(Error::FrameUnexpected) => {
2281                             let msg = format!("Unexpected frame type {varint}");
2282 
2283                             conn.close(
2284                                 true,
2285                                 Error::FrameUnexpected.to_wire(),
2286                                 msg.as_bytes(),
2287                             )?;
2288 
2289                             return Err(Error::FrameUnexpected);
2290                         },
2291 
2292                         Err(e) => {
2293                             conn.close(
2294                                 true,
2295                                 e.to_wire(),
2296                                 b"Error handling frame.",
2297                             )?;
2298 
2299                             return Err(e);
2300                         },
2301 
2302                         _ => (),
2303                     }
2304                 },
2305 
2306                 stream::State::FramePayloadLen => {
2307                     stream.try_fill_buffer(conn)?;
2308 
2309                     let payload_len = match stream.try_consume_varint() {
2310                         Ok(v) => v,
2311 
2312                         Err(_) => continue,
2313                     };
2314 
2315                     // DATA frames are handled uniquely. After this point we lose
2316                     // visibility of DATA framing, so just log here.
2317                     if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
2318                         trace!(
2319                             "{} rx frm DATA stream={} wire_payload_len={}",
2320                             conn.trace_id(),
2321                             stream_id,
2322                             payload_len
2323                         );
2324 
2325                         qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2326                             let frame = Http3Frame::Data { raw: None };
2327 
2328                             let ev_data =
2329                                 EventData::H3FrameParsed(H3FrameParsed {
2330                                     stream_id,
2331                                     length: Some(payload_len),
2332                                     frame,
2333                                     raw: None,
2334                                 });
2335 
2336                             q.add_event_data_now(ev_data).ok();
2337                         });
2338                     }
2339 
2340                     if let Err(e) = stream.set_frame_payload_len(payload_len) {
2341                         conn.close(true, e.to_wire(), b"")?;
2342                         return Err(e);
2343                     }
2344                 },
2345 
2346                 stream::State::FramePayload => {
2347                     // Do not emit events when not polling.
2348                     if !polling {
2349                         break;
2350                     }
2351 
2352                     stream.try_fill_buffer(conn)?;
2353 
2354                     let (frame, payload_len) = match stream.try_consume_frame() {
2355                         Ok(frame) => frame,
2356 
2357                         Err(Error::Done) => return Err(Error::Done),
2358 
2359                         Err(e) => {
2360                             conn.close(
2361                                 true,
2362                                 e.to_wire(),
2363                                 b"Error handling frame.",
2364                             )?;
2365 
2366                             return Err(e);
2367                         },
2368                     };
2369 
2370                     match self.process_frame(conn, stream_id, frame, payload_len)
2371                     {
2372                         Ok(ev) => return Ok(ev),
2373 
2374                         Err(Error::Done) => {
2375                             // This might be a frame that is processed internally
2376                             // without needing to bubble up to the user as an
2377                             // event. Check whether the frame has FIN'd by QUIC
2378                             // to prevent trying to read again on a closed stream.
2379                             if conn.stream_finished(stream_id) {
2380                                 break;
2381                             }
2382                         },
2383 
2384                         Err(e) => return Err(e),
2385                     };
2386                 },
2387 
2388                 stream::State::Data => {
2389                     // Do not emit events when not polling.
2390                     if !polling {
2391                         break;
2392                     }
2393 
2394                     if !stream.try_trigger_data_event() {
2395                         break;
2396                     }
2397 
2398                     return Ok((stream_id, Event::Data));
2399                 },
2400 
2401                 stream::State::QpackInstruction => {
2402                     let mut d = [0; 4096];
2403 
2404                     // Read data from the stream and discard immediately.
2405                     loop {
2406                         conn.stream_recv(stream_id, &mut d)?;
2407                     }
2408                 },
2409 
2410                 stream::State::Drain => {
2411                     // Discard incoming data on the stream.
2412                     conn.stream_shutdown(
2413                         stream_id,
2414                         crate::Shutdown::Read,
2415                         0x100,
2416                     )?;
2417 
2418                     break;
2419                 },
2420 
2421                 stream::State::Finished => break,
2422             }
2423         }
2424 
2425         Err(Error::Done)
2426     }
2427 
process_finished_stream(&mut self, stream_id: u64)2428     fn process_finished_stream(&mut self, stream_id: u64) {
2429         let stream = match self.streams.get_mut(&stream_id) {
2430             Some(v) => v,
2431 
2432             None => return,
2433         };
2434 
2435         if stream.state() == stream::State::Finished {
2436             return;
2437         }
2438 
2439         match stream.ty() {
2440             Some(stream::Type::Request) | Some(stream::Type::Push) => {
2441                 stream.finished();
2442 
2443                 self.finished_streams.push_back(stream_id);
2444             },
2445 
2446             _ => (),
2447         };
2448     }
2449 
process_frame( &mut self, conn: &mut super::Connection, stream_id: u64, frame: frame::Frame, payload_len: u64, ) -> Result<(u64, Event)>2450     fn process_frame(
2451         &mut self, conn: &mut super::Connection, stream_id: u64,
2452         frame: frame::Frame, payload_len: u64,
2453     ) -> Result<(u64, Event)> {
2454         trace!(
2455             "{} rx frm {:?} stream={} payload_len={}",
2456             conn.trace_id(),
2457             frame,
2458             stream_id,
2459             payload_len
2460         );
2461 
2462         qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2463             // HEADERS frames are special case and will be logged below.
2464             if !matches!(frame, frame::Frame::Headers { .. }) {
2465                 let frame = frame.to_qlog();
2466                 let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2467                     stream_id,
2468                     length: Some(payload_len),
2469                     frame,
2470                     raw: None,
2471                 });
2472 
2473                 q.add_event_data_now(ev_data).ok();
2474             }
2475         });
2476 
2477         match frame {
2478             frame::Frame::Settings {
2479                 max_field_section_size,
2480                 qpack_max_table_capacity,
2481                 qpack_blocked_streams,
2482                 connect_protocol_enabled,
2483                 h3_datagram,
2484                 raw,
2485                 ..
2486             } => {
2487                 self.peer_settings = ConnectionSettings {
2488                     max_field_section_size,
2489                     qpack_max_table_capacity,
2490                     qpack_blocked_streams,
2491                     connect_protocol_enabled,
2492                     h3_datagram,
2493                     raw,
2494                 };
2495 
2496                 if let Some(1) = h3_datagram {
2497                     // The peer MUST have also enabled DATAGRAM with a TP
2498                     if conn.dgram_max_writable_len().is_none() {
2499                         conn.close(
2500                             true,
2501                             Error::SettingsError.to_wire(),
2502                             b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2503                         )?;
2504 
2505                         return Err(Error::SettingsError);
2506                     }
2507                 }
2508             },
2509 
2510             frame::Frame::Headers { header_block } => {
2511                 if Some(stream_id) == self.peer_control_stream_id {
2512                     conn.close(
2513                         true,
2514                         Error::FrameUnexpected.to_wire(),
2515                         b"HEADERS received on control stream",
2516                     )?;
2517 
2518                     return Err(Error::FrameUnexpected);
2519                 }
2520 
2521                 // Use "infinite" as default value for max_field_section_size if
2522                 // it is not configured by the application.
2523                 let max_size = self
2524                     .local_settings
2525                     .max_field_section_size
2526                     .unwrap_or(u64::MAX);
2527 
2528                 let headers = match self
2529                     .qpack_decoder
2530                     .decode(&header_block[..], max_size)
2531                 {
2532                     Ok(v) => v,
2533 
2534                     Err(e) => {
2535                         let e = match e {
2536                             qpack::Error::HeaderListTooLarge =>
2537                                 Error::ExcessiveLoad,
2538 
2539                             _ => Error::QpackDecompressionFailed,
2540                         };
2541 
2542                         conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2543 
2544                         return Err(e);
2545                     },
2546                 };
2547 
2548                 qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2549                     let qlog_headers = headers
2550                         .iter()
2551                         .map(|h| qlog::events::h3::HttpHeader {
2552                             name: String::from_utf8_lossy(h.name()).into_owned(),
2553                             value: String::from_utf8_lossy(h.value())
2554                                 .into_owned(),
2555                         })
2556                         .collect();
2557 
2558                     let frame = Http3Frame::Headers {
2559                         headers: qlog_headers,
2560                     };
2561 
2562                     let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2563                         stream_id,
2564                         length: Some(payload_len),
2565                         frame,
2566                         raw: None,
2567                     });
2568 
2569                     q.add_event_data_now(ev_data).ok();
2570                 });
2571 
2572                 let has_body = !conn.stream_finished(stream_id);
2573 
2574                 return Ok((stream_id, Event::Headers {
2575                     list: headers,
2576                     has_body,
2577                 }));
2578             },
2579 
2580             frame::Frame::Data { .. } => {
2581                 if Some(stream_id) == self.peer_control_stream_id {
2582                     conn.close(
2583                         true,
2584                         Error::FrameUnexpected.to_wire(),
2585                         b"DATA received on control stream",
2586                     )?;
2587 
2588                     return Err(Error::FrameUnexpected);
2589                 }
2590 
2591                 // Do nothing. The Data event is returned separately.
2592             },
2593 
2594             frame::Frame::GoAway { id } => {
2595                 if Some(stream_id) != self.peer_control_stream_id {
2596                     conn.close(
2597                         true,
2598                         Error::FrameUnexpected.to_wire(),
2599                         b"GOAWAY received on non-control stream",
2600                     )?;
2601 
2602                     return Err(Error::FrameUnexpected);
2603                 }
2604 
2605                 if !self.is_server && id % 4 != 0 {
2606                     conn.close(
2607                         true,
2608                         Error::FrameUnexpected.to_wire(),
2609                         b"GOAWAY received with ID of non-request stream",
2610                     )?;
2611 
2612                     return Err(Error::IdError);
2613                 }
2614 
2615                 if let Some(received_id) = self.peer_goaway_id {
2616                     if id > received_id {
2617                         conn.close(
2618                             true,
2619                             Error::IdError.to_wire(),
2620                             b"GOAWAY received with ID larger than previously received",
2621                         )?;
2622 
2623                         return Err(Error::IdError);
2624                     }
2625                 }
2626 
2627                 self.peer_goaway_id = Some(id);
2628 
2629                 return Ok((id, Event::GoAway));
2630             },
2631 
2632             frame::Frame::MaxPushId { push_id } => {
2633                 if Some(stream_id) != self.peer_control_stream_id {
2634                     conn.close(
2635                         true,
2636                         Error::FrameUnexpected.to_wire(),
2637                         b"MAX_PUSH_ID received on non-control stream",
2638                     )?;
2639 
2640                     return Err(Error::FrameUnexpected);
2641                 }
2642 
2643                 if !self.is_server {
2644                     conn.close(
2645                         true,
2646                         Error::FrameUnexpected.to_wire(),
2647                         b"MAX_PUSH_ID received by client",
2648                     )?;
2649 
2650                     return Err(Error::FrameUnexpected);
2651                 }
2652 
2653                 if push_id < self.max_push_id {
2654                     conn.close(
2655                         true,
2656                         Error::IdError.to_wire(),
2657                         b"MAX_PUSH_ID reduced limit",
2658                     )?;
2659 
2660                     return Err(Error::IdError);
2661                 }
2662 
2663                 self.max_push_id = push_id;
2664             },
2665 
2666             frame::Frame::PushPromise { .. } => {
2667                 if self.is_server {
2668                     conn.close(
2669                         true,
2670                         Error::FrameUnexpected.to_wire(),
2671                         b"PUSH_PROMISE received by server",
2672                     )?;
2673 
2674                     return Err(Error::FrameUnexpected);
2675                 }
2676 
2677                 if stream_id % 4 != 0 {
2678                     conn.close(
2679                         true,
2680                         Error::FrameUnexpected.to_wire(),
2681                         b"PUSH_PROMISE received on non-request stream",
2682                     )?;
2683 
2684                     return Err(Error::FrameUnexpected);
2685                 }
2686 
2687                 // TODO: implement more checks and PUSH_PROMISE event
2688             },
2689 
2690             frame::Frame::CancelPush { .. } => {
2691                 if Some(stream_id) != self.peer_control_stream_id {
2692                     conn.close(
2693                         true,
2694                         Error::FrameUnexpected.to_wire(),
2695                         b"CANCEL_PUSH received on non-control stream",
2696                     )?;
2697 
2698                     return Err(Error::FrameUnexpected);
2699                 }
2700 
2701                 // TODO: implement CANCEL_PUSH frame
2702             },
2703 
2704             frame::Frame::PriorityUpdateRequest {
2705                 prioritized_element_id,
2706                 priority_field_value,
2707             } => {
2708                 if !self.is_server {
2709                     conn.close(
2710                         true,
2711                         Error::FrameUnexpected.to_wire(),
2712                         b"PRIORITY_UPDATE received by client",
2713                     )?;
2714 
2715                     return Err(Error::FrameUnexpected);
2716                 }
2717 
2718                 if Some(stream_id) != self.peer_control_stream_id {
2719                     conn.close(
2720                         true,
2721                         Error::FrameUnexpected.to_wire(),
2722                         b"PRIORITY_UPDATE received on non-control stream",
2723                     )?;
2724 
2725                     return Err(Error::FrameUnexpected);
2726                 }
2727 
2728                 if prioritized_element_id % 4 != 0 {
2729                     conn.close(
2730                         true,
2731                         Error::FrameUnexpected.to_wire(),
2732                         b"PRIORITY_UPDATE for request stream type with wrong ID",
2733                     )?;
2734 
2735                     return Err(Error::FrameUnexpected);
2736                 }
2737 
2738                 if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
2739                     conn.close(
2740                         true,
2741                         Error::IdError.to_wire(),
2742                         b"PRIORITY_UPDATE for request stream beyond max streams limit",
2743                     )?;
2744 
2745                     return Err(Error::IdError);
2746                 }
2747 
2748                 // If the PRIORITY_UPDATE is valid, consider storing the latest
2749                 // contents. Due to reordering, it is possible that we might
2750                 // receive frames that reference streams that have not yet to
2751                 // been opened and that's OK because it's within our concurrency
2752                 // limit. However, we discard PRIORITY_UPDATE that refers to
2753                 // streams that we know have been collected.
2754                 if conn.streams.is_collected(prioritized_element_id) {
2755                     return Err(Error::Done);
2756                 }
2757 
2758                 // If the stream did not yet exist, create it and store.
2759                 let stream =
2760                     self.streams.entry(prioritized_element_id).or_insert_with(
2761                         || stream::Stream::new(prioritized_element_id, false),
2762                     );
2763 
2764                 let had_priority_update = stream.has_last_priority_update();
2765                 stream.set_last_priority_update(Some(priority_field_value));
2766 
2767                 // Only trigger the event when there wasn't already a stored
2768                 // PRIORITY_UPDATE.
2769                 if !had_priority_update {
2770                     return Ok((prioritized_element_id, Event::PriorityUpdate));
2771                 } else {
2772                     return Err(Error::Done);
2773                 }
2774             },
2775 
2776             frame::Frame::PriorityUpdatePush {
2777                 prioritized_element_id,
2778                 ..
2779             } => {
2780                 if !self.is_server {
2781                     conn.close(
2782                         true,
2783                         Error::FrameUnexpected.to_wire(),
2784                         b"PRIORITY_UPDATE received by client",
2785                     )?;
2786 
2787                     return Err(Error::FrameUnexpected);
2788                 }
2789 
2790                 if Some(stream_id) != self.peer_control_stream_id {
2791                     conn.close(
2792                         true,
2793                         Error::FrameUnexpected.to_wire(),
2794                         b"PRIORITY_UPDATE received on non-control stream",
2795                     )?;
2796 
2797                     return Err(Error::FrameUnexpected);
2798                 }
2799 
2800                 if prioritized_element_id % 3 != 0 {
2801                     conn.close(
2802                         true,
2803                         Error::FrameUnexpected.to_wire(),
2804                         b"PRIORITY_UPDATE for push stream type with wrong ID",
2805                     )?;
2806 
2807                     return Err(Error::FrameUnexpected);
2808                 }
2809 
2810                 // TODO: we only implement this if we implement server push
2811             },
2812 
2813             frame::Frame::Unknown { .. } => (),
2814         }
2815 
2816         Err(Error::Done)
2817     }
2818 }
2819 
2820 /// Generates an HTTP/3 GREASE variable length integer.
grease_value() -> u642821 fn grease_value() -> u64 {
2822     let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
2823     31 * n + 33
2824 }
2825 
2826 #[doc(hidden)]
2827 pub mod testing {
2828     use super::*;
2829 
2830     use crate::testing;
2831 
2832     /// Session is an HTTP/3 test helper structure. It holds a client, server
2833     /// and pipe that allows them to communicate.
2834     ///
2835     /// `default()` creates a session with some sensible default
2836     /// configuration. `with_configs()` allows for providing a specific
2837     /// configuration.
2838     ///
2839     /// `handshake()` performs all the steps needed to establish an HTTP/3
2840     /// connection.
2841     ///
2842     /// Some utility functions are provided that make it less verbose to send
2843     /// request, responses and individual headers. The full quiche API remains
2844     /// available for any test that need to do unconventional things (such as
2845     /// bad behaviour that triggers errors).
2846     pub struct Session {
2847         pub pipe: testing::Pipe,
2848         pub client: Connection,
2849         pub server: Connection,
2850     }
2851 
2852     impl Session {
new() -> Result<Session>2853         pub fn new() -> Result<Session> {
2854             let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
2855             config.load_cert_chain_from_pem_file("examples/cert.crt")?;
2856             config.load_priv_key_from_pem_file("examples/cert.key")?;
2857             config.set_application_protos(&[b"h3"])?;
2858             config.set_initial_max_data(1500);
2859             config.set_initial_max_stream_data_bidi_local(150);
2860             config.set_initial_max_stream_data_bidi_remote(150);
2861             config.set_initial_max_stream_data_uni(150);
2862             config.set_initial_max_streams_bidi(5);
2863             config.set_initial_max_streams_uni(5);
2864             config.verify_peer(false);
2865             config.enable_dgram(true, 3, 3);
2866             config.set_ack_delay_exponent(8);
2867 
2868             let h3_config = Config::new()?;
2869             Session::with_configs(&mut config, &h3_config)
2870         }
2871 
with_configs( config: &mut crate::Config, h3_config: &Config, ) -> Result<Session>2872         pub fn with_configs(
2873             config: &mut crate::Config, h3_config: &Config,
2874         ) -> Result<Session> {
2875             let pipe = testing::Pipe::with_config(config)?;
2876             let client_dgram = pipe.client.dgram_enabled();
2877             let server_dgram = pipe.server.dgram_enabled();
2878             Ok(Session {
2879                 pipe,
2880                 client: Connection::new(h3_config, false, client_dgram)?,
2881                 server: Connection::new(h3_config, true, server_dgram)?,
2882             })
2883         }
2884 
2885         /// Do the HTTP/3 handshake so both ends are in sane initial state.
handshake(&mut self) -> Result<()>2886         pub fn handshake(&mut self) -> Result<()> {
2887             self.pipe.handshake()?;
2888 
2889             // Client streams.
2890             self.client.send_settings(&mut self.pipe.client)?;
2891             self.pipe.advance().ok();
2892 
2893             self.client
2894                 .open_qpack_encoder_stream(&mut self.pipe.client)?;
2895             self.pipe.advance().ok();
2896 
2897             self.client
2898                 .open_qpack_decoder_stream(&mut self.pipe.client)?;
2899             self.pipe.advance().ok();
2900 
2901             if self.pipe.client.grease {
2902                 self.client.open_grease_stream(&mut self.pipe.client)?;
2903             }
2904 
2905             self.pipe.advance().ok();
2906 
2907             // Server streams.
2908             self.server.send_settings(&mut self.pipe.server)?;
2909             self.pipe.advance().ok();
2910 
2911             self.server
2912                 .open_qpack_encoder_stream(&mut self.pipe.server)?;
2913             self.pipe.advance().ok();
2914 
2915             self.server
2916                 .open_qpack_decoder_stream(&mut self.pipe.server)?;
2917             self.pipe.advance().ok();
2918 
2919             if self.pipe.server.grease {
2920                 self.server.open_grease_stream(&mut self.pipe.server)?;
2921             }
2922 
2923             self.advance().ok();
2924 
2925             while self.client.poll(&mut self.pipe.client).is_ok() {
2926                 // Do nothing.
2927             }
2928 
2929             while self.server.poll(&mut self.pipe.server).is_ok() {
2930                 // Do nothing.
2931             }
2932 
2933             Ok(())
2934         }
2935 
2936         /// Advances the session pipe over the buffer.
advance(&mut self) -> crate::Result<()>2937         pub fn advance(&mut self) -> crate::Result<()> {
2938             self.pipe.advance()
2939         }
2940 
2941         /// Polls the client for events.
poll_client(&mut self) -> Result<(u64, Event)>2942         pub fn poll_client(&mut self) -> Result<(u64, Event)> {
2943             self.client.poll(&mut self.pipe.client)
2944         }
2945 
2946         /// Polls the server for events.
poll_server(&mut self) -> Result<(u64, Event)>2947         pub fn poll_server(&mut self) -> Result<(u64, Event)> {
2948             self.server.poll(&mut self.pipe.server)
2949         }
2950 
2951         /// Sends a request from client with default headers.
2952         ///
2953         /// On success it returns the newly allocated stream and the headers.
send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)>2954         pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
2955             let req = vec![
2956                 Header::new(b":method", b"GET"),
2957                 Header::new(b":scheme", b"https"),
2958                 Header::new(b":authority", b"quic.tech"),
2959                 Header::new(b":path", b"/test"),
2960                 Header::new(b"user-agent", b"quiche-test"),
2961             ];
2962 
2963             let stream =
2964                 self.client.send_request(&mut self.pipe.client, &req, fin)?;
2965 
2966             self.advance().ok();
2967 
2968             Ok((stream, req))
2969         }
2970 
2971         /// Sends a response from server with default headers.
2972         ///
2973         /// On success it returns the headers.
send_response( &mut self, stream: u64, fin: bool, ) -> Result<Vec<Header>>2974         pub fn send_response(
2975             &mut self, stream: u64, fin: bool,
2976         ) -> Result<Vec<Header>> {
2977             let resp = vec![
2978                 Header::new(b":status", b"200"),
2979                 Header::new(b"server", b"quiche-test"),
2980             ];
2981 
2982             self.server.send_response(
2983                 &mut self.pipe.server,
2984                 stream,
2985                 &resp,
2986                 fin,
2987             )?;
2988 
2989             self.advance().ok();
2990 
2991             Ok(resp)
2992         }
2993 
2994         /// Sends some default payload from client.
2995         ///
2996         /// On success it returns the payload.
send_body_client( &mut self, stream: u64, fin: bool, ) -> Result<Vec<u8>>2997         pub fn send_body_client(
2998             &mut self, stream: u64, fin: bool,
2999         ) -> Result<Vec<u8>> {
3000             let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3001 
3002             self.client
3003                 .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3004 
3005             self.advance().ok();
3006 
3007             Ok(bytes)
3008         }
3009 
3010         /// Fetches DATA payload from the server.
3011         ///
3012         /// On success it returns the number of bytes received.
recv_body_client( &mut self, stream: u64, buf: &mut [u8], ) -> Result<usize>3013         pub fn recv_body_client(
3014             &mut self, stream: u64, buf: &mut [u8],
3015         ) -> Result<usize> {
3016             self.client.recv_body(&mut self.pipe.client, stream, buf)
3017         }
3018 
3019         /// Sends some default payload from server.
3020         ///
3021         /// On success it returns the payload.
send_body_server( &mut self, stream: u64, fin: bool, ) -> Result<Vec<u8>>3022         pub fn send_body_server(
3023             &mut self, stream: u64, fin: bool,
3024         ) -> Result<Vec<u8>> {
3025             let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3026 
3027             self.server
3028                 .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3029 
3030             self.advance().ok();
3031 
3032             Ok(bytes)
3033         }
3034 
3035         /// Fetches DATA payload from the client.
3036         ///
3037         /// On success it returns the number of bytes received.
recv_body_server( &mut self, stream: u64, buf: &mut [u8], ) -> Result<usize>3038         pub fn recv_body_server(
3039             &mut self, stream: u64, buf: &mut [u8],
3040         ) -> Result<usize> {
3041             self.server.recv_body(&mut self.pipe.server, stream, buf)
3042         }
3043 
3044         /// Sends a single HTTP/3 frame from the client.
send_frame_client( &mut self, frame: frame::Frame, stream_id: u64, fin: bool, ) -> Result<()>3045         pub fn send_frame_client(
3046             &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3047         ) -> Result<()> {
3048             let mut d = [42; 65535];
3049 
3050             let mut b = octets::OctetsMut::with_slice(&mut d);
3051 
3052             frame.to_bytes(&mut b)?;
3053 
3054             let off = b.off();
3055             self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3056 
3057             self.advance().ok();
3058 
3059             Ok(())
3060         }
3061 
3062         /// Send an HTTP/3 DATAGRAM with default data from the client.
3063         ///
3064         /// On success it returns the data.
send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>>3065         pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3066             let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3067 
3068             self.client
3069                 .send_dgram(&mut self.pipe.client, flow_id, &bytes)?;
3070 
3071             self.advance().ok();
3072 
3073             Ok(bytes)
3074         }
3075 
3076         /// Receives an HTTP/3 DATAGRAM from the server.
3077         ///
3078         /// On success it returns the DATAGRAM length, flow ID and flow ID
3079         /// length.
recv_dgram_client( &mut self, buf: &mut [u8], ) -> Result<(usize, u64, usize)>3080         pub fn recv_dgram_client(
3081             &mut self, buf: &mut [u8],
3082         ) -> Result<(usize, u64, usize)> {
3083             self.client.recv_dgram(&mut self.pipe.client, buf)
3084         }
3085 
3086         /// Send an HTTP/3 DATAGRAM with default data from the server
3087         ///
3088         /// On success it returns the data.
send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>>3089         pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3090             let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3091 
3092             self.server
3093                 .send_dgram(&mut self.pipe.server, flow_id, &bytes)?;
3094 
3095             self.advance().ok();
3096 
3097             Ok(bytes)
3098         }
3099 
3100         /// Receives an HTTP/3 DATAGRAM from the client.
3101         ///
3102         /// On success it returns the DATAGRAM length, flow ID and flow ID
3103         /// length.
recv_dgram_server( &mut self, buf: &mut [u8], ) -> Result<(usize, u64, usize)>3104         pub fn recv_dgram_server(
3105             &mut self, buf: &mut [u8],
3106         ) -> Result<(usize, u64, usize)> {
3107             self.server.recv_dgram(&mut self.pipe.server, buf)
3108         }
3109 
3110         /// Sends a single HTTP/3 frame from the server.
send_frame_server( &mut self, frame: frame::Frame, stream_id: u64, fin: bool, ) -> Result<()>3111         pub fn send_frame_server(
3112             &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3113         ) -> Result<()> {
3114             let mut d = [42; 65535];
3115 
3116             let mut b = octets::OctetsMut::with_slice(&mut d);
3117 
3118             frame.to_bytes(&mut b)?;
3119 
3120             let off = b.off();
3121             self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3122 
3123             self.advance().ok();
3124 
3125             Ok(())
3126         }
3127 
3128         /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
send_arbitrary_stream_data_client( &mut self, data: &[u8], stream_id: u64, fin: bool, ) -> Result<()>3129         pub fn send_arbitrary_stream_data_client(
3130             &mut self, data: &[u8], stream_id: u64, fin: bool,
3131         ) -> Result<()> {
3132             self.pipe.client.stream_send(stream_id, data, fin)?;
3133 
3134             self.advance().ok();
3135 
3136             Ok(())
3137         }
3138 
3139         /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
send_arbitrary_stream_data_server( &mut self, data: &[u8], stream_id: u64, fin: bool, ) -> Result<()>3140         pub fn send_arbitrary_stream_data_server(
3141             &mut self, data: &[u8], stream_id: u64, fin: bool,
3142         ) -> Result<()> {
3143             self.pipe.server.stream_send(stream_id, data, fin)?;
3144 
3145             self.advance().ok();
3146 
3147             Ok(())
3148         }
3149     }
3150 }
3151 
3152 #[cfg(test)]
3153 mod tests {
3154     use super::*;
3155 
3156     use super::testing::*;
3157 
3158     #[test]
3159     /// Make sure that random GREASE values is within the specified limit.
grease_value_in_varint_limit()3160     fn grease_value_in_varint_limit() {
3161         assert!(grease_value() < 2u64.pow(62) - 1);
3162     }
3163 
3164     #[test]
h3_handshake_0rtt()3165     fn h3_handshake_0rtt() {
3166         let mut buf = [0; 65535];
3167 
3168         let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3169         config
3170             .load_cert_chain_from_pem_file("examples/cert.crt")
3171             .unwrap();
3172         config
3173             .load_priv_key_from_pem_file("examples/cert.key")
3174             .unwrap();
3175         config
3176             .set_application_protos(&[b"proto1", b"proto2"])
3177             .unwrap();
3178         config.set_initial_max_data(30);
3179         config.set_initial_max_stream_data_bidi_local(15);
3180         config.set_initial_max_stream_data_bidi_remote(15);
3181         config.set_initial_max_stream_data_uni(15);
3182         config.set_initial_max_streams_bidi(3);
3183         config.set_initial_max_streams_uni(3);
3184         config.enable_early_data();
3185         config.verify_peer(false);
3186 
3187         let h3_config = Config::new().unwrap();
3188 
3189         // Perform initial handshake.
3190         let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3191         assert_eq!(pipe.handshake(), Ok(()));
3192 
3193         // Extract session,
3194         let session = pipe.client.session().unwrap();
3195 
3196         // Configure session on new connection.
3197         let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3198         assert_eq!(pipe.client.set_session(session), Ok(()));
3199 
3200         // Can't create an H3 connection until the QUIC connection is determined
3201         // to have made sufficient early data progress.
3202         assert!(matches!(
3203             Connection::with_transport(&mut pipe.client, &h3_config),
3204             Err(Error::InternalError)
3205         ));
3206 
3207         // Client sends initial flight.
3208         let (len, _) = pipe.client.send(&mut buf).unwrap();
3209 
3210         // Now an H3 connection can be created.
3211         assert!(matches!(
3212             Connection::with_transport(&mut pipe.client, &h3_config),
3213             Ok(_)
3214         ));
3215         assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3216 
3217         // Client sends 0-RTT packet.
3218         let pkt_type = crate::packet::Type::ZeroRTT;
3219 
3220         let frames = [crate::frame::Frame::Stream {
3221             stream_id: 6,
3222             data: crate::stream::RangeBuf::from(b"aaaaa", 0, true),
3223         }];
3224 
3225         assert_eq!(
3226             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3227             Ok(1200)
3228         );
3229 
3230         assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3231 
3232         // 0-RTT stream data is readable.
3233         let mut r = pipe.server.readable();
3234         assert_eq!(r.next(), Some(6));
3235         assert_eq!(r.next(), None);
3236 
3237         let mut b = [0; 15];
3238         assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3239         assert_eq!(&b[..5], b"aaaaa");
3240     }
3241 
3242     #[test]
3243     /// Send a request with no body, get a response with no body.
request_no_body_response_no_body()3244     fn request_no_body_response_no_body() {
3245         let mut s = Session::new().unwrap();
3246         s.handshake().unwrap();
3247 
3248         let (stream, req) = s.send_request(true).unwrap();
3249 
3250         assert_eq!(stream, 0);
3251 
3252         let ev_headers = Event::Headers {
3253             list: req,
3254             has_body: false,
3255         };
3256 
3257         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3258         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3259 
3260         let resp = s.send_response(stream, true).unwrap();
3261 
3262         let ev_headers = Event::Headers {
3263             list: resp,
3264             has_body: false,
3265         };
3266 
3267         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3268         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3269         assert_eq!(s.poll_client(), Err(Error::Done));
3270     }
3271 
3272     #[test]
3273     /// Send a request with no body, get a response with one DATA frame.
request_no_body_response_one_chunk()3274     fn request_no_body_response_one_chunk() {
3275         let mut s = Session::new().unwrap();
3276         s.handshake().unwrap();
3277 
3278         let (stream, req) = s.send_request(true).unwrap();
3279         assert_eq!(stream, 0);
3280 
3281         let ev_headers = Event::Headers {
3282             list: req,
3283             has_body: false,
3284         };
3285 
3286         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3287 
3288         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3289 
3290         let resp = s.send_response(stream, false).unwrap();
3291 
3292         let body = s.send_body_server(stream, true).unwrap();
3293 
3294         let mut recv_buf = vec![0; body.len()];
3295 
3296         let ev_headers = Event::Headers {
3297             list: resp,
3298             has_body: true,
3299         };
3300 
3301         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3302 
3303         assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3304         assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3305 
3306         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3307         assert_eq!(s.poll_client(), Err(Error::Done));
3308     }
3309 
3310     #[test]
3311     /// Send a request with no body, get a response with multiple DATA frames.
request_no_body_response_many_chunks()3312     fn request_no_body_response_many_chunks() {
3313         let mut s = Session::new().unwrap();
3314         s.handshake().unwrap();
3315 
3316         let (stream, req) = s.send_request(true).unwrap();
3317 
3318         let ev_headers = Event::Headers {
3319             list: req,
3320             has_body: false,
3321         };
3322 
3323         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3324         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3325 
3326         let total_data_frames = 4;
3327 
3328         let resp = s.send_response(stream, false).unwrap();
3329 
3330         for _ in 0..total_data_frames - 1 {
3331             s.send_body_server(stream, false).unwrap();
3332         }
3333 
3334         let body = s.send_body_server(stream, true).unwrap();
3335 
3336         let mut recv_buf = vec![0; body.len()];
3337 
3338         let ev_headers = Event::Headers {
3339             list: resp,
3340             has_body: true,
3341         };
3342 
3343         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3344         assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3345         assert_eq!(s.poll_client(), Err(Error::Done));
3346 
3347         for _ in 0..total_data_frames {
3348             assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3349         }
3350 
3351         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3352         assert_eq!(s.poll_client(), Err(Error::Done));
3353     }
3354 
3355     #[test]
3356     /// Send a request with one DATA frame, get a response with no body.
request_one_chunk_response_no_body()3357     fn request_one_chunk_response_no_body() {
3358         let mut s = Session::new().unwrap();
3359         s.handshake().unwrap();
3360 
3361         let (stream, req) = s.send_request(false).unwrap();
3362 
3363         let body = s.send_body_client(stream, true).unwrap();
3364 
3365         let mut recv_buf = vec![0; body.len()];
3366 
3367         let ev_headers = Event::Headers {
3368             list: req,
3369             has_body: true,
3370         };
3371 
3372         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3373 
3374         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3375         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3376 
3377         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3378 
3379         let resp = s.send_response(stream, true).unwrap();
3380 
3381         let ev_headers = Event::Headers {
3382             list: resp,
3383             has_body: false,
3384         };
3385 
3386         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3387         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3388     }
3389 
3390     #[test]
3391     /// Send a request with multiple DATA frames, get a response with no body.
request_many_chunks_response_no_body()3392     fn request_many_chunks_response_no_body() {
3393         let mut s = Session::new().unwrap();
3394         s.handshake().unwrap();
3395 
3396         let (stream, req) = s.send_request(false).unwrap();
3397 
3398         let total_data_frames = 4;
3399 
3400         for _ in 0..total_data_frames - 1 {
3401             s.send_body_client(stream, false).unwrap();
3402         }
3403 
3404         let body = s.send_body_client(stream, true).unwrap();
3405 
3406         let mut recv_buf = vec![0; body.len()];
3407 
3408         let ev_headers = Event::Headers {
3409             list: req,
3410             has_body: true,
3411         };
3412 
3413         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3414         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3415         assert_eq!(s.poll_server(), Err(Error::Done));
3416 
3417         for _ in 0..total_data_frames {
3418             assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3419         }
3420 
3421         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3422 
3423         let resp = s.send_response(stream, true).unwrap();
3424 
3425         let ev_headers = Event::Headers {
3426             list: resp,
3427             has_body: false,
3428         };
3429 
3430         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3431         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3432     }
3433 
3434     #[test]
3435     /// Send a request with multiple DATA frames, get a response with one DATA
3436     /// frame.
many_requests_many_chunks_response_one_chunk()3437     fn many_requests_many_chunks_response_one_chunk() {
3438         let mut s = Session::new().unwrap();
3439         s.handshake().unwrap();
3440 
3441         let mut reqs = Vec::new();
3442 
3443         let (stream1, req1) = s.send_request(false).unwrap();
3444         assert_eq!(stream1, 0);
3445         reqs.push(req1);
3446 
3447         let (stream2, req2) = s.send_request(false).unwrap();
3448         assert_eq!(stream2, 4);
3449         reqs.push(req2);
3450 
3451         let (stream3, req3) = s.send_request(false).unwrap();
3452         assert_eq!(stream3, 8);
3453         reqs.push(req3);
3454 
3455         let body = s.send_body_client(stream1, false).unwrap();
3456         s.send_body_client(stream2, false).unwrap();
3457         s.send_body_client(stream3, false).unwrap();
3458 
3459         let mut recv_buf = vec![0; body.len()];
3460 
3461         // Reverse order of writes.
3462 
3463         s.send_body_client(stream3, true).unwrap();
3464         s.send_body_client(stream2, true).unwrap();
3465         s.send_body_client(stream1, true).unwrap();
3466 
3467         for _ in 0..reqs.len() {
3468             let (stream, ev) = s.poll_server().unwrap();
3469             let ev_headers = Event::Headers {
3470                 list: reqs[(stream / 4) as usize].clone(),
3471                 has_body: true,
3472             };
3473             assert_eq!(ev, ev_headers);
3474             assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3475             assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3476             assert_eq!(s.poll_client(), Err(Error::Done));
3477 
3478             assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3479             assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3480         }
3481 
3482         assert_eq!(s.poll_server(), Err(Error::Done));
3483 
3484         let mut resps = Vec::new();
3485 
3486         let resp1 = s.send_response(stream1, true).unwrap();
3487         resps.push(resp1);
3488 
3489         let resp2 = s.send_response(stream2, true).unwrap();
3490         resps.push(resp2);
3491 
3492         let resp3 = s.send_response(stream3, true).unwrap();
3493         resps.push(resp3);
3494 
3495         for _ in 0..resps.len() {
3496             let (stream, ev) = s.poll_client().unwrap();
3497             let ev_headers = Event::Headers {
3498                 list: resps[(stream / 4) as usize].clone(),
3499                 has_body: false,
3500             };
3501             assert_eq!(ev, ev_headers);
3502             assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3503         }
3504 
3505         assert_eq!(s.poll_client(), Err(Error::Done));
3506     }
3507 
3508     #[test]
3509     /// Send a request with no body, get a response with one DATA frame and an
3510     /// empty FIN after reception from the client.
request_no_body_response_one_chunk_empty_fin()3511     fn request_no_body_response_one_chunk_empty_fin() {
3512         let mut s = Session::new().unwrap();
3513         s.handshake().unwrap();
3514 
3515         let (stream, req) = s.send_request(true).unwrap();
3516 
3517         let ev_headers = Event::Headers {
3518             list: req,
3519             has_body: false,
3520         };
3521 
3522         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3523         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3524 
3525         let resp = s.send_response(stream, false).unwrap();
3526 
3527         let body = s.send_body_server(stream, false).unwrap();
3528 
3529         let mut recv_buf = vec![0; body.len()];
3530 
3531         let ev_headers = Event::Headers {
3532             list: resp,
3533             has_body: true,
3534         };
3535 
3536         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3537 
3538         assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3539         assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3540 
3541         assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3542         s.advance().ok();
3543 
3544         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3545         assert_eq!(s.poll_client(), Err(Error::Done));
3546     }
3547 
3548     #[test]
3549     /// Send a request with no body, get a response with no body followed by
3550     /// GREASE that is STREAM frame with a FIN.
request_no_body_response_no_body_with_grease()3551     fn request_no_body_response_no_body_with_grease() {
3552         let mut s = Session::new().unwrap();
3553         s.handshake().unwrap();
3554 
3555         let (stream, req) = s.send_request(true).unwrap();
3556 
3557         assert_eq!(stream, 0);
3558 
3559         let ev_headers = Event::Headers {
3560             list: req,
3561             has_body: false,
3562         };
3563 
3564         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3565         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3566 
3567         let resp = s.send_response(stream, false).unwrap();
3568 
3569         // Note that "has_body" is a misnomer, there will never be a body in
3570         // this test. There's other work that will fix this, once it lands
3571         // remove this comment.
3572         let ev_headers = Event::Headers {
3573             list: resp,
3574             has_body: true,
3575         };
3576 
3577         // Inject a GREASE frame
3578         let mut d = [42; 10];
3579         let mut b = octets::OctetsMut::with_slice(&mut d);
3580 
3581         let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3582         s.pipe.server.stream_send(0, frame_type, false).unwrap();
3583 
3584         let frame_len = b.put_varint(10).unwrap();
3585         s.pipe.server.stream_send(0, frame_len, false).unwrap();
3586 
3587         s.pipe.server.stream_send(0, &d, true).unwrap();
3588 
3589         s.advance().ok();
3590 
3591         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3592         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3593         assert_eq!(s.poll_client(), Err(Error::Done));
3594     }
3595 
3596     #[test]
3597     /// Try to send DATA frames before HEADERS.
body_response_before_headers()3598     fn body_response_before_headers() {
3599         let mut s = Session::new().unwrap();
3600         s.handshake().unwrap();
3601 
3602         let (stream, req) = s.send_request(true).unwrap();
3603         assert_eq!(stream, 0);
3604 
3605         let ev_headers = Event::Headers {
3606             list: req,
3607             has_body: false,
3608         };
3609 
3610         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3611 
3612         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3613 
3614         assert_eq!(
3615             s.send_body_server(stream, true),
3616             Err(Error::FrameUnexpected)
3617         );
3618 
3619         assert_eq!(s.poll_client(), Err(Error::Done));
3620     }
3621 
3622     #[test]
3623     /// Try to send DATA frames on wrong streams, ensure the API returns an
3624     /// error before anything hits the transport layer.
send_body_invalid_client_stream()3625     fn send_body_invalid_client_stream() {
3626         let mut s = Session::new().unwrap();
3627         s.handshake().unwrap();
3628 
3629         assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
3630 
3631         assert_eq!(
3632             s.send_body_client(s.client.control_stream_id.unwrap(), true),
3633             Err(Error::FrameUnexpected)
3634         );
3635 
3636         assert_eq!(
3637             s.send_body_client(
3638                 s.client.local_qpack_streams.encoder_stream_id.unwrap(),
3639                 true
3640             ),
3641             Err(Error::FrameUnexpected)
3642         );
3643 
3644         assert_eq!(
3645             s.send_body_client(
3646                 s.client.local_qpack_streams.decoder_stream_id.unwrap(),
3647                 true
3648             ),
3649             Err(Error::FrameUnexpected)
3650         );
3651 
3652         assert_eq!(
3653             s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
3654             Err(Error::FrameUnexpected)
3655         );
3656 
3657         assert_eq!(
3658             s.send_body_client(
3659                 s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
3660                 true
3661             ),
3662             Err(Error::FrameUnexpected)
3663         );
3664 
3665         assert_eq!(
3666             s.send_body_client(
3667                 s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
3668                 true
3669             ),
3670             Err(Error::FrameUnexpected)
3671         );
3672     }
3673 
3674     #[test]
3675     /// Try to send DATA frames on wrong streams, ensure the API returns an
3676     /// error before anything hits the transport layer.
send_body_invalid_server_stream()3677     fn send_body_invalid_server_stream() {
3678         let mut s = Session::new().unwrap();
3679         s.handshake().unwrap();
3680 
3681         assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
3682 
3683         assert_eq!(
3684             s.send_body_server(s.server.control_stream_id.unwrap(), true),
3685             Err(Error::FrameUnexpected)
3686         );
3687 
3688         assert_eq!(
3689             s.send_body_server(
3690                 s.server.local_qpack_streams.encoder_stream_id.unwrap(),
3691                 true
3692             ),
3693             Err(Error::FrameUnexpected)
3694         );
3695 
3696         assert_eq!(
3697             s.send_body_server(
3698                 s.server.local_qpack_streams.decoder_stream_id.unwrap(),
3699                 true
3700             ),
3701             Err(Error::FrameUnexpected)
3702         );
3703 
3704         assert_eq!(
3705             s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
3706             Err(Error::FrameUnexpected)
3707         );
3708 
3709         assert_eq!(
3710             s.send_body_server(
3711                 s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
3712                 true
3713             ),
3714             Err(Error::FrameUnexpected)
3715         );
3716 
3717         assert_eq!(
3718             s.send_body_server(
3719                 s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
3720                 true
3721             ),
3722             Err(Error::FrameUnexpected)
3723         );
3724     }
3725 
3726     #[test]
3727     /// Send a MAX_PUSH_ID frame from the client on a valid stream.
max_push_id_from_client_good()3728     fn max_push_id_from_client_good() {
3729         let mut s = Session::new().unwrap();
3730         s.handshake().unwrap();
3731 
3732         s.send_frame_client(
3733             frame::Frame::MaxPushId { push_id: 1 },
3734             s.client.control_stream_id.unwrap(),
3735             false,
3736         )
3737         .unwrap();
3738 
3739         assert_eq!(s.poll_server(), Err(Error::Done));
3740     }
3741 
3742     #[test]
3743     /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
max_push_id_from_client_bad_stream()3744     fn max_push_id_from_client_bad_stream() {
3745         let mut s = Session::new().unwrap();
3746         s.handshake().unwrap();
3747 
3748         let (stream, req) = s.send_request(false).unwrap();
3749 
3750         s.send_frame_client(
3751             frame::Frame::MaxPushId { push_id: 2 },
3752             stream,
3753             false,
3754         )
3755         .unwrap();
3756 
3757         let ev_headers = Event::Headers {
3758             list: req,
3759             has_body: true,
3760         };
3761 
3762         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3763         assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
3764     }
3765 
3766     #[test]
3767     /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
3768     /// reduce the limit.
max_push_id_from_client_limit_reduction()3769     fn max_push_id_from_client_limit_reduction() {
3770         let mut s = Session::new().unwrap();
3771         s.handshake().unwrap();
3772 
3773         s.send_frame_client(
3774             frame::Frame::MaxPushId { push_id: 2 },
3775             s.client.control_stream_id.unwrap(),
3776             false,
3777         )
3778         .unwrap();
3779 
3780         s.send_frame_client(
3781             frame::Frame::MaxPushId { push_id: 1 },
3782             s.client.control_stream_id.unwrap(),
3783             false,
3784         )
3785         .unwrap();
3786 
3787         assert_eq!(s.poll_server(), Err(Error::IdError));
3788     }
3789 
3790     #[test]
3791     /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
max_push_id_from_server()3792     fn max_push_id_from_server() {
3793         let mut s = Session::new().unwrap();
3794         s.handshake().unwrap();
3795 
3796         s.send_frame_server(
3797             frame::Frame::MaxPushId { push_id: 1 },
3798             s.server.control_stream_id.unwrap(),
3799             false,
3800         )
3801         .unwrap();
3802 
3803         assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
3804     }
3805 
3806     #[test]
3807     /// Send a PUSH_PROMISE frame from the client, which is forbidden.
push_promise_from_client()3808     fn push_promise_from_client() {
3809         let mut s = Session::new().unwrap();
3810         s.handshake().unwrap();
3811 
3812         let (stream, req) = s.send_request(false).unwrap();
3813 
3814         let header_block = s.client.encode_header_block(&req).unwrap();
3815 
3816         s.send_frame_client(
3817             frame::Frame::PushPromise {
3818                 push_id: 1,
3819                 header_block,
3820             },
3821             stream,
3822             false,
3823         )
3824         .unwrap();
3825 
3826         let ev_headers = Event::Headers {
3827             list: req,
3828             has_body: true,
3829         };
3830 
3831         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3832         assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
3833     }
3834 
3835     #[test]
3836     /// Send a CANCEL_PUSH frame from the client.
cancel_push_from_client()3837     fn cancel_push_from_client() {
3838         let mut s = Session::new().unwrap();
3839         s.handshake().unwrap();
3840 
3841         s.send_frame_client(
3842             frame::Frame::CancelPush { push_id: 1 },
3843             s.client.control_stream_id.unwrap(),
3844             false,
3845         )
3846         .unwrap();
3847 
3848         assert_eq!(s.poll_server(), Err(Error::Done));
3849     }
3850 
3851     #[test]
3852     /// Send a CANCEL_PUSH frame from the client on an invalid stream.
cancel_push_from_client_bad_stream()3853     fn cancel_push_from_client_bad_stream() {
3854         let mut s = Session::new().unwrap();
3855         s.handshake().unwrap();
3856 
3857         let (stream, req) = s.send_request(false).unwrap();
3858 
3859         s.send_frame_client(
3860             frame::Frame::CancelPush { push_id: 2 },
3861             stream,
3862             false,
3863         )
3864         .unwrap();
3865 
3866         let ev_headers = Event::Headers {
3867             list: req,
3868             has_body: true,
3869         };
3870 
3871         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3872         assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
3873     }
3874 
3875     #[test]
3876     /// Send a CANCEL_PUSH frame from the client.
cancel_push_from_server()3877     fn cancel_push_from_server() {
3878         let mut s = Session::new().unwrap();
3879         s.handshake().unwrap();
3880 
3881         s.send_frame_server(
3882             frame::Frame::CancelPush { push_id: 1 },
3883             s.server.control_stream_id.unwrap(),
3884             false,
3885         )
3886         .unwrap();
3887 
3888         assert_eq!(s.poll_client(), Err(Error::Done));
3889     }
3890 
3891     #[test]
3892     /// Send a GOAWAY frame from the client.
goaway_from_client_good()3893     fn goaway_from_client_good() {
3894         let mut s = Session::new().unwrap();
3895         s.handshake().unwrap();
3896 
3897         s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
3898 
3899         s.advance().ok();
3900 
3901         // TODO: server push
3902         assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
3903     }
3904 
3905     #[test]
3906     /// Send a GOAWAY frame from the server.
goaway_from_server_good()3907     fn goaway_from_server_good() {
3908         let mut s = Session::new().unwrap();
3909         s.handshake().unwrap();
3910 
3911         s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
3912 
3913         s.advance().ok();
3914 
3915         assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
3916     }
3917 
3918     #[test]
3919     /// A client MUST NOT send a request after it receives GOAWAY.
client_request_after_goaway()3920     fn client_request_after_goaway() {
3921         let mut s = Session::new().unwrap();
3922         s.handshake().unwrap();
3923 
3924         s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
3925 
3926         s.advance().ok();
3927 
3928         assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
3929 
3930         assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
3931     }
3932 
3933     #[test]
3934     /// Send a GOAWAY frame from the server, using an invalid goaway ID.
goaway_from_server_invalid_id()3935     fn goaway_from_server_invalid_id() {
3936         let mut s = Session::new().unwrap();
3937         s.handshake().unwrap();
3938 
3939         s.send_frame_server(
3940             frame::Frame::GoAway { id: 1 },
3941             s.server.control_stream_id.unwrap(),
3942             false,
3943         )
3944         .unwrap();
3945 
3946         assert_eq!(s.poll_client(), Err(Error::IdError));
3947     }
3948 
3949     #[test]
3950     /// Send multiple GOAWAY frames from the server, that increase the goaway
3951     /// ID.
goaway_from_server_increase_id()3952     fn goaway_from_server_increase_id() {
3953         let mut s = Session::new().unwrap();
3954         s.handshake().unwrap();
3955 
3956         s.send_frame_server(
3957             frame::Frame::GoAway { id: 0 },
3958             s.server.control_stream_id.unwrap(),
3959             false,
3960         )
3961         .unwrap();
3962 
3963         s.send_frame_server(
3964             frame::Frame::GoAway { id: 4 },
3965             s.server.control_stream_id.unwrap(),
3966             false,
3967         )
3968         .unwrap();
3969 
3970         assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
3971 
3972         assert_eq!(s.poll_client(), Err(Error::IdError));
3973     }
3974 
3975     #[test]
3976     #[cfg(feature = "sfv")]
parse_priority_field_value()3977     fn parse_priority_field_value() {
3978         // Legal dicts
3979         assert_eq!(
3980             Ok(Priority::new(0, false)),
3981             Priority::try_from(b"u=0".as_slice())
3982         );
3983         assert_eq!(
3984             Ok(Priority::new(3, false)),
3985             Priority::try_from(b"u=3".as_slice())
3986         );
3987         assert_eq!(
3988             Ok(Priority::new(7, false)),
3989             Priority::try_from(b"u=7".as_slice())
3990         );
3991 
3992         assert_eq!(
3993             Ok(Priority::new(0, true)),
3994             Priority::try_from(b"u=0, i".as_slice())
3995         );
3996         assert_eq!(
3997             Ok(Priority::new(3, true)),
3998             Priority::try_from(b"u=3, i".as_slice())
3999         );
4000         assert_eq!(
4001             Ok(Priority::new(7, true)),
4002             Priority::try_from(b"u=7, i".as_slice())
4003         );
4004 
4005         assert_eq!(
4006             Ok(Priority::new(0, true)),
4007             Priority::try_from(b"u=0, i=?1".as_slice())
4008         );
4009         assert_eq!(
4010             Ok(Priority::new(3, true)),
4011             Priority::try_from(b"u=3, i=?1".as_slice())
4012         );
4013         assert_eq!(
4014             Ok(Priority::new(7, true)),
4015             Priority::try_from(b"u=7, i=?1".as_slice())
4016         );
4017 
4018         assert_eq!(
4019             Ok(Priority::new(3, false)),
4020             Priority::try_from(b"".as_slice())
4021         );
4022 
4023         assert_eq!(
4024             Ok(Priority::new(0, true)),
4025             Priority::try_from(b"u=0;foo, i;bar".as_slice())
4026         );
4027         assert_eq!(
4028             Ok(Priority::new(3, true)),
4029             Priority::try_from(b"u=3;hello, i;world".as_slice())
4030         );
4031         assert_eq!(
4032             Ok(Priority::new(7, true)),
4033             Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4034         );
4035 
4036         assert_eq!(
4037             Ok(Priority::new(0, true)),
4038             Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4039         );
4040 
4041         // Illegal formats
4042         assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4043         assert_eq!(
4044             Ok(Priority::new(7, false)),
4045             Priority::try_from(b"u=-1".as_slice())
4046         );
4047         assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4048         assert_eq!(
4049             Ok(Priority::new(7, false)),
4050             Priority::try_from(b"u=100".as_slice())
4051         );
4052         assert_eq!(
4053             Err(Error::Done),
4054             Priority::try_from(b"u=3, i=true".as_slice())
4055         );
4056 
4057         // Trailing comma in dict is malformed
4058         assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4059     }
4060 
#[test]
/// A PRIORITY_UPDATE sent by the client for a request stream produces a
/// single `Event::PriorityUpdate` at the server.
fn priority_update_request() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let stream_id = 0;
    let priority = Priority {
        urgency: 3,
        incremental: false,
    };

    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            stream_id,
            &priority,
        )
        .unwrap();
    session.advance().ok();

    // One event for the update, after which nothing else is pending.
    assert_eq!(
        session.poll_server(),
        Ok((stream_id, Event::PriorityUpdate))
    );
    assert_eq!(session.poll_server(), Err(Error::Done));
}
4078 
#[test]
/// Checks that the PRIORITY_UPDATE event for a stream is only rearmed after
/// the pending update has been taken.
fn priority_update_single_stream_rearm() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let stream_id = 0;

    // First update: the event fires once.
    let first = Priority {
        urgency: 3,
        incremental: false,
    };
    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            stream_id,
            &first,
        )
        .unwrap();
    session.advance().ok();

    assert_eq!(
        session.poll_server(),
        Ok((stream_id, Event::PriorityUpdate))
    );
    assert_eq!(session.poll_server(), Err(Error::Done));

    // Second update arrives before the first one was taken: no new event.
    let second = Priority {
        urgency: 5,
        incremental: false,
    };
    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            stream_id,
            &second,
        )
        .unwrap();
    session.advance().ok();

    assert_eq!(session.poll_server(), Err(Error::Done));

    // Only the latest PRIORITY_UPDATE value is available to read. Taking it
    // rearms the event for subsequent updates.
    assert_eq!(
        session.server.take_last_priority_update(stream_id),
        Ok(b"u=5".to_vec())
    );
    assert_eq!(
        session.server.take_last_priority_update(stream_id),
        Err(Error::Done)
    );

    // Third update: the event fires again now that it has been rearmed.
    let third = Priority {
        urgency: 7,
        incremental: false,
    };
    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            stream_id,
            &third,
        )
        .unwrap();
    session.advance().ok();

    assert_eq!(
        session.poll_server(),
        Ok((stream_id, Event::PriorityUpdate))
    );
    assert_eq!(session.poll_server(), Err(Error::Done));

    assert_eq!(
        session.server.take_last_priority_update(stream_id),
        Ok(b"u=7".to_vec())
    );
    assert_eq!(
        session.server.take_last_priority_update(stream_id),
        Err(Error::Done)
    );
}
4125 
#[test]
/// Sends PRIORITY_UPDATE frames for several different streams, one per
/// flight, and checks each one is reported and retrievable at the server.
fn priority_update_request_multiple_stream_arm_multiple_flights() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // (stream ID, urgency sent, priority field value expected at the server)
    let flights = [(0, 3, b"u=3"), (4, 1, b"u=1"), (8, 2, b"u=2")];

    for &(stream_id, urgency, _) in &flights {
        let priority = Priority {
            urgency,
            incremental: false,
        };

        session
            .client
            .send_priority_update_for_request(
                &mut session.pipe.client,
                stream_id,
                &priority,
            )
            .unwrap();
        session.advance().ok();

        // Each flight yields exactly one event, for its own stream.
        assert_eq!(
            session.poll_server(),
            Ok((stream_id, Event::PriorityUpdate))
        );
        assert_eq!(session.poll_server(), Err(Error::Done));
    }

    for &(stream_id, _, field_value) in &flights {
        assert_eq!(
            session.server.take_last_priority_update(stream_id),
            Ok(field_value.to_vec())
        );
    }

    // The update for stream 0 was already taken above.
    assert_eq!(
        session.server.take_last_priority_update(0),
        Err(Error::Done)
    );
}
4171 
#[test]
/// Sends PRIORITY_UPDATE frames for several different streams in one flight
/// by writing them back-to-back on the client's control stream.
fn priority_update_request_multiple_stream_arm_single_flight() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let stream_ids = [0, 4, 8];

    let mut buf = [42; 65535];
    let mut writer = octets::OctetsMut::with_slice(&mut buf);

    // Serialize one frame per stream into the same buffer.
    for &id in &stream_ids {
        let update = frame::Frame::PriorityUpdateRequest {
            prioritized_element_id: id,
            priority_field_value: b"u=3".to_vec(),
        };

        update.to_bytes(&mut writer).unwrap();
    }

    let written = writer.off();
    let control_stream = session.client.control_stream_id.unwrap();

    session
        .pipe
        .client
        .stream_send(control_stream, &buf[..written], false)
        .unwrap();

    session.advance().ok();

    // Events are reported in the order the frames were written.
    for &id in &stream_ids {
        assert_eq!(session.poll_server(), Ok((id, Event::PriorityUpdate)));
    }
    assert_eq!(session.poll_server(), Err(Error::Done));

    for &id in &stream_ids {
        assert_eq!(
            session.server.take_last_priority_update(id),
            Ok(b"u=3".to_vec())
        );
    }

    // The update for stream 0 was already taken above.
    assert_eq!(
        session.server.take_last_priority_update(0),
        Err(Error::Done)
    );
}
4221 
#[test]
/// Exercises PRIORITY_UPDATE for a request stream both ahead of the request
/// and once the request/response exchange has finished.
fn priority_update_request_collected_completed() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let priority = Priority {
        urgency: 3,
        incremental: false,
    };

    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            0,
            &priority,
        )
        .unwrap();
    session.advance().ok();

    let (stream, req) = session.send_request(true).unwrap();
    let request_headers = Event::Headers {
        list: req,
        has_body: false,
    };

    // The priority event is reported ahead of the request headers.
    assert_eq!(session.poll_server(), Ok((0, Event::PriorityUpdate)));
    assert_eq!(session.poll_server(), Ok((stream, request_headers)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    assert_eq!(
        session.server.take_last_priority_update(0),
        Ok(b"u=3".to_vec())
    );
    assert_eq!(
        session.server.take_last_priority_update(0),
        Err(Error::Done)
    );

    let resp = session.send_response(stream, true).unwrap();
    let response_headers = Event::Headers {
        list: resp,
        has_body: false,
    };

    assert_eq!(session.poll_client(), Ok((stream, response_headers)));
    assert_eq!(session.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(session.poll_client(), Err(Error::Done));

    // Send another PRIORITY_UPDATE, this time for the completed request
    // stream.
    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            0,
            &priority,
        )
        .unwrap();
    session.advance().ok();

    // The server does not report an event for it.
    assert_eq!(session.poll_server(), Err(Error::Done));
}
4275 
#[test]
/// Exercises PRIORITY_UPDATE for a request stream both ahead of the request
/// and after the client has shut the stream down in both directions.
fn priority_update_request_collected_stopped() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let priority = Priority {
        urgency: 3,
        incremental: false,
    };

    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            0,
            &priority,
        )
        .unwrap();
    session.advance().ok();

    let (stream, req) = session.send_request(false).unwrap();
    let request_headers = Event::Headers {
        list: req,
        has_body: true,
    };

    // The priority event is reported ahead of the request headers.
    assert_eq!(session.poll_server(), Ok((0, Event::PriorityUpdate)));
    assert_eq!(session.poll_server(), Ok((stream, request_headers)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    assert_eq!(
        session.server.take_last_priority_update(0),
        Ok(b"u=3".to_vec())
    );
    assert_eq!(
        session.server.take_last_priority_update(0),
        Err(Error::Done)
    );

    // Shut down the write side first, then the read side, with the same
    // error code.
    let error_code = 0x100;

    session
        .pipe
        .client
        .stream_shutdown(stream, crate::Shutdown::Write, error_code)
        .unwrap();
    session
        .pipe
        .client
        .stream_shutdown(stream, crate::Shutdown::Read, error_code)
        .unwrap();

    session.advance().ok();

    assert_eq!(session.poll_server(), Ok((0, Event::Reset(error_code))));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // Send another PRIORITY_UPDATE, this time for the closed request stream.
    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            0,
            &priority,
        )
        .unwrap();
    session.advance().ok();

    // The server does not report an event for it.
    assert_eq!(session.poll_server(), Err(Error::Done));
}
4331 
#[test]
/// A push-stream PRIORITY_UPDATE sent by the client on its control stream
/// yields no event at the server.
fn priority_update_push() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdatePush {
        prioritized_element_id: 3,
        priority_field_value: b"u=3".to_vec(),
    };

    session
        .send_frame_client(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::Done));
}
4350 
#[test]
/// A request-stream PRIORITY_UPDATE from the client whose element ID refers
/// to the wrong kind of stream is rejected with `FrameUnexpected`.
fn priority_update_request_bad_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdateRequest {
        prioritized_element_id: 5,
        priority_field_value: b"u=3".to_vec(),
    };

    session
        .send_frame_client(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
4370 
#[test]
/// A push-stream PRIORITY_UPDATE from the client whose element ID refers to
/// the wrong kind of stream is rejected with `FrameUnexpected`.
fn priority_update_push_bad_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdatePush {
        prioritized_element_id: 5,
        priority_field_value: b"u=3".to_vec(),
    };

    session
        .send_frame_client(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
4390 
#[test]
/// A request-stream PRIORITY_UPDATE sent by the server is rejected by the
/// client with `FrameUnexpected`.
fn priority_update_request_from_server() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdateRequest {
        prioritized_element_id: 0,
        priority_field_value: b"u=3".to_vec(),
    };

    session
        .send_frame_server(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::FrameUnexpected));
}
4409 
#[test]
/// A push-stream PRIORITY_UPDATE sent by the server is rejected by the
/// client with `FrameUnexpected`.
fn priority_update_push_from_server() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdatePush {
        prioritized_element_id: 0,
        priority_field_value: b"u=3".to_vec(),
    };

    session
        .send_frame_server(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::FrameUnexpected));
}
4428 
#[test]
/// Ensure quiche allocates streams for client and server roles as expected.
fn uni_stream_local_counting() {
    let config = Config::new().unwrap();

    // (second constructor argument, expected first unidirectional stream
    // ID). The flag presumably selects the server role: `false` is the
    // client case, `true` the server case.
    let cases = [(false, 2), (true, 3)];

    for &(server_role, expected_id) in &cases {
        let conn = Connection::new(&config, server_role, false).unwrap();

        assert_eq!(conn.next_uni_stream_id, expected_id);
    }
}
4440 
#[test]
/// Client opens multiple control streams, which is forbidden.
fn open_multiple_control_streams() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Use the next unused client unidirectional stream for the extra
    // control stream.
    let extra_stream = session.client.next_uni_stream_id;

    let mut buf = [42; 8];
    let mut writer = octets::OctetsMut::with_slice(&mut buf);

    // Only the stream type is written, announcing a control stream.
    let stream_type = writer
        .put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID)
        .unwrap();

    session
        .pipe
        .client
        .stream_send(extra_stream, stream_type, false)
        .unwrap();

    session.advance().ok();

    assert_eq!(session.poll_server(), Err(Error::StreamCreationError));
}
4465 
#[test]
/// Client closes the control stream, which is forbidden.
fn close_control_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();

    // NOTE(review): the trailing `true` is presumably the fin flag that
    // closes the stream -- confirm against `send_frame_client`.
    session
        .send_frame_client(
            frame::Frame::MaxPushId { push_id: 1 },
            control_stream,
            true,
        )
        .unwrap();

    // Keep polling the server until it either runs out of events or reports
    // that a critical stream was closed; any other outcome is skipped.
    let control_stream_closed = loop {
        match session.server.poll(&mut session.pipe.server) {
            Err(Error::Done) => break false,

            Err(Error::ClosedCriticalStream) => break true,

            _ => (),
        }
    };

    assert!(control_stream_closed);
}
4500 
#[test]
/// Client closes QPACK stream, which is forbidden.
fn close_qpack_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();
    let payload = [0; 1];

    // Send one byte, then one more byte with the fin flag set.
    session
        .pipe
        .client
        .stream_send(encoder_stream, &payload, false)
        .unwrap();
    session
        .pipe
        .client
        .stream_send(encoder_stream, &payload, true)
        .unwrap();

    session.advance().ok();

    // Keep polling the server until it either runs out of events or reports
    // that a critical stream was closed; any other outcome is skipped.
    let qpack_stream_closed = loop {
        match session.server.poll(&mut session.pipe.server) {
            Err(Error::Done) => break false,

            Err(Error::ClosedCriticalStream) => break true,

            _ => (),
        }
    };

    assert!(qpack_stream_closed);
}
4536 
#[test]
/// Client sends QPACK data on its encoder and decoder streams; the server
/// must consume it without raising any error.
fn qpack_data() {
    // TODO: QPACK instructions are ignored until dynamic table support is
    // added so we just test that the data is safely ignored.
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
    let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
    let d = [0; 20];

    s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
    s.advance().ok();

    s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
    s.advance().ok();

    // Drain every pending event; the only acceptable error is `Done`, which
    // signals that there is nothing left to process.
    loop {
        match s.server.poll(&mut s.pipe.server) {
            Ok(_) => (),

            Err(Error::Done) => {
                break;
            },

            // Report which error was hit so a failure is diagnosable.
            Err(e) => {
                panic!("unexpected error while polling server: {:?}", e);
            },
        }
    }
}
4569 
#[test]
/// Tests limits for the stream state buffer maximum size.
fn max_state_buf_size() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"user-agent", b"quiche-test"),
    ];

    let sent = session
        .client
        .send_request(&mut session.pipe.client, &headers, false);
    assert_eq!(sent, Ok(0));

    session.advance().ok();

    let expected_headers = Event::Headers {
        list: headers,
        has_body: true,
    };

    assert_eq!(
        session.server.poll(&mut session.pipe.server),
        Ok((0, expected_headers))
    );

    // A DATA frame may declare any length, because its payload does not go
    // through the state buffer.
    let mut buf = [42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut buf);

    let data_type = writer.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
    session.pipe.client.stream_send(0, data_type, false).unwrap();

    let data_len = writer.put_varint(1 << 24).unwrap();
    session.pipe.client.stream_send(0, data_len, false).unwrap();

    session.pipe.client.stream_send(0, &buf, false).unwrap();

    session.advance().ok();

    assert_eq!(
        session.server.poll(&mut session.pipe.server),
        Ok((0, Event::Data))
    );

    // A GREASE frame does go through the state buffer, so the same declared
    // length has to be rejected. Start over with a fresh session.
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let mut buf = [42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut buf);

    let grease_type = writer.put_varint(148_764_065_110_560_899).unwrap();
    session
        .pipe
        .client
        .stream_send(0, grease_type, false)
        .unwrap();

    let grease_len = writer.put_varint(1 << 24).unwrap();
    session.pipe.client.stream_send(0, grease_len, false).unwrap();

    session.pipe.client.stream_send(0, &buf, false).unwrap();

    session.advance().ok();

    assert_eq!(
        session.server.poll(&mut session.pipe.server),
        Err(Error::ExcessiveLoad)
    );
}
4633 
#[test]
/// Tests that DATA frames are properly truncated depending on the request
/// stream's outgoing flow control capacity.
fn stream_backpressure() {
    let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let full_len = payload.len();
    let truncated_len = payload.len() - 2;

    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();

    let full_frames = 6;

    // These bodies are all accepted in full.
    for _ in 0..full_frames {
        let written = session.client.send_body(
            &mut session.pipe.client,
            stream,
            &payload,
            false,
        );
        assert_eq!(written, Ok(full_len));

        session.advance().ok();
    }

    // The next body is only partially accepted.
    let written = session.client.send_body(
        &mut session.pipe.client,
        stream,
        &payload,
        true,
    );
    assert_eq!(written, Ok(truncated_len));

    session.advance().ok();

    let mut recv_buf = vec![0; payload.len()];

    let expected_headers = Event::Headers {
        list: req,
        has_body: true,
    };

    assert_eq!(session.poll_server(), Ok((stream, expected_headers)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    for _ in 0..full_frames {
        assert_eq!(
            session.recv_body_server(stream, &mut recv_buf),
            Ok(full_len)
        );
    }

    assert_eq!(
        session.recv_body_server(stream, &mut recv_buf),
        Ok(truncated_len)
    );

    // The fin flag passed to the final send_body() call never went out,
    // since that buffer was only partially written.
    assert_eq!(session.poll_server(), Err(Error::Done));
}
4691 
#[test]
/// Tests that the max header list size setting is enforced.
fn request_max_header_size_limit() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(5);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    // Field section size limit that the request below is expected to
    // exceed.
    let mut h3_config = Config::new().unwrap();
    h3_config.set_max_field_section_size(65);

    let mut session =
        Session::with_configs(&mut quic_config, &mut h3_config).unwrap();

    session.handshake().unwrap();

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"aaaaaaa", b"aaaaaaaa"),
    ];

    let stream = session
        .client
        .send_request(&mut session.pipe.client, &headers, true)
        .unwrap();

    session.advance().ok();

    assert_eq!(stream, 0);

    assert_eq!(session.poll_server(), Err(Error::ExcessiveLoad));

    // The same error must also be recorded as the server's local
    // connection error.
    let local_error = session.pipe.server.local_error.as_ref().unwrap();

    assert_eq!(
        local_error.error_code,
        Error::to_wire(Error::ExcessiveLoad)
    );
}
4742 
#[test]
/// Tests that Error::TransportError contains a transport error.
fn transport_error() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"user-agent", b"quiche-test"),
    ];

    // Every stream has to be opened within a single flight. That rules out
    // Session::send_request(), which also calls advance(): the server would
    // then send a MAX_STREAMS frame and the client would never reach the
    // streams limit.
    for &expected_stream in &[0, 4, 8, 12, 16] {
        assert_eq!(
            session
                .client
                .send_request(&mut session.pipe.client, &headers, true),
            Ok(expected_stream)
        );
    }

    // One request too many: the transport-level error is surfaced wrapped.
    assert_eq!(
        session
            .client
            .send_request(&mut session.pipe.client, &headers, true),
        Err(Error::TransportError(crate::Error::StreamLimit))
    );
}
4778 
#[test]
/// Tests that sending DATA before HEADERS causes an error.
fn data_before_headers() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let mut buf = [42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut buf);

    // Hand-write a DATA frame (type, length, payload) onto stream 0 without
    // any preceding HEADERS frame.
    let data_type = writer.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
    session.pipe.client.stream_send(0, data_type, false).unwrap();

    let data_len = writer.put_varint(5).unwrap();
    session.pipe.client.stream_send(0, data_len, false).unwrap();

    session.pipe.client.stream_send(0, b"hello", false).unwrap();

    session.advance().ok();

    let polled = session.server.poll(&mut session.pipe.server);
    assert_eq!(polled, Err(Error::FrameUnexpected));
}
4803 
#[test]
/// Tests that calling poll() after an error occurred does nothing.
fn poll_after_error() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let mut buf = [42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut buf);

    // Hand-write a frame of this type with a very large declared length
    // onto stream 0 to provoke an error at the server.
    let frame_type = writer.put_varint(148_764_065_110_560_899).unwrap();
    session.pipe.client.stream_send(0, frame_type, false).unwrap();

    let frame_len = writer.put_varint(1 << 24).unwrap();
    session.pipe.client.stream_send(0, frame_len, false).unwrap();

    session.pipe.client.stream_send(0, &buf, false).unwrap();

    session.advance().ok();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(first_poll, Err(Error::ExcessiveLoad));

    // Polling once more after the error simply reports Done.
    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(second_poll, Err(Error::Done));
}
4828 
#[test]
/// Tests that we limit sending HEADERS based on the stream capacity.
fn headers_blocked() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let mut h3_config = Config::new().unwrap();

    let mut session =
        Session::with_configs(&mut quic_config, &mut h3_config).unwrap();

    session.handshake().unwrap();

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // The first request goes out; the second one is blocked.
    assert_eq!(
        session
            .client
            .send_request(&mut session.pipe.client, &headers, true),
        Ok(0)
    );

    assert_eq!(
        session
            .client
            .send_request(&mut session.pipe.client, &headers, true),
        Err(Error::StreamBlocked)
    );

    // Drain the writable stream queue, checking the order it reports.
    for &expected in &[Some(10), Some(2), Some(6), None] {
        assert_eq!(session.pipe.client.stream_writable_next(), expected);
    }

    session.advance().ok();

    // After the server hands flow control credits back, the blocked
    // request can be sent.
    assert_eq!(session.pipe.client.stream_writable_next(), Some(4));
    assert_eq!(
        session
            .client
            .send_request(&mut session.pipe.client, &headers, true),
        Ok(4)
    );
}
4881 
#[test]
/// Ensure StreamBlocked when connection flow control prevents headers.
fn headers_blocked_on_conn() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let mut h3_config = Config::new().unwrap();

    let mut session =
        Session::with_configs(&mut quic_config, &mut h3_config).unwrap();

    session.handshake().unwrap();

    // The HTTP handshake has already consumed part of the connection flow
    // control. Use up the remainder with grease data on the control stream;
    // only 23 of the 28 bytes are accepted.
    let grease = [42; 28];
    assert_eq!(
        session.pipe.client.stream_send(2, &grease, false),
        Ok(23)
    );

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // No connection-level flow control is left, so the request is blocked.
    assert_eq!(
        session
            .client
            .send_request(&mut session.pipe.client, &headers, true),
        Err(Error::StreamBlocked)
    );
    assert_eq!(session.pipe.client.stream_writable_next(), None);

    // Emit the control stream data, let the server consume it via poll(),
    // and advance again so that flow control is given back to the client.
    session.advance().ok();
    assert_eq!(session.poll_server(), Err(Error::Done));
    session.advance().ok();

    // Now we can send the request.
    assert_eq!(session.pipe.client.stream_writable_next(), Some(0));
    assert_eq!(
        session
            .client
            .send_request(&mut session.pipe.client, &headers, true),
        Ok(0)
    );
}
4938 
#[test]
/// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
/// offset when trying to send large bodies.
///
/// The server tries to send a body that exceeds the per-stream window, then
/// the raw frames it emits are decoded to check when a STREAM_DATA_BLOCKED
/// frame is produced and which limit it reports.
fn send_body_truncation_stream_blocked() {
    use crate::testing::decode_pkt;

    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    config.set_application_protos(&[b"h3"]).unwrap();
    config.set_initial_max_data(10000); // large connection-level flow control
    config.set_initial_max_stream_data_bidi_local(80);
    config.set_initial_max_stream_data_bidi_remote(80);
    config.set_initial_max_stream_data_uni(150);
    config.set_initial_max_streams_bidi(100);
    config.set_initial_max_streams_uni(5);
    config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    s.handshake().unwrap();

    let (stream, req) = s.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        has_body: false,
    };

    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    let _ = s.send_response(stream, false).unwrap();

    assert_eq!(s.pipe.server.streams.blocked().len(), 0);

    // The body must be larger than the stream window would allow
    let d = [42; 500];

    let sent = s
        .server
        .send_body(&mut s.pipe.server, stream, &d, true)
        .unwrap();
    assert_eq!(sent, 25);

    // Offset of the first body byte that was not accepted by the first write.
    // It is intentionally not advanced afterwards, matching the retries below.
    let off = sent;

    // send_body wrote as much as it could (sent < size of buff).
    assert_eq!(s.pipe.server.streams.blocked().len(), 1);
    assert_eq!(
        s.server
            .send_body(&mut s.pipe.server, stream, &d[off..], true),
        Err(Error::Done)
    );
    assert_eq!(s.pipe.server.streams.blocked().len(), 1);

    // Now read raw frames to see what the QUIC layer did
    let mut buf = [0; 65535];
    let (len, _) = s.pipe.server.send(&mut buf).unwrap();

    let frames = decode_pkt(&mut s.pipe.client, &mut buf, len).unwrap();

    let mut iter = frames.iter();

    assert_eq!(
        iter.next(),
        Some(&crate::frame::Frame::StreamDataBlocked {
            stream_id: 0,
            limit: 80,
        })
    );

    // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
    // the mark.
    assert_eq!(s.pipe.server.streams.blocked().len(), 0);

    // Don't read any data from the client, so stream flow control is never
    // given back in the form of changing the stream's max offset.
    // Subsequent body send operations will still fail but no more
    // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
    // change. No frames means no packet to send.
    assert_eq!(
        s.server
            .send_body(&mut s.pipe.server, stream, &d[off..], true),
        Err(Error::Done)
    );
    assert_eq!(s.pipe.server.streams.blocked().len(), 0);
    assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));

    // Now raise the stream's max offset manually by handing the server a
    // MAX_STREAM_DATA frame.
    let frames = [crate::frame::Frame::MaxStreamData {
        stream_id: 0,
        max: 100,
    }];

    let pkt_type = crate::packet::Type::Short;
    assert_eq!(
        s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
        Ok(39),
    );

    let sent = s
        .server
        .send_body(&mut s.pipe.server, stream, &d[off..], true)
        .unwrap();
    assert_eq!(sent, 18);

    // As before: the write was truncated, the stream is marked blocked, and
    // retrying does not add a second mark.
    assert_eq!(s.pipe.server.streams.blocked().len(), 1);
    assert_eq!(
        s.server
            .send_body(&mut s.pipe.server, stream, &d[off..], true),
        Err(Error::Done)
    );
    assert_eq!(s.pipe.server.streams.blocked().len(), 1);

    let (len, _) = s.pipe.server.send(&mut buf).unwrap();

    let frames = decode_pkt(&mut s.pipe.client, &mut buf, len).unwrap();

    let mut iter = frames.iter();

    // The new STREAM_DATA_BLOCKED frame reports the updated limit.
    assert_eq!(
        iter.next(),
        Some(&crate::frame::Frame::StreamDataBlocked {
            stream_id: 0,
            limit: 100,
        })
    );
}
5075 
#[test]
/// Ensure stream doesn't hang due to small cwnd.
///
/// The server writes a body larger than what it is initially allowed to
/// send; after the client has read the first part, the stream must be
/// reported as writable again on the server.
fn send_body_stream_blocked_by_small_cwnd() {
    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    config.set_application_protos(&[b"h3"]).unwrap();
    config.set_initial_max_data(100000); // large connection-level flow control
    config.set_initial_max_stream_data_bidi_local(100000);
    config.set_initial_max_stream_data_bidi_remote(50000);
    config.set_initial_max_stream_data_uni(150);
    config.set_initial_max_streams_bidi(100);
    config.set_initial_max_streams_uni(5);
    config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    s.handshake().unwrap();

    let (stream, req) = s.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        has_body: false,
    };

    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    let _ = s.send_response(stream, false).unwrap();

    // Clear the writable stream queue.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
    assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
    assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
    assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
    assert_eq!(s.pipe.server.stream_writable_next(), None);

    // The body must be larger than the cwnd would allow.
    let send_buf = [42; 80000];

    let sent = s
        .server
        .send_body(&mut s.pipe.server, stream, &send_buf, true)
        .unwrap();

    // send_body wrote as much as it could (sent < size of buff).
    assert_eq!(sent, 11995);

    s.advance().ok();

    // Client reads received headers and body. The first poll is expected to
    // yield the response headers; only its success is checked.
    let mut recv_buf = [0; 80000];
    assert!(s.poll_client().is_ok());
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));

    s.advance().ok();

    // Server send cap is smaller than remaining body buffer.
    assert!(s.pipe.server.tx_cap < send_buf.len() - sent);

    // Once the server cwnd opens up, we can send more body.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
}
5147 
#[test]
/// Ensure stream doesn't hang when the remaining send capacity only fits a
/// DATA frame header but no payload.
///
/// In that situation `send_body()` must report `Error::Done` instead of
/// writing an empty DATA frame, and the stream must become writable again
/// once the client has read the data sent so far.
fn send_body_stream_blocked_zero_length() {
    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    config.set_application_protos(&[b"h3"]).unwrap();
    config.set_initial_max_data(100000); // large connection-level flow control
    config.set_initial_max_stream_data_bidi_local(100000);
    config.set_initial_max_stream_data_bidi_remote(50000);
    config.set_initial_max_stream_data_uni(150);
    config.set_initial_max_streams_bidi(100);
    config.set_initial_max_streams_uni(5);
    config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    s.handshake().unwrap();

    let (stream, req) = s.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        has_body: false,
    };

    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    let _ = s.send_response(stream, false).unwrap();

    // Clear the writable stream queue.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
    assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
    assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
    assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
    assert_eq!(s.pipe.server.stream_writable_next(), None);

    // The body is large enough to fill the cwnd, except for enough bytes
    // for another DATA frame header (but no payload).
    let send_buf = [42; 11994];

    let sent = s
        .server
        .send_body(&mut s.pipe.server, stream, &send_buf, false)
        .unwrap();

    assert_eq!(sent, 11994);

    // There is only enough capacity left for the DATA frame header, but
    // no payload.
    assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
    assert_eq!(
        s.server
            .send_body(&mut s.pipe.server, stream, &send_buf, false),
        Err(Error::Done)
    );

    s.advance().ok();

    // Client reads received headers and body. The first poll is expected to
    // yield the response headers; only its success is checked.
    let mut recv_buf = [42; 80000];
    assert!(s.poll_client().is_ok());
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));

    s.advance().ok();

    // Once the server cwnd opens up, we can send more body.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
}
5225 
#[test]
/// Exercises empty DATA writes in both directions, with and without fin.
///
/// Without fin an empty write yields `Error::Done`; with fin it succeeds and
/// the peer observes Headers, Data (with nothing to read) and Finished.
fn zero_length_data() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Scratch space for the (empty) bodies read on either side.
    let mut scratch = vec![0; 100];

    let (stream, req) = s.send_request(false).unwrap();

    // Client: empty body without fin, then empty body with fin.
    let without_fin = s.client.send_body(&mut s.pipe.client, 0, b"", false);
    assert_eq!(without_fin, Err(Error::Done));
    let with_fin = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(with_fin, Ok(0));

    s.advance().ok();

    // Server: headers announce a body, but no body bytes can be read.
    let request_headers = Event::Headers {
        list: req,
        has_body: true,
    };
    assert_eq!(s.poll_server(), Ok((stream, request_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_server(stream, &mut scratch), Err(Error::Done));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    let resp = s.send_response(stream, false).unwrap();

    // Server: same pair of empty writes for the response body.
    let without_fin = s.server.send_body(&mut s.pipe.server, 0, b"", false);
    assert_eq!(without_fin, Err(Error::Done));
    let with_fin = s.server.send_body(&mut s.pipe.server, 0, b"", true);
    assert_eq!(with_fin, Ok(0));

    s.advance().ok();

    // Client: mirrors what the server saw for the request.
    let response_headers = Event::Headers {
        list: resp,
        has_body: true,
    };
    assert_eq!(s.poll_client(), Ok((stream, response_headers)));
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut scratch), Err(Error::Done));
    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
5280 
#[test]
/// Tests that blocked 0-length DATA writes are reported correctly.
///
/// The request headers are sent successfully, but the following empty DATA
/// write with fin yields `Error::Done` until the server gives flow control
/// credit back.
fn zero_length_data_blocked() {
    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    config.set_application_protos(&[b"h3"]).unwrap();
    // NOTE(review): presumably sized so that the headers still fit in the
    // connection window while the DATA frame header does not - confirm.
    config.set_initial_max_data(69);
    config.set_initial_max_stream_data_bidi_local(150);
    config.set_initial_max_stream_data_bidi_remote(150);
    config.set_initial_max_stream_data_uni(150);
    config.set_initial_max_streams_bidi(100);
    config.set_initial_max_streams_uni(5);
    config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    s.handshake().unwrap();

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    assert_eq!(
        s.client.send_request(&mut s.pipe.client, &req, false),
        Ok(0)
    );

    // The empty body with fin cannot be written yet.
    assert_eq!(
        s.client.send_body(&mut s.pipe.client, 0, b"", true),
        Err(Error::Done)
    );

    // Clear the writable stream queue.
    assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
    assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
    assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
    assert_eq!(s.pipe.client.stream_writable_next(), None);

    s.advance().ok();

    // Once the server gives flow control credits back, we can send the body.
    assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
    assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
}
5335 
#[test]
/// Tests that receiving an empty SETTINGS frame is handled and reported:
/// after the handshake both endpoints expose the peer's raw settings.
fn empty_settings() {
    // Transport configuration, assembled in its own scope.
    let mut config = {
        let mut c = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        c.load_cert_chain_from_pem_file("examples/cert.crt").unwrap();
        c.load_priv_key_from_pem_file("examples/cert.key").unwrap();
        c.set_application_protos(&[b"h3"]).unwrap();
        c.set_initial_max_data(1500);
        c.set_initial_max_stream_data_bidi_local(150);
        c.set_initial_max_stream_data_bidi_remote(150);
        c.set_initial_max_stream_data_uni(150);
        c.set_initial_max_streams_bidi(5);
        c.set_initial_max_streams_uni(5);
        c.verify_peer(false);
        c.set_ack_delay_exponent(8);
        c.grease(false);
        c
    };

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
    s.handshake().unwrap();

    // Client first, then server: each must have recorded peer settings.
    let client_settings = s.client.peer_settings_raw();
    assert!(client_settings.is_some());

    let server_settings = s.server.peer_settings_raw();
    assert!(server_settings.is_some());
}
5365 
#[test]
/// Tests that receiving a H3_DATAGRAM setting is ok.
///
/// DATAGRAM support is only reported as enabled by the peer after the
/// peer's SETTINGS have been processed through `poll()`.
fn dgram_setting() {
    // Transport configuration with QUIC DATAGRAM support switched on.
    let mut config = {
        let mut c = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        c.load_cert_chain_from_pem_file("examples/cert.crt").unwrap();
        c.load_priv_key_from_pem_file("examples/cert.key").unwrap();
        c.set_application_protos(&[b"h3"]).unwrap();
        c.set_initial_max_data(70);
        c.set_initial_max_stream_data_bidi_local(150);
        c.set_initial_max_stream_data_bidi_remote(150);
        c.set_initial_max_stream_data_uni(150);
        c.set_initial_max_streams_bidi(100);
        c.set_initial_max_streams_uni(5);
        c.enable_dgram(true, 1000, 1000);
        c.verify_peer(false);
        c
    };

    let h3_config = Config::new().unwrap();
    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    // Only the transport handshake is run; SETTINGS are sent by hand below.
    assert_eq!(s.pipe.handshake(), Ok(()));

    // Client -> server direction.
    s.client.send_settings(&mut s.pipe.client).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));

    // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
    // enabled.
    let enabled_before = s.server.dgram_enabled_by_peer(&s.pipe.server);
    assert!(!enabled_before);

    // When everything is ok, poll returns Done and DATAGRAM is enabled.
    assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
    let enabled_after = s.server.dgram_enabled_by_peer(&s.pipe.server);
    assert!(enabled_after);

    // Server -> client direction, same sequence.
    s.server.send_settings(&mut s.pipe.server).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));

    let enabled_before = s.client.dgram_enabled_by_peer(&s.pipe.client);
    assert!(!enabled_before);

    assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
    let enabled_after = s.client.dgram_enabled_by_peer(&s.pipe.client);
    assert!(enabled_after);
}
5409 
#[test]
/// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
/// an error.
///
/// The transport config never calls `enable_dgram()`, yet the client sends
/// SETTINGS with `h3_datagram` set; the server must fail with
/// `Error::SettingsError`.
fn dgram_setting_no_tp() {
    // Transport configuration without DATAGRAM support.
    let mut config = {
        let mut c = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        c.load_cert_chain_from_pem_file("examples/cert.crt").unwrap();
        c.load_priv_key_from_pem_file("examples/cert.key").unwrap();
        c.set_application_protos(&[b"h3"]).unwrap();
        c.set_initial_max_data(70);
        c.set_initial_max_stream_data_bidi_local(150);
        c.set_initial_max_stream_data_bidi_remote(150);
        c.set_initial_max_stream_data_uni(150);
        c.set_initial_max_streams_bidi(100);
        c.set_initial_max_streams_uni(5);
        c.verify_peer(false);
        c
    };

    let h3_config = Config::new().unwrap();
    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    assert_eq!(s.pipe.handshake(), Ok(()));

    // Open the client control stream by hand and remember its ID.
    let control_id = s
        .client
        .open_uni_stream(
            &mut s.pipe.client,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.client.control_stream_id = Some(control_id);

    // SETTINGS frame whose only populated entry is H3_DATAGRAM.
    let settings = frame::Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: Some(1),
        grease: None,
        raw: Default::default(),
    };

    s.send_frame_client(settings, control_id, false).unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    let outcome = s.server.poll(&mut s.pipe.server);
    assert_eq!(outcome, Err(Error::SettingsError));
}
5461 
#[test]
/// Tests that receiving SETTINGS with prohibited values generates an error.
///
/// Both endpoints send a hand-built SETTINGS frame containing setting type
/// 0x2, and both must fail with `Error::SettingsError` when polling.
fn settings_h2_prohibited() {
    let mut config = {
        let mut c = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        c.load_cert_chain_from_pem_file("examples/cert.crt").unwrap();
        c.load_priv_key_from_pem_file("examples/cert.key").unwrap();
        c.set_application_protos(&[b"h3"]).unwrap();
        c.set_initial_max_data(70);
        c.set_initial_max_stream_data_bidi_local(150);
        c.set_initial_max_stream_data_bidi_remote(150);
        c.set_initial_max_stream_data_uni(150);
        c.set_initial_max_streams_bidi(100);
        c.set_initial_max_streams_uni(5);
        c.verify_peer(false);
        c
    };

    let h3_config = Config::new().unwrap();
    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();

    assert_eq!(s.pipe.handshake(), Ok(()));

    // Open a control stream on each side by hand, client first.
    let client_control = s
        .client
        .open_uni_stream(
            &mut s.pipe.client,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.client.control_stream_id = Some(client_control);

    let server_control = s
        .server
        .open_uni_stream(
            &mut s.pipe.server,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.server.control_stream_id = Some(server_control);

    // Raw SETTINGS frame: type, payload length, then one (id, value) pair.
    let frame_payload_len: u8 = 2;
    let settings = [
        frame::SETTINGS_FRAME_TYPE_ID as u8,
        frame_payload_len,
        0x2, // 0x2 is a reserved setting type
        1,
    ];

    s.send_arbitrary_stream_data_client(&settings, client_control, false)
        .unwrap();

    s.send_arbitrary_stream_data_server(&settings, server_control, false)
        .unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    let server_outcome = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_outcome, Err(Error::SettingsError));

    let client_outcome = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_outcome, Err(Error::SettingsError));
}
5532 
#[test]
/// Send a single DATAGRAM in each direction and read it back.
fn single_dgram() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut buf = [0; 65535];

    // We'll send default data of 10 bytes on flow ID 0.
    let expected = (11, 0, 1);

    // Client -> server.
    s.send_dgram_client(0).unwrap();

    assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
    assert_eq!(s.recv_dgram_server(&mut buf), Ok(expected));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server -> client.
    s.send_dgram_server(0).unwrap();

    assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
    assert_eq!(s.recv_dgram_client(&mut buf), Ok(expected));
}
5553 
#[test]
/// Send multiple DATAGRAMs in each direction.
///
/// A single Datagram event is reported per batch; the remaining DATAGRAMs
/// are still readable while `poll()` keeps returning `Error::Done`.
fn multiple_dgram() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut buf = [0; 65535];

    // We'll send default data of 10 bytes on flow ID 0.
    let expected = (11, 0, 1);

    // Client -> server: three DATAGRAMs.
    for _ in 0..3 {
        s.send_dgram_client(0).unwrap();
    }

    assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut buf), Ok(expected));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }
    assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server -> client: three DATAGRAMs.
    for _ in 0..3 {
        s.send_dgram_server(0).unwrap();
    }

    assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
    // NOTE(review): this polls the server in the middle of the client
    // checks - possibly a copy-paste slip; kept to preserve behavior.
    assert_eq!(s.poll_server(), Err(Error::Done));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_client(&mut buf), Ok(expected));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }
    assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
5593 
#[test]
/// Send more DATAGRAMs than the send queue allows: five are submitted, but
/// only three can be read on the server.
fn multiple_dgram_overflow() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut buf = [0; 65535];

    // We'll send default data of 10 bytes on flow ID 0.
    let expected = (11, 0, 1);

    // Five DATAGRAMs
    for _ in 0..5 {
        s.send_dgram_client(0).unwrap();
    }

    // Only 3 independent DATAGRAM events will fire.
    assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut buf), Ok(expected));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }

    // Nothing is left after the third read.
    assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));
}
5622 
#[test]
/// Send a single DATAGRAM and request. Ensure that poll continuously cycles
/// between the two types if the data is not read.
///
/// Neither the request body nor the DATAGRAM is read by the server; the
/// test pins the exact sequence of events `poll()` reports in that case.
fn poll_yield_cycling() {
    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    config.set_application_protos(&[b"h3"]).unwrap();
    config.set_initial_max_data(1500);
    config.set_initial_max_stream_data_bidi_local(150);
    config.set_initial_max_stream_data_bidi_remote(150);
    config.set_initial_max_stream_data_uni(150);
    config.set_initial_max_streams_bidi(100);
    config.set_initial_max_streams_uni(5);
    config.verify_peer(false);
    config.enable_dgram(true, 100, 100);

    let h3_config = Config::new().unwrap();
    let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
    s.handshake().unwrap();

    // Send request followed by DATAGRAM on client side.
    let (stream, req) = s.send_request(false).unwrap();

    s.send_body_client(stream, true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        has_body: true,
    };

    s.send_dgram_client(0).unwrap();

    // Now let's test the poll counts and yielding. The DATAGRAM event is
    // reported first, followed by the request's Headers and Data events.
    assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));

    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));

    assert_eq!(s.poll_server(), Err(Error::Done));
}
5668 
5669     #[test]
5670     /// Send a single DATAGRAM and request. Ensure that poll
5671     /// yield cycles and cleanly exits if data is read.
poll_yield_single_read()5672     fn poll_yield_single_read() {
5673         let mut buf = [0; 65535];
5674 
5675         let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5676         config
5677             .load_cert_chain_from_pem_file("examples/cert.crt")
5678             .unwrap();
5679         config
5680             .load_priv_key_from_pem_file("examples/cert.key")
5681             .unwrap();
5682         config.set_application_protos(&[b"h3"]).unwrap();
5683         config.set_initial_max_data(1500);
5684         config.set_initial_max_stream_data_bidi_local(150);
5685         config.set_initial_max_stream_data_bidi_remote(150);
5686         config.set_initial_max_stream_data_uni(150);
5687         config.set_initial_max_streams_bidi(100);
5688         config.set_initial_max_streams_uni(5);
5689         config.verify_peer(false);
5690         config.enable_dgram(true, 100, 100);
5691 
5692         let mut h3_config = Config::new().unwrap();
5693         let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5694         s.handshake().unwrap();
5695 
5696         // We'll send default data of 10 bytes on flow ID 0.
5697         let result = (11, 0, 1);
5698 
5699         // Send request followed by DATAGRAM on client side.
5700         let (stream, req) = s.send_request(false).unwrap();
5701 
5702         let body = s.send_body_client(stream, true).unwrap();
5703 
5704         let mut recv_buf = vec![0; body.len()];
5705 
5706         let ev_headers = Event::Headers {
5707             list: req,
5708             has_body: true,
5709         };
5710 
5711         s.send_dgram_client(0).unwrap();
5712 
5713         // Now let's test the poll counts and yielding.
5714         assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5715 
5716         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5717         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5718 
5719         assert_eq!(s.poll_server(), Err(Error::Done));
5720 
5721         assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5722 
5723         assert_eq!(s.poll_server(), Err(Error::Done));
5724 
5725         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
5726         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5727         assert_eq!(s.poll_server(), Err(Error::Done));
5728 
5729         // Send response followed by DATAGRAM on server side
5730         let resp = s.send_response(stream, false).unwrap();
5731 
5732         let body = s.send_body_server(stream, true).unwrap();
5733 
5734         let mut recv_buf = vec![0; body.len()];
5735 
5736         let ev_headers = Event::Headers {
5737             list: resp,
5738             has_body: true,
5739         };
5740 
5741         s.send_dgram_server(0).unwrap();
5742 
5743         // Now let's test the poll counts and yielding.
5744         assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
5745 
5746         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5747         assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5748 
5749         assert_eq!(s.poll_client(), Err(Error::Done));
5750 
5751         assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
5752 
5753         assert_eq!(s.poll_client(), Err(Error::Done));
5754 
5755         assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
5756 
5757         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5758         assert_eq!(s.poll_client(), Err(Error::Done));
5759     }
5760 
5761     #[test]
5762     /// Send a multiple DATAGRAMs and requests. Ensure that poll
5763     /// yield cycles and cleanly exits if data is read.
poll_yield_multi_read()5764     fn poll_yield_multi_read() {
5765         let mut buf = [0; 65535];
5766 
5767         let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5768         config
5769             .load_cert_chain_from_pem_file("examples/cert.crt")
5770             .unwrap();
5771         config
5772             .load_priv_key_from_pem_file("examples/cert.key")
5773             .unwrap();
5774         config.set_application_protos(&[b"h3"]).unwrap();
5775         config.set_initial_max_data(1500);
5776         config.set_initial_max_stream_data_bidi_local(150);
5777         config.set_initial_max_stream_data_bidi_remote(150);
5778         config.set_initial_max_stream_data_uni(150);
5779         config.set_initial_max_streams_bidi(100);
5780         config.set_initial_max_streams_uni(5);
5781         config.verify_peer(false);
5782         config.enable_dgram(true, 100, 100);
5783 
5784         let mut h3_config = Config::new().unwrap();
5785         let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5786         s.handshake().unwrap();
5787 
5788         // 10 bytes on flow ID 0 and 2.
5789         let flow_0_result = (11, 0, 1);
5790         let flow_2_result = (11, 2, 1);
5791 
5792         // Send requests followed by DATAGRAMs on client side.
5793         let (stream, req) = s.send_request(false).unwrap();
5794 
5795         let body = s.send_body_client(stream, true).unwrap();
5796 
5797         let mut recv_buf = vec![0; body.len()];
5798 
5799         let ev_headers = Event::Headers {
5800             list: req,
5801             has_body: true,
5802         };
5803 
5804         s.send_dgram_client(0).unwrap();
5805         s.send_dgram_client(0).unwrap();
5806         s.send_dgram_client(0).unwrap();
5807         s.send_dgram_client(0).unwrap();
5808         s.send_dgram_client(0).unwrap();
5809         s.send_dgram_client(2).unwrap();
5810         s.send_dgram_client(2).unwrap();
5811         s.send_dgram_client(2).unwrap();
5812         s.send_dgram_client(2).unwrap();
5813         s.send_dgram_client(2).unwrap();
5814 
5815         // Now let's test the poll counts and yielding.
5816         assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5817 
5818         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5819         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5820 
5821         assert_eq!(s.poll_server(), Err(Error::Done));
5822 
5823         // Second cycle, start to read
5824         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5825         assert_eq!(s.poll_server(), Err(Error::Done));
5826         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5827         assert_eq!(s.poll_server(), Err(Error::Done));
5828         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5829         assert_eq!(s.poll_server(), Err(Error::Done));
5830 
5831         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
5832         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5833 
5834         assert_eq!(s.poll_server(), Err(Error::Done));
5835 
5836         // Third cycle.
5837         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5838         assert_eq!(s.poll_server(), Err(Error::Done));
5839         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5840         assert_eq!(s.poll_server(), Err(Error::Done));
5841         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5842         assert_eq!(s.poll_server(), Err(Error::Done));
5843         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5844         assert_eq!(s.poll_server(), Err(Error::Done));
5845         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5846         assert_eq!(s.poll_server(), Err(Error::Done));
5847         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5848         assert_eq!(s.poll_server(), Err(Error::Done));
5849         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5850         assert_eq!(s.poll_server(), Err(Error::Done));
5851 
5852         // Send response followed by DATAGRAM on server side
5853         let resp = s.send_response(stream, false).unwrap();
5854 
5855         let body = s.send_body_server(stream, true).unwrap();
5856 
5857         let mut recv_buf = vec![0; body.len()];
5858 
5859         let ev_headers = Event::Headers {
5860             list: resp,
5861             has_body: true,
5862         };
5863 
5864         s.send_dgram_server(0).unwrap();
5865         s.send_dgram_server(0).unwrap();
5866         s.send_dgram_server(0).unwrap();
5867         s.send_dgram_server(0).unwrap();
5868         s.send_dgram_server(0).unwrap();
5869         s.send_dgram_server(2).unwrap();
5870         s.send_dgram_server(2).unwrap();
5871         s.send_dgram_server(2).unwrap();
5872         s.send_dgram_server(2).unwrap();
5873         s.send_dgram_server(2).unwrap();
5874 
5875         assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
5876 
5877         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5878         assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5879 
5880         assert_eq!(s.poll_client(), Err(Error::Done));
5881 
5882         // Second cycle, start to read
5883         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5884         assert_eq!(s.poll_client(), Err(Error::Done));
5885         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5886         assert_eq!(s.poll_client(), Err(Error::Done));
5887         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5888         assert_eq!(s.poll_client(), Err(Error::Done));
5889 
5890         assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
5891         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5892 
5893         assert_eq!(s.poll_client(), Err(Error::Done));
5894 
5895         // Third cycle.
5896         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5897         assert_eq!(s.poll_client(), Err(Error::Done));
5898         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5899         assert_eq!(s.poll_client(), Err(Error::Done));
5900         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5901         assert_eq!(s.poll_client(), Err(Error::Done));
5902         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5903         assert_eq!(s.poll_client(), Err(Error::Done));
5904         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5905         assert_eq!(s.poll_client(), Err(Error::Done));
5906         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5907         assert_eq!(s.poll_client(), Err(Error::Done));
5908         assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5909         assert_eq!(s.poll_client(), Err(Error::Done));
5910     }
5911 
5912     #[test]
5913     /// Tests that the Finished event is not issued for streams of unknown type
5914     /// (e.g. GREASE).
finished_is_for_requests()5915     fn finished_is_for_requests() {
5916         let mut s = Session::new().unwrap();
5917         s.handshake().unwrap();
5918 
5919         assert_eq!(s.poll_client(), Err(Error::Done));
5920         assert_eq!(s.poll_server(), Err(Error::Done));
5921 
5922         assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
5923         assert_eq!(s.pipe.advance(), Ok(()));
5924 
5925         assert_eq!(s.poll_client(), Err(Error::Done));
5926         assert_eq!(s.poll_server(), Err(Error::Done));
5927     }
5928 
5929     #[test]
5930     /// Tests that streams are marked as finished only once.
finished_once()5931     fn finished_once() {
5932         let mut s = Session::new().unwrap();
5933         s.handshake().unwrap();
5934 
5935         let (stream, req) = s.send_request(false).unwrap();
5936         let body = s.send_body_client(stream, true).unwrap();
5937 
5938         let mut recv_buf = vec![0; body.len()];
5939 
5940         let ev_headers = Event::Headers {
5941             list: req,
5942             has_body: true,
5943         };
5944 
5945         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5946         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5947 
5948         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
5949         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5950 
5951         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
5952         assert_eq!(s.poll_server(), Err(Error::Done));
5953     }
5954 
5955     #[test]
5956     /// Tests that the Data event is properly re-armed.
data_event_rearm()5957     fn data_event_rearm() {
5958         let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5959 
5960         let mut s = Session::new().unwrap();
5961         s.handshake().unwrap();
5962 
5963         let (stream, req) = s.send_request(false).unwrap();
5964 
5965         let mut recv_buf = vec![0; bytes.len()];
5966 
5967         let ev_headers = Event::Headers {
5968             list: req,
5969             has_body: true,
5970         };
5971 
5972         // Manually send an incomplete DATA frame (i.e. the frame size is longer
5973         // than the actual data sent).
5974         {
5975             let mut d = [42; 10];
5976             let mut b = octets::OctetsMut::with_slice(&mut d);
5977 
5978             b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5979             b.put_varint(bytes.len() as u64).unwrap();
5980             let off = b.off();
5981             s.pipe.client.stream_send(stream, &d[..off], false).unwrap();
5982 
5983             assert_eq!(
5984                 s.pipe.client.stream_send(stream, &bytes[..5], false),
5985                 Ok(5)
5986             );
5987 
5988             s.advance().ok();
5989         }
5990 
5991         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5992         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5993         assert_eq!(s.poll_server(), Err(Error::Done));
5994 
5995         // Read the available body data.
5996         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
5997 
5998         // Send the remaining DATA payload.
5999         assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5));
6000         s.advance().ok();
6001 
6002         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6003         assert_eq!(s.poll_server(), Err(Error::Done));
6004 
6005         // Read the rest of the body data.
6006         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
6007         assert_eq!(s.poll_server(), Err(Error::Done));
6008 
6009         // Send more data.
6010         let body = s.send_body_client(stream, false).unwrap();
6011 
6012         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6013         assert_eq!(s.poll_server(), Err(Error::Done));
6014 
6015         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6016 
6017         // Send more data, then HEADERS, then more data.
6018         let body = s.send_body_client(stream, false).unwrap();
6019 
6020         let trailers = vec![Header::new(b"hello", b"world")];
6021 
6022         s.client
6023             .send_headers(&mut s.pipe.client, stream, &trailers, false)
6024             .unwrap();
6025 
6026         let ev_trailers = Event::Headers {
6027             list: trailers,
6028             has_body: true,
6029         };
6030 
6031         s.advance().ok();
6032 
6033         s.send_body_client(stream, false).unwrap();
6034 
6035         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6036         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6037 
6038         assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
6039 
6040         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6041         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6042 
6043         let (stream, req) = s.send_request(false).unwrap();
6044 
6045         let ev_headers = Event::Headers {
6046             list: req,
6047             has_body: true,
6048         };
6049 
6050         // Manually send an incomplete DATA frame (i.e. only the header is sent).
6051         {
6052             let mut d = [42; 10];
6053             let mut b = octets::OctetsMut::with_slice(&mut d);
6054 
6055             b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6056             b.put_varint(bytes.len() as u64).unwrap();
6057             let off = b.off();
6058             s.pipe.client.stream_send(stream, &d[..off], false).unwrap();
6059 
6060             s.advance().ok();
6061         }
6062 
6063         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6064         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6065         assert_eq!(s.poll_server(), Err(Error::Done));
6066 
6067         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6068 
6069         assert_eq!(s.pipe.client.stream_send(stream, &bytes[..5], false), Ok(5));
6070 
6071         s.advance().ok();
6072 
6073         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6074         assert_eq!(s.poll_server(), Err(Error::Done));
6075 
6076         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
6077 
6078         assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5));
6079         s.advance().ok();
6080 
6081         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6082         assert_eq!(s.poll_server(), Err(Error::Done));
6083 
6084         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
6085 
6086         // Buffer multiple data frames.
6087         let body = s.send_body_client(stream, false).unwrap();
6088         s.send_body_client(stream, false).unwrap();
6089         s.send_body_client(stream, false).unwrap();
6090 
6091         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6092         assert_eq!(s.poll_server(), Err(Error::Done));
6093 
6094         {
6095             let mut d = [42; 10];
6096             let mut b = octets::OctetsMut::with_slice(&mut d);
6097 
6098             b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6099             b.put_varint(0).unwrap();
6100             let off = b.off();
6101             s.pipe.client.stream_send(stream, &d[..off], true).unwrap();
6102 
6103             s.advance().ok();
6104         }
6105 
6106         let mut recv_buf = vec![0; bytes.len() * 3];
6107 
6108         assert_eq!(
6109             s.recv_body_server(stream, &mut recv_buf),
6110             Ok(body.len() * 3)
6111         );
6112     }
6113 
6114     #[test]
6115     /// Tests that the Datagram event is properly re-armed.
dgram_event_rearm()6116     fn dgram_event_rearm() {
6117         let mut buf = [0; 65535];
6118 
6119         let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6120         config
6121             .load_cert_chain_from_pem_file("examples/cert.crt")
6122             .unwrap();
6123         config
6124             .load_priv_key_from_pem_file("examples/cert.key")
6125             .unwrap();
6126         config.set_application_protos(&[b"h3"]).unwrap();
6127         config.set_initial_max_data(1500);
6128         config.set_initial_max_stream_data_bidi_local(150);
6129         config.set_initial_max_stream_data_bidi_remote(150);
6130         config.set_initial_max_stream_data_uni(150);
6131         config.set_initial_max_streams_bidi(100);
6132         config.set_initial_max_streams_uni(5);
6133         config.verify_peer(false);
6134         config.enable_dgram(true, 100, 100);
6135 
6136         let mut h3_config = Config::new().unwrap();
6137         let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
6138         s.handshake().unwrap();
6139 
6140         // 10 bytes on flow ID 0 and 2.
6141         let flow_0_result = (11, 0, 1);
6142         let flow_2_result = (11, 2, 1);
6143 
6144         // Send requests followed by DATAGRAMs on client side.
6145         let (stream, req) = s.send_request(false).unwrap();
6146 
6147         let body = s.send_body_client(stream, true).unwrap();
6148 
6149         let mut recv_buf = vec![0; body.len()];
6150 
6151         let ev_headers = Event::Headers {
6152             list: req,
6153             has_body: true,
6154         };
6155 
6156         s.send_dgram_client(0).unwrap();
6157         s.send_dgram_client(0).unwrap();
6158         s.send_dgram_client(2).unwrap();
6159         s.send_dgram_client(2).unwrap();
6160 
6161         assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
6162 
6163         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6164         assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6165 
6166         assert_eq!(s.poll_server(), Err(Error::Done));
6167         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6168 
6169         assert_eq!(s.poll_server(), Err(Error::Done));
6170         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6171 
6172         assert_eq!(s.poll_server(), Err(Error::Done));
6173         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6174 
6175         assert_eq!(s.poll_server(), Err(Error::Done));
6176         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6177 
6178         assert_eq!(s.poll_server(), Err(Error::Done));
6179 
6180         s.send_dgram_client(0).unwrap();
6181         s.send_dgram_client(2).unwrap();
6182 
6183         assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
6184         assert_eq!(s.poll_server(), Err(Error::Done));
6185 
6186         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6187         assert_eq!(s.poll_server(), Err(Error::Done));
6188 
6189         assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6190         assert_eq!(s.poll_server(), Err(Error::Done));
6191 
6192         assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6193         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6194     }
6195 
6196     #[test]
reset_stream()6197     fn reset_stream() {
6198         let mut buf = [0; 65535];
6199 
6200         let mut s = Session::new().unwrap();
6201         s.handshake().unwrap();
6202 
6203         // Client sends request.
6204         let (stream, req) = s.send_request(false).unwrap();
6205 
6206         let ev_headers = Event::Headers {
6207             list: req,
6208             has_body: true,
6209         };
6210 
6211         // Server sends response and closes stream.
6212         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6213         assert_eq!(s.poll_server(), Err(Error::Done));
6214 
6215         let resp = s.send_response(stream, true).unwrap();
6216 
6217         let ev_headers = Event::Headers {
6218             list: resp,
6219             has_body: false,
6220         };
6221 
6222         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6223         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6224         assert_eq!(s.poll_client(), Err(Error::Done));
6225 
6226         // Client sends RESET_STREAM, closing stream.
6227         let frames = [crate::frame::Frame::ResetStream {
6228             stream_id: stream,
6229             error_code: 42,
6230             final_size: 68,
6231         }];
6232 
6233         let pkt_type = crate::packet::Type::Short;
6234         assert_eq!(
6235             s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6236             Ok(39)
6237         );
6238 
6239         // Server issues Reset event for the stream.
6240         assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
6241         assert_eq!(s.poll_server(), Err(Error::Done));
6242 
6243         // Sending RESET_STREAM again shouldn't trigger another Reset event.
6244         assert_eq!(
6245             s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6246             Ok(39)
6247         );
6248 
6249         assert_eq!(s.poll_server(), Err(Error::Done));
6250     }
6251 
6252     #[test]
reset_finished_at_server()6253     fn reset_finished_at_server() {
6254         let mut s = Session::new().unwrap();
6255         s.handshake().unwrap();
6256 
6257         // Client sends HEADERS and doesn't fin
6258         let (stream, _req) = s.send_request(false).unwrap();
6259 
6260         // ..then Client sends RESET_STREAM
6261         assert_eq!(
6262             s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
6263             Ok(())
6264         );
6265 
6266         assert_eq!(s.pipe.advance(), Ok(()));
6267 
6268         // Server receives just a reset
6269         assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
6270         assert_eq!(s.poll_server(), Err(Error::Done));
6271 
6272         // Client sends HEADERS and fin
6273         let (stream, req) = s.send_request(true).unwrap();
6274 
6275         // ..then Client sends RESET_STREAM
6276         assert_eq!(
6277             s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
6278             Ok(())
6279         );
6280 
6281         let ev_headers = Event::Headers {
6282             list: req,
6283             has_body: false,
6284         };
6285 
6286         // Server receives headers and fin.
6287         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6288         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6289         assert_eq!(s.poll_server(), Err(Error::Done));
6290     }
6291 
6292     #[test]
reset_finished_at_client()6293     fn reset_finished_at_client() {
6294         let mut buf = [0; 65535];
6295         let mut s = Session::new().unwrap();
6296         s.handshake().unwrap();
6297 
6298         // Client sends HEADERS and doesn't fin
6299         let (stream, req) = s.send_request(false).unwrap();
6300 
6301         let ev_headers = Event::Headers {
6302             list: req,
6303             has_body: true,
6304         };
6305 
6306         // Server receives headers.
6307         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6308         assert_eq!(s.poll_server(), Err(Error::Done));
6309 
6310         // Server sends response and doesn't fin
6311         s.send_response(stream, false).unwrap();
6312 
6313         assert_eq!(s.pipe.advance(), Ok(()));
6314 
6315         // .. then Server sends RESET_STREAM
6316         assert_eq!(
6317             s.pipe
6318                 .server
6319                 .stream_shutdown(stream, crate::Shutdown::Write, 0),
6320             Ok(())
6321         );
6322 
6323         assert_eq!(s.pipe.advance(), Ok(()));
6324 
6325         // Client receives Reset only
6326         assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
6327         assert_eq!(s.poll_server(), Err(Error::Done));
6328 
6329         // Client sends headers and fin.
6330         let (stream, req) = s.send_request(true).unwrap();
6331 
6332         let ev_headers = Event::Headers {
6333             list: req,
6334             has_body: false,
6335         };
6336 
6337         // Server receives headers and fin.
6338         assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6339         assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6340         assert_eq!(s.poll_server(), Err(Error::Done));
6341 
6342         // Server sends response and fin
6343         let resp = s.send_response(stream, true).unwrap();
6344 
6345         assert_eq!(s.pipe.advance(), Ok(()));
6346 
6347         // ..then Server sends RESET_STREAM
6348         let frames = [crate::frame::Frame::ResetStream {
6349             stream_id: stream,
6350             error_code: 42,
6351             final_size: 68,
6352         }];
6353 
6354         let pkt_type = crate::packet::Type::Short;
6355         assert_eq!(
6356             s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6357             Ok(39)
6358         );
6359 
6360         assert_eq!(s.pipe.advance(), Ok(()));
6361 
6362         let ev_headers = Event::Headers {
6363             list: resp,
6364             has_body: false,
6365         };
6366 
6367         // Client receives headers and fin.
6368         assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6369         assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6370         assert_eq!(s.poll_client(), Err(Error::Done));
6371     }
6372 }
6373 
6374 #[cfg(feature = "ffi")]
6375 mod ffi;
6376 mod frame;
6377 #[doc(hidden)]
6378 pub mod qpack;
6379 mod stream;
6380