1 // Copyright (C) 2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 //! HTTP/3 wire protocol and QPACK implementation.
28 //!
29 //! This module provides a high level API for sending and receiving HTTP/3
30 //! requests and responses on top of the QUIC transport protocol.
31 //!
32 //! ## Connection setup
33 //!
34 //! HTTP/3 connections require a QUIC transport-layer connection, see
35 //! [Connection setup] for a full description of the setup process.
36 //!
37 //! To use HTTP/3, the QUIC connection must be configured with a suitable
38 //! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39 //!
40 //! ```
41 //! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42 //! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43 //! # Ok::<(), quiche::Error>(())
44 //! ```
45 //!
46 //! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47 //!
48 //! Once the handshake has completed, the first step in establishing an HTTP/3
49 //! connection is creating its configuration object:
50 //!
51 //! ```
52 //! let h3_config = quiche::h3::Config::new()?;
53 //! # Ok::<(), quiche::h3::Error>(())
54 //! ```
55 //!
56 //! HTTP/3 client and server connections are both created using the
57 //! [`with_transport()`] function, the role is inferred from the type of QUIC
58 //! connection:
59 //!
60 //! ```no_run
61 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63 //! # let peer = "127.0.0.1:1234".parse().unwrap();
64 //! # let local = "127.0.0.1:4321".parse().unwrap();
65 //! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66 //! # let h3_config = quiche::h3::Config::new()?;
67 //! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68 //! # Ok::<(), quiche::h3::Error>(())
69 //! ```
70 //!
71 //! ## Sending a request
72 //!
73 //! An HTTP/3 client can send a request by using the connection's
74 //! [`send_request()`] method to queue request headers; [sending] QUIC packets
75 //! causes the requests to get sent to the peer:
76 //!
77 //! ```no_run
78 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80 //! # let peer = "127.0.0.1:1234".parse().unwrap();
81 //! # let local = "127.0.0.1:4321".parse().unwrap();
82 //! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83 //! # let h3_config = quiche::h3::Config::new()?;
84 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85 //! let req = vec![
86 //! quiche::h3::Header::new(b":method", b"GET"),
87 //! quiche::h3::Header::new(b":scheme", b"https"),
88 //! quiche::h3::Header::new(b":authority", b"quic.tech"),
89 //! quiche::h3::Header::new(b":path", b"/"),
90 //! quiche::h3::Header::new(b"user-agent", b"quiche"),
91 //! ];
92 //!
93 //! h3_conn.send_request(&mut conn, &req, true)?;
94 //! # Ok::<(), quiche::h3::Error>(())
95 //! ```
96 //!
97 //! An HTTP/3 client can send a request with additional body data by using
98 //! the connection's [`send_body()`] method:
99 //!
100 //! ```no_run
101 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103 //! # let peer = "127.0.0.1:1234".parse().unwrap();
104 //! # let local = "127.0.0.1:4321".parse().unwrap();
105 //! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106 //! # let h3_config = quiche::h3::Config::new()?;
107 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108 //! let req = vec![
109 //! quiche::h3::Header::new(b":method", b"GET"),
110 //! quiche::h3::Header::new(b":scheme", b"https"),
111 //! quiche::h3::Header::new(b":authority", b"quic.tech"),
112 //! quiche::h3::Header::new(b":path", b"/"),
113 //! quiche::h3::Header::new(b"user-agent", b"quiche"),
114 //! ];
115 //!
116 //! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117 //! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118 //! # Ok::<(), quiche::h3::Error>(())
119 //! ```
120 //!
121 //! ## Handling requests and responses
122 //!
123 //! After [receiving] QUIC packets, HTTP/3 data is processed using the
124 //! connection's [`poll()`] method. On success, this returns an [`Event`] object
125 //! and an ID corresponding to the stream where the `Event` originated.
126 //!
127 //! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128 //! [`send_response()`] and [`send_body()`]:
129 //!
130 //! ```no_run
131 //! use quiche::h3::NameValue;
132 //!
133 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135 //! # let peer = "127.0.0.1:1234".parse().unwrap();
136 //! # let local = "127.0.0.1:1234".parse().unwrap();
137 //! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138 //! # let h3_config = quiche::h3::Config::new()?;
139 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140 //! loop {
141 //! match h3_conn.poll(&mut conn) {
142 //! Ok((stream_id, quiche::h3::Event::Headers{list, has_body})) => {
143 //! let mut headers = list.into_iter();
144 //!
145 //! // Look for the request's method.
146 //! let method = headers.find(|h| h.name() == b":method").unwrap();
147 //!
148 //! // Look for the request's path.
149 //! let path = headers.find(|h| h.name() == b":path").unwrap();
150 //!
151 //! if method.value() == b"GET" && path.value() == b"/" {
152 //! let resp = vec![
153 //! quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154 //! quiche::h3::Header::new(b"server", b"quiche"),
155 //! ];
156 //!
157 //! h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158 //! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159 //! }
160 //! },
161 //!
162 //! Ok((stream_id, quiche::h3::Event::Data)) => {
163 //! // Request body data, handle it.
164 //! # return Ok(());
165 //! },
166 //!
167 //! Ok((stream_id, quiche::h3::Event::Finished)) => {
168 //! // Peer terminated stream, handle it.
169 //! },
170 //!
171 //! Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172 //! // Peer reset the stream, handle it.
173 //! },
174 //!
175 //! Ok((_flow_id, quiche::h3::Event::Datagram)) => (),
176 //!
177 //! Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
178 //!
179 //! Ok((goaway_id, quiche::h3::Event::GoAway)) => {
180 //! // Peer signalled it is going away, handle it.
181 //! },
182 //!
183 //! Err(quiche::h3::Error::Done) => {
184 //! // Done reading.
185 //! break;
186 //! },
187 //!
188 //! Err(e) => {
189 //! // An error occurred, handle it.
190 //! break;
191 //! },
192 //! }
193 //! }
194 //! # Ok::<(), quiche::h3::Error>(())
195 //! ```
196 //!
197 //! An HTTP/3 client uses [`poll()`] to read responses:
198 //!
199 //! ```no_run
200 //! use quiche::h3::NameValue;
201 //!
202 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
203 //! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
204 //! # let peer = "127.0.0.1:1234".parse().unwrap();
205 //! # let local = "127.0.0.1:1234".parse().unwrap();
206 //! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
207 //! # let h3_config = quiche::h3::Config::new()?;
208 //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
209 //! loop {
210 //! match h3_conn.poll(&mut conn) {
211 //! Ok((stream_id, quiche::h3::Event::Headers{list, has_body})) => {
212 //! let status = list.iter().find(|h| h.name() == b":status").unwrap();
213 //! println!("Received {} response on stream {}",
214 //! std::str::from_utf8(status.value()).unwrap(),
215 //! stream_id);
216 //! },
217 //!
218 //! Ok((stream_id, quiche::h3::Event::Data)) => {
219 //! let mut body = vec![0; 4096];
220 //!
221 //! // Consume all body data received on the stream.
222 //! while let Ok(read) =
223 //! h3_conn.recv_body(&mut conn, stream_id, &mut body)
224 //! {
225 //! println!("Received {} bytes of payload on stream {}",
226 //! read, stream_id);
227 //! }
228 //! },
229 //!
230 //! Ok((stream_id, quiche::h3::Event::Finished)) => {
231 //! // Peer terminated stream, handle it.
232 //! },
233 //!
234 //! Ok((stream_id, quiche::h3::Event::Reset(err))) => {
235 //! // Peer reset the stream, handle it.
236 //! },
237 //!
238 //! Ok((_flow_id, quiche::h3::Event::Datagram)) => (),
239 //!
240 //! Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
241 //!
242 //! Ok((goaway_id, quiche::h3::Event::GoAway)) => {
243 //! // Peer signalled it is going away, handle it.
244 //! },
245 //!
246 //! Err(quiche::h3::Error::Done) => {
247 //! // Done reading.
248 //! break;
249 //! },
250 //!
251 //! Err(e) => {
252 //! // An error occurred, handle it.
253 //! break;
254 //! },
255 //! }
256 //! }
257 //! # Ok::<(), quiche::h3::Error>(())
258 //! ```
259 //!
260 //! ## Detecting end of request or response
261 //!
262 //! A single HTTP/3 request or response may consist of several HEADERS and DATA
263 //! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
264 //! repeatedly will generate an [`Event`] for each of these. The application may
265 //! use these event to do additional HTTP semantic validation.
266 //!
267 //! ## HTTP/3 protocol errors
268 //!
269 //! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
270 //! a correct state and validating all messages received by a peer. This mainly
271 //! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
272 //! close the connection and send an appropriate CONNECTION_CLOSE frame to the
273 //! peer. An [`Error`] is returned to the application so that it can perform any
274 //! required tidy up such as closing sockets.
275 //!
276 //! [`application_proto()`]: ../struct.Connection.html#method.application_proto
277 //! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
278 //! [Connection setup]: ../index.html#connection-setup
279 //! [sending]: ../index.html#generating-outgoing-packets
280 //! [receiving]: ../index.html#handling-incoming-packets
281 //! [`with_transport()`]: struct.Connection.html#method.with_transport
282 //! [`poll()`]: struct.Connection.html#method.poll
283 //! [`Event`]: enum.Event.html
284 //! [`Error`]: enum.Error.html
285 //! [`send_request()`]: struct.Connection.html#method.send_response
286 //! [`send_response()`]: struct.Connection.html#method.send_response
287 //! [`send_body()`]: struct.Connection.html#method.send_body
288
289 use std::collections::VecDeque;
290
291 #[cfg(feature = "sfv")]
292 use std::convert::TryFrom;
293 use std::fmt;
294 use std::fmt::Write;
295
296 #[cfg(feature = "qlog")]
297 use qlog::events::h3::H3FrameCreated;
298 #[cfg(feature = "qlog")]
299 use qlog::events::h3::H3FrameParsed;
300 #[cfg(feature = "qlog")]
301 use qlog::events::h3::H3Owner;
302 #[cfg(feature = "qlog")]
303 use qlog::events::h3::H3PriorityTargetStreamType;
304 #[cfg(feature = "qlog")]
305 use qlog::events::h3::H3StreamType;
306 #[cfg(feature = "qlog")]
307 use qlog::events::h3::H3StreamTypeSet;
308 #[cfg(feature = "qlog")]
309 use qlog::events::h3::Http3EventType;
310 #[cfg(feature = "qlog")]
311 use qlog::events::h3::Http3Frame;
312 #[cfg(feature = "qlog")]
313 use qlog::events::EventData;
314 #[cfg(feature = "qlog")]
315 use qlog::events::EventImportance;
316 #[cfg(feature = "qlog")]
317 use qlog::events::EventType;
318
319 /// List of ALPN tokens of supported HTTP/3 versions.
320 ///
321 /// This can be passed directly to the [`Config::set_application_protos()`]
322 /// method when implementing HTTP/3 applications.
323 ///
324 /// [`Config::set_application_protos()`]:
325 /// ../struct.Config.html#method.set_application_protos
326 pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3", b"h3-29", b"h3-28", b"h3-27"];
327
328 // The offset used when converting HTTP/3 urgency to quiche urgency.
329 const PRIORITY_URGENCY_OFFSET: u8 = 124;
330
331 // Parameter values as specified in [Extensible Priorities].
332 //
333 // [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
334 const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
335 const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
336 const PRIORITY_URGENCY_DEFAULT: u8 = 3;
337 const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
338
339 #[cfg(feature = "qlog")]
340 const QLOG_FRAME_CREATED: EventType =
341 EventType::Http3EventType(Http3EventType::FrameCreated);
342 #[cfg(feature = "qlog")]
343 const QLOG_FRAME_PARSED: EventType =
344 EventType::Http3EventType(Http3EventType::FrameParsed);
345 #[cfg(feature = "qlog")]
346 const QLOG_STREAM_TYPE_SET: EventType =
347 EventType::Http3EventType(Http3EventType::StreamTypeSet);
348
349 /// A specialized [`Result`] type for quiche HTTP/3 operations.
350 ///
351 /// This type is used throughout quiche's HTTP/3 public API for any operation
352 /// that can produce an error.
353 ///
354 /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
355 pub type Result<T> = std::result::Result<T, Error>;
356
357 /// An HTTP/3 error.
358 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
359 pub enum Error {
360 /// There is no error or no work to do
361 Done,
362
363 /// The provided buffer is too short.
364 BufferTooShort,
365
366 /// Internal error in the HTTP/3 stack.
367 InternalError,
368
369 /// Endpoint detected that the peer is exhibiting behavior that causes.
370 /// excessive load.
371 ExcessiveLoad,
372
373 /// Stream ID or Push ID greater that current maximum was
374 /// used incorrectly, such as exceeding a limit, reducing a limit,
375 /// or being reused.
376 IdError,
377
378 /// The endpoint detected that its peer created a stream that it will not
379 /// accept.
380 StreamCreationError,
381
382 /// A required critical stream was closed.
383 ClosedCriticalStream,
384
385 /// No SETTINGS frame at beginning of control stream.
386 MissingSettings,
387
388 /// A frame was received which is not permitted in the current state.
389 FrameUnexpected,
390
391 /// Frame violated layout or size rules.
392 FrameError,
393
394 /// QPACK Header block decompression failure.
395 QpackDecompressionFailed,
396
397 /// Error originated from the transport layer.
398 TransportError(crate::Error),
399
400 /// The underlying QUIC stream (or connection) doesn't have enough capacity
401 /// for the operation to complete. The application should retry later on.
402 StreamBlocked,
403
404 /// Error in the payload of a SETTINGS frame.
405 SettingsError,
406
407 /// Server rejected request.
408 RequestRejected,
409
410 /// Request or its response cancelled.
411 RequestCancelled,
412
413 /// Client's request stream terminated without containing a full-formed
414 /// request.
415 RequestIncomplete,
416
417 /// An HTTP message was malformed and cannot be processed.
418 MessageError,
419
420 /// The TCP connection established in response to a CONNECT request was
421 /// reset or abnormally closed.
422 ConnectError,
423
424 /// The requested operation cannot be served over HTTP/3. Peer should retry
425 /// over HTTP/1.1.
426 VersionFallback,
427 }
428
429 impl Error {
to_wire(self) -> u64430 fn to_wire(self) -> u64 {
431 match self {
432 Error::Done => 0x100,
433 Error::InternalError => 0x102,
434 Error::StreamCreationError => 0x103,
435 Error::ClosedCriticalStream => 0x104,
436 Error::FrameUnexpected => 0x105,
437 Error::FrameError => 0x106,
438 Error::ExcessiveLoad => 0x107,
439 Error::IdError => 0x108,
440 Error::MissingSettings => 0x10A,
441 Error::QpackDecompressionFailed => 0x200,
442 Error::BufferTooShort => 0x999,
443 Error::TransportError { .. } => 0xFF,
444 Error::StreamBlocked => 0xFF,
445 Error::SettingsError => 0x109,
446 Error::RequestRejected => 0x10B,
447 Error::RequestCancelled => 0x10C,
448 Error::RequestIncomplete => 0x10D,
449 Error::MessageError => 0x10E,
450 Error::ConnectError => 0x10F,
451 Error::VersionFallback => 0x110,
452 }
453 }
454
455 #[cfg(feature = "ffi")]
to_c(self) -> libc::ssize_t456 fn to_c(self) -> libc::ssize_t {
457 match self {
458 Error::Done => -1,
459 Error::BufferTooShort => -2,
460 Error::InternalError => -3,
461 Error::ExcessiveLoad => -4,
462 Error::IdError => -5,
463 Error::StreamCreationError => -6,
464 Error::ClosedCriticalStream => -7,
465 Error::MissingSettings => -8,
466 Error::FrameUnexpected => -9,
467 Error::FrameError => -10,
468 Error::QpackDecompressionFailed => -11,
469 // -12 was previously used for TransportError, skip it
470 Error::StreamBlocked => -13,
471 Error::SettingsError => -14,
472 Error::RequestRejected => -15,
473 Error::RequestCancelled => -16,
474 Error::RequestIncomplete => -17,
475 Error::MessageError => -18,
476 Error::ConnectError => -19,
477 Error::VersionFallback => -20,
478
479 Error::TransportError(quic_error) => quic_error.to_c() - 1000,
480 }
481 }
482 }
483
484 impl std::fmt::Display for Error {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result485 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
486 write!(f, "{self:?}")
487 }
488 }
489
490 impl std::error::Error for Error {
source(&self) -> Option<&(dyn std::error::Error + 'static)>491 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
492 None
493 }
494 }
495
496 impl std::convert::From<super::Error> for Error {
from(err: super::Error) -> Self497 fn from(err: super::Error) -> Self {
498 match err {
499 super::Error::Done => Error::Done,
500
501 _ => Error::TransportError(err),
502 }
503 }
504 }
505
506 impl std::convert::From<octets::BufferTooShortError> for Error {
from(_err: octets::BufferTooShortError) -> Self507 fn from(_err: octets::BufferTooShortError) -> Self {
508 Error::BufferTooShort
509 }
510 }
511
512 /// An HTTP/3 configuration.
513 pub struct Config {
514 max_field_section_size: Option<u64>,
515 qpack_max_table_capacity: Option<u64>,
516 qpack_blocked_streams: Option<u64>,
517 connect_protocol_enabled: Option<u64>,
518 }
519
520 impl Config {
521 /// Creates a new configuration object with default settings.
new() -> Result<Config>522 pub const fn new() -> Result<Config> {
523 Ok(Config {
524 max_field_section_size: None,
525 qpack_max_table_capacity: None,
526 qpack_blocked_streams: None,
527 connect_protocol_enabled: None,
528 })
529 }
530
531 /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
532 ///
533 /// By default no limit is enforced. When a request whose headers exceed
534 /// the limit set by the application is received, the call to the [`poll()`]
535 /// method will return the [`Error::ExcessiveLoad`] error, and the
536 /// connection will be closed.
537 ///
538 /// [`poll()`]: struct.Connection.html#method.poll
539 /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
set_max_field_section_size(&mut self, v: u64)540 pub fn set_max_field_section_size(&mut self, v: u64) {
541 self.max_field_section_size = Some(v);
542 }
543
544 /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
545 ///
546 /// The default value is `0`.
set_qpack_max_table_capacity(&mut self, v: u64)547 pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
548 self.qpack_max_table_capacity = Some(v);
549 }
550
551 /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
552 ///
553 /// The default value is `0`.
set_qpack_blocked_streams(&mut self, v: u64)554 pub fn set_qpack_blocked_streams(&mut self, v: u64) {
555 self.qpack_blocked_streams = Some(v);
556 }
557
558 /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
559 ///
560 /// The default value is `false`.
enable_extended_connect(&mut self, enabled: bool)561 pub fn enable_extended_connect(&mut self, enabled: bool) {
562 if enabled {
563 self.connect_protocol_enabled = Some(1);
564 } else {
565 self.connect_protocol_enabled = None;
566 }
567 }
568 }
569
570 /// A trait for types with associated string name and value.
571 pub trait NameValue {
572 /// Returns the object's name.
name(&self) -> &[u8]573 fn name(&self) -> &[u8];
574
575 /// Returns the object's value.
value(&self) -> &[u8]576 fn value(&self) -> &[u8];
577 }
578
579 impl NameValue for (&[u8], &[u8]) {
name(&self) -> &[u8]580 fn name(&self) -> &[u8] {
581 self.0
582 }
583
value(&self) -> &[u8]584 fn value(&self) -> &[u8] {
585 self.1
586 }
587 }
588
589 /// An owned name-value pair representing a raw HTTP header.
590 #[derive(Clone, PartialEq, Eq)]
591 pub struct Header(Vec<u8>, Vec<u8>);
592
try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result593 fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
594 match std::str::from_utf8(hdr) {
595 Ok(s) => f.write_str(&s.escape_default().to_string()),
596 Err(_) => write!(f, "{hdr:?}"),
597 }
598 }
599
600 impl fmt::Debug for Header {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602 f.write_char('"')?;
603 try_print_as_readable(&self.0, f)?;
604 f.write_str(": ")?;
605 try_print_as_readable(&self.1, f)?;
606 f.write_char('"')
607 }
608 }
609
610 impl Header {
611 /// Creates a new header.
612 ///
613 /// Both `name` and `value` will be cloned.
new(name: &[u8], value: &[u8]) -> Self614 pub fn new(name: &[u8], value: &[u8]) -> Self {
615 Self(name.to_vec(), value.to_vec())
616 }
617 }
618
619 impl NameValue for Header {
name(&self) -> &[u8]620 fn name(&self) -> &[u8] {
621 &self.0
622 }
623
value(&self) -> &[u8]624 fn value(&self) -> &[u8] {
625 &self.1
626 }
627 }
628
629 /// A non-owned name-value pair representing a raw HTTP header.
630 #[derive(Clone, Debug, PartialEq, Eq)]
631 pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
632
633 impl<'a> HeaderRef<'a> {
634 /// Creates a new header.
new(name: &'a [u8], value: &'a [u8]) -> Self635 pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
636 Self(name, value)
637 }
638 }
639
640 impl<'a> NameValue for HeaderRef<'a> {
name(&self) -> &[u8]641 fn name(&self) -> &[u8] {
642 self.0
643 }
644
value(&self) -> &[u8]645 fn value(&self) -> &[u8] {
646 self.1
647 }
648 }
649
650 /// An HTTP/3 connection event.
651 #[derive(Clone, Debug, PartialEq, Eq)]
652 pub enum Event {
653 /// Request/response headers were received.
654 Headers {
655 /// The list of received header fields. The application should validate
656 /// pseudo-headers and headers.
657 list: Vec<Header>,
658
659 /// Whether data will follow the headers on the stream.
660 has_body: bool,
661 },
662
663 /// Data was received.
664 ///
665 /// This indicates that the application can use the [`recv_body()`] method
666 /// to retrieve the data from the stream.
667 ///
668 /// Note that [`recv_body()`] will need to be called repeatedly until the
669 /// [`Done`] value is returned, as the event will not be re-armed until all
670 /// buffered data is read.
671 ///
672 /// [`recv_body()`]: struct.Connection.html#method.recv_body
673 /// [`Done`]: enum.Error.html#variant.Done
674 Data,
675
676 /// Stream was closed,
677 Finished,
678
679 /// Stream was reset.
680 ///
681 /// The associated data represents the error code sent by the peer.
682 Reset(u64),
683
684 /// DATAGRAM was received.
685 ///
686 /// This indicates that the application can use the [`recv_dgram()`] method
687 /// to retrieve the HTTP/3 DATAGRAM.
688 ///
689 /// Note that [`recv_dgram()`] will need to be called repeatedly until the
690 /// [`Done`] value is returned, as the event will not be re-armed until all
691 /// buffered DATAGRAMs with the same flow ID are read.
692 ///
693 /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
694 /// [`Done`]: enum.Error.html#variant.Done
695 Datagram,
696
697 /// PRIORITY_UPDATE was received.
698 ///
699 /// This indicates that the application can use the
700 /// [`take_last_priority_update()`] method to take the last received
701 /// PRIORITY_UPDATE for a specified stream.
702 ///
703 /// This event is triggered once per stream until the last PRIORITY_UPDATE
704 /// is taken. It is recommended that applications defer taking the
705 /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
706 ///
707 /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
708 /// [`poll()`]: struct.Connection.html#method.poll
709 /// [`Done`]: enum.Error.html#variant.Done
710 PriorityUpdate,
711
712 /// GOAWAY was received.
713 GoAway,
714 }
715
716 /// Extensible Priorities parameters.
717 ///
718 /// The `TryFrom` trait supports constructing this object from the serialized
719 /// Structured Fields Dictionary field value. I.e, use `TryFrom` to parse the
720 /// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
721 /// trait requires the `sfv` feature to be enabled.
722 #[derive(Debug, PartialEq, Eq)]
723 #[repr(C)]
724 pub struct Priority {
725 urgency: u8,
726 incremental: bool,
727 }
728
729 impl Default for Priority {
default() -> Self730 fn default() -> Self {
731 Priority {
732 urgency: PRIORITY_URGENCY_DEFAULT,
733 incremental: PRIORITY_INCREMENTAL_DEFAULT,
734 }
735 }
736 }
737
738 impl Priority {
739 /// Creates a new Priority.
new(urgency: u8, incremental: bool) -> Self740 pub const fn new(urgency: u8, incremental: bool) -> Self {
741 Priority {
742 urgency,
743 incremental,
744 }
745 }
746 }
747
748 #[cfg(feature = "sfv")]
749 #[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
750 impl TryFrom<&[u8]> for Priority {
751 type Error = crate::h3::Error;
752
753 /// Try to parse an Extensible Priority field value.
754 ///
755 /// The field value is expected to be a Structured Fields Dictionary; see
756 /// [Extensible Priorities].
757 ///
758 /// If the `u` or `i` fields are contained with correct types, a constructed
759 /// Priority object is returned. Note that urgency values outside of valid
760 /// range (0 through 7) are clamped to 7.
761 ///
762 /// If the `u` or `i` fields are contained with the wrong types,
763 /// Error::Done is returned.
764 ///
765 /// Omitted parameters will yield default values.
766 ///
767 /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
try_from(value: &[u8]) -> std::result::Result<Self, Self::Error>768 fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
769 let dict = match sfv::Parser::parse_dictionary(value) {
770 Ok(v) => v,
771
772 Err(_) => return Err(Error::Done),
773 };
774
775 let urgency = match dict.get("u") {
776 // If there is a u parameter, try to read it as an Item of type
777 // Integer. If the value out of the spec's allowed range
778 // (0 through 7), that's an error so set it to the upper
779 // bound (lowest priority) to avoid interference with
780 // other streams.
781 Some(sfv::ListEntry::Item(item)) => match item.bare_item.as_int() {
782 Some(v) => {
783 if !(PRIORITY_URGENCY_LOWER_BOUND as i64..=
784 PRIORITY_URGENCY_UPPER_BOUND as i64)
785 .contains(&v)
786 {
787 PRIORITY_URGENCY_UPPER_BOUND
788 } else {
789 v as u8
790 }
791 },
792
793 None => return Err(Error::Done),
794 },
795
796 Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),
797
798 // Omitted so use default value.
799 None => PRIORITY_URGENCY_DEFAULT,
800 };
801
802 let incremental = match dict.get("i") {
803 Some(sfv::ListEntry::Item(item)) =>
804 item.bare_item.as_bool().ok_or(Error::Done)?,
805
806 // Omitted so use default value.
807 _ => false,
808 };
809
810 Ok(Priority::new(urgency, incremental))
811 }
812 }
813
814 struct ConnectionSettings {
815 pub max_field_section_size: Option<u64>,
816 pub qpack_max_table_capacity: Option<u64>,
817 pub qpack_blocked_streams: Option<u64>,
818 pub connect_protocol_enabled: Option<u64>,
819 pub h3_datagram: Option<u64>,
820 pub raw: Option<Vec<(u64, u64)>>,
821 }
822
823 struct QpackStreams {
824 pub encoder_stream_id: Option<u64>,
825 pub decoder_stream_id: Option<u64>,
826 }
827
828 /// An HTTP/3 connection.
829 pub struct Connection {
830 is_server: bool,
831
832 next_request_stream_id: u64,
833 next_uni_stream_id: u64,
834
835 streams: crate::stream::StreamIdHashMap<stream::Stream>,
836
837 local_settings: ConnectionSettings,
838 peer_settings: ConnectionSettings,
839
840 control_stream_id: Option<u64>,
841 peer_control_stream_id: Option<u64>,
842
843 qpack_encoder: qpack::Encoder,
844 qpack_decoder: qpack::Decoder,
845
846 local_qpack_streams: QpackStreams,
847 peer_qpack_streams: QpackStreams,
848
849 max_push_id: u64,
850
851 finished_streams: VecDeque<u64>,
852
853 frames_greased: bool,
854
855 local_goaway_id: Option<u64>,
856 peer_goaway_id: Option<u64>,
857
858 dgram_event_triggered: bool,
859 }
860
861 impl Connection {
new( config: &Config, is_server: bool, enable_dgram: bool, ) -> Result<Connection>862 fn new(
863 config: &Config, is_server: bool, enable_dgram: bool,
864 ) -> Result<Connection> {
865 let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
866 let h3_datagram = if enable_dgram { Some(1) } else { None };
867
868 Ok(Connection {
869 is_server,
870
871 next_request_stream_id: 0,
872
873 next_uni_stream_id: initial_uni_stream_id,
874
875 streams: Default::default(),
876
877 local_settings: ConnectionSettings {
878 max_field_section_size: config.max_field_section_size,
879 qpack_max_table_capacity: config.qpack_max_table_capacity,
880 qpack_blocked_streams: config.qpack_blocked_streams,
881 connect_protocol_enabled: config.connect_protocol_enabled,
882 h3_datagram,
883 raw: Default::default(),
884 },
885
886 peer_settings: ConnectionSettings {
887 max_field_section_size: None,
888 qpack_max_table_capacity: None,
889 qpack_blocked_streams: None,
890 h3_datagram: None,
891 connect_protocol_enabled: None,
892 raw: Default::default(),
893 },
894
895 control_stream_id: None,
896 peer_control_stream_id: None,
897
898 qpack_encoder: qpack::Encoder::new(),
899 qpack_decoder: qpack::Decoder::new(),
900
901 local_qpack_streams: QpackStreams {
902 encoder_stream_id: None,
903 decoder_stream_id: None,
904 },
905
906 peer_qpack_streams: QpackStreams {
907 encoder_stream_id: None,
908 decoder_stream_id: None,
909 },
910
911 max_push_id: 0,
912
913 finished_streams: VecDeque::new(),
914
915 frames_greased: false,
916
917 local_goaway_id: None,
918 peer_goaway_id: None,
919
920 dgram_event_triggered: false,
921 })
922 }
923
924 /// Creates a new HTTP/3 connection using the provided QUIC connection.
925 ///
926 /// This will also initiate the HTTP/3 handshake with the peer by opening
927 /// all control streams (including QPACK) and sending the local settings.
928 ///
929 /// On success the new connection is returned.
930 ///
931 /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
932 /// cannot be created due to stream limits.
933 ///
934 /// The [`InternalError`] error is returned when either the underlying QUIC
935 /// connection is not in a suitable state, or the HTTP/3 control stream
936 /// cannot be created due to flow control limits.
937 ///
938 /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
939 /// [`InternalError`]: ../enum.Error.html#variant.InternalError
with_transport( conn: &mut super::Connection, config: &Config, ) -> Result<Connection>940 pub fn with_transport(
941 conn: &mut super::Connection, config: &Config,
942 ) -> Result<Connection> {
943 let is_client = !conn.is_server;
944 if is_client && !(conn.is_established() || conn.is_in_early_data()) {
945 trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
946 return Err(Error::InternalError);
947 }
948
949 let mut http3_conn =
950 Connection::new(config, conn.is_server, conn.dgram_enabled())?;
951
952 match http3_conn.send_settings(conn) {
953 Ok(_) => (),
954
955 Err(e) => {
956 conn.close(true, e.to_wire(), b"Error opening control stream")?;
957 return Err(e);
958 },
959 };
960
961 // Try opening QPACK streams, but ignore errors if it fails since we
962 // don't need them right now.
963 http3_conn.open_qpack_encoder_stream(conn).ok();
964 http3_conn.open_qpack_decoder_stream(conn).ok();
965
966 if conn.grease {
967 // Try opening a GREASE stream, but ignore errors since it's not
968 // critical.
969 http3_conn.open_grease_stream(conn).ok();
970 }
971
972 Ok(http3_conn)
973 }
974
975 /// Sends an HTTP/3 request.
976 ///
977 /// The request is encoded from the provided list of headers without a
978 /// body, and sent on a newly allocated stream. To include a body,
979 /// set `fin` as `false` and subsequently call [`send_body()`] with the
980 /// same `conn` and the `stream_id` returned from this method.
981 ///
982 /// On success the newly allocated stream ID is returned.
983 ///
984 /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
985 /// doesn't have enough capacity for the operation to complete. When this
986 /// happens the application should retry the operation once the stream is
987 /// reported as writable again.
988 ///
989 /// [`send_body()`]: struct.Connection.html#method.send_body
990 /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
send_request<T: NameValue>( &mut self, conn: &mut super::Connection, headers: &[T], fin: bool, ) -> Result<u64>991 pub fn send_request<T: NameValue>(
992 &mut self, conn: &mut super::Connection, headers: &[T], fin: bool,
993 ) -> Result<u64> {
994 // If we received a GOAWAY from the peer, MUST NOT initiate new
995 // requests.
996 if self.peer_goaway_id.is_some() {
997 return Err(Error::FrameUnexpected);
998 }
999
1000 let stream_id = self.next_request_stream_id;
1001
1002 self.streams
1003 .insert(stream_id, stream::Stream::new(stream_id, true));
1004
1005 // The underlying QUIC stream does not exist yet, so calls to e.g.
1006 // stream_capacity() will fail. By writing a 0-length buffer, we force
1007 // the creation of the QUIC stream state, without actually writing
1008 // anything.
1009 if let Err(e) = conn.stream_send(stream_id, b"", false) {
1010 self.streams.remove(&stream_id);
1011
1012 if e == super::Error::Done {
1013 return Err(Error::StreamBlocked);
1014 }
1015
1016 return Err(e.into());
1017 };
1018
1019 self.send_headers(conn, stream_id, headers, fin)?;
1020
1021 // To avoid skipping stream IDs, we only calculate the next available
1022 // stream ID when a request has been successfully buffered.
1023 self.next_request_stream_id = self
1024 .next_request_stream_id
1025 .checked_add(4)
1026 .ok_or(Error::IdError)?;
1027
1028 Ok(stream_id)
1029 }
1030
1031 /// Sends an HTTP/3 response on the specified stream with default priority.
1032 ///
1033 /// This method sends the provided `headers` without a body. To include a
1034 /// body, set `fin` as `false` and subsequently call [`send_body()`] with
1035 /// the same `conn` and `stream_id`.
1036 ///
1037 /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1038 /// doesn't have enough capacity for the operation to complete. When this
1039 /// happens the application should retry the operation once the stream is
1040 /// reported as writable again.
1041 ///
1042 /// [`send_body()`]: struct.Connection.html#method.send_body
1043 /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
send_response<T: NameValue>( &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T], fin: bool, ) -> Result<()>1044 pub fn send_response<T: NameValue>(
1045 &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1046 fin: bool,
1047 ) -> Result<()> {
1048 let priority = Default::default();
1049
1050 self.send_response_with_priority(
1051 conn, stream_id, headers, &priority, fin,
1052 )?;
1053
1054 Ok(())
1055 }
1056
1057 /// Sends an HTTP/3 response on the specified stream with specified
1058 /// priority.
1059 ///
1060 /// The `priority` parameter represents [Extensible Priority]
1061 /// parameters. If the urgency is outside the range 0-7, it will be clamped
1062 /// to 7.
1063 ///
1064 /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1065 /// doesn't have enough capacity for the operation to complete. When this
1066 /// happens the application should retry the operation once the stream is
1067 /// reported as writable again.
1068 ///
1069 /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1070 /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
send_response_with_priority<T: NameValue>( &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T], priority: &Priority, fin: bool, ) -> Result<()>1071 pub fn send_response_with_priority<T: NameValue>(
1072 &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1073 priority: &Priority, fin: bool,
1074 ) -> Result<()> {
1075 if !self.streams.contains_key(&stream_id) {
1076 return Err(Error::FrameUnexpected);
1077 }
1078
1079 // Clamp and shift urgency into quiche-priority space
1080 let urgency = priority
1081 .urgency
1082 .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1083 PRIORITY_URGENCY_OFFSET;
1084
1085 conn.stream_priority(stream_id, urgency, priority.incremental)?;
1086
1087 self.send_headers(conn, stream_id, headers, fin)?;
1088
1089 Ok(())
1090 }
1091
encode_header_block<T: NameValue>( &mut self, headers: &[T], ) -> Result<Vec<u8>>1092 fn encode_header_block<T: NameValue>(
1093 &mut self, headers: &[T],
1094 ) -> Result<Vec<u8>> {
1095 let headers_len = headers
1096 .iter()
1097 .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1098
1099 let mut header_block = vec![0; headers_len];
1100 let len = self
1101 .qpack_encoder
1102 .encode(headers, &mut header_block)
1103 .map_err(|_| Error::InternalError)?;
1104
1105 header_block.truncate(len);
1106
1107 Ok(header_block)
1108 }
1109
send_headers<T: NameValue>( &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T], fin: bool, ) -> Result<()>1110 fn send_headers<T: NameValue>(
1111 &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1112 fin: bool,
1113 ) -> Result<()> {
1114 let mut d = [42; 10];
1115 let mut b = octets::OctetsMut::with_slice(&mut d);
1116
1117 if !self.frames_greased && conn.grease {
1118 self.send_grease_frames(conn, stream_id)?;
1119 self.frames_greased = true;
1120 }
1121
1122 let header_block = self.encode_header_block(headers)?;
1123
1124 let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1125 octets::varint_len(header_block.len() as u64);
1126
1127 // Headers need to be sent atomically, so make sure the stream has
1128 // enough capacity.
1129 match conn.stream_writable(stream_id, overhead + header_block.len()) {
1130 Ok(true) => (),
1131
1132 Ok(false) => return Err(Error::StreamBlocked),
1133
1134 Err(e) => {
1135 if conn.stream_finished(stream_id) {
1136 self.streams.remove(&stream_id);
1137 }
1138
1139 return Err(e.into());
1140 },
1141 };
1142
1143 b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1144 b.put_varint(header_block.len() as u64)?;
1145 let off = b.off();
1146 conn.stream_send(stream_id, &d[..off], false)?;
1147
1148 // Sending header block separately avoids unnecessary copy.
1149 conn.stream_send(stream_id, &header_block, fin)?;
1150
1151 trace!(
1152 "{} tx frm HEADERS stream={} len={} fin={}",
1153 conn.trace_id(),
1154 stream_id,
1155 header_block.len(),
1156 fin
1157 );
1158
1159 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1160 let qlog_headers = headers
1161 .iter()
1162 .map(|h| qlog::events::h3::HttpHeader {
1163 name: String::from_utf8_lossy(h.name()).into_owned(),
1164 value: String::from_utf8_lossy(h.value()).into_owned(),
1165 })
1166 .collect();
1167
1168 let frame = Http3Frame::Headers {
1169 headers: qlog_headers,
1170 };
1171 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1172 stream_id,
1173 length: Some(header_block.len() as u64),
1174 frame,
1175 raw: None,
1176 });
1177
1178 q.add_event_data_now(ev_data).ok();
1179 });
1180
1181 if let Some(s) = self.streams.get_mut(&stream_id) {
1182 s.initialize_local();
1183 }
1184
1185 if fin && conn.stream_finished(stream_id) {
1186 self.streams.remove(&stream_id);
1187 }
1188
1189 Ok(())
1190 }
1191
1192 /// Sends an HTTP/3 body chunk on the given stream.
1193 ///
1194 /// On success the number of bytes written is returned, or [`Done`] if no
1195 /// bytes could be written (e.g. because the stream is blocked).
1196 ///
1197 /// Note that the number of written bytes returned can be lower than the
1198 /// length of the input buffer when the underlying QUIC stream doesn't have
1199 /// enough capacity for the operation to complete.
1200 ///
1201 /// When a partial write happens (including when [`Done`] is returned) the
1202 /// application should retry the operation once the stream is reported as
1203 /// writable again.
1204 ///
1205 /// [`Done`]: enum.Error.html#variant.Done
send_body( &mut self, conn: &mut super::Connection, stream_id: u64, body: &[u8], fin: bool, ) -> Result<usize>1206 pub fn send_body(
1207 &mut self, conn: &mut super::Connection, stream_id: u64, body: &[u8],
1208 fin: bool,
1209 ) -> Result<usize> {
1210 let mut d = [42; 10];
1211 let mut b = octets::OctetsMut::with_slice(&mut d);
1212
1213 // Validate that it is sane to send data on the stream.
1214 if stream_id % 4 != 0 {
1215 return Err(Error::FrameUnexpected);
1216 }
1217
1218 match self.streams.get(&stream_id) {
1219 Some(s) =>
1220 if !s.local_initialized() {
1221 return Err(Error::FrameUnexpected);
1222 },
1223
1224 None => {
1225 return Err(Error::FrameUnexpected);
1226 },
1227 };
1228
1229 // Avoid sending 0-length DATA frames when the fin flag is false.
1230 if body.is_empty() && !fin {
1231 return Err(Error::Done);
1232 }
1233
1234 let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1235 octets::varint_len(body.len() as u64);
1236
1237 let stream_cap = match conn.stream_capacity(stream_id) {
1238 Ok(v) => v,
1239
1240 Err(e) => {
1241 if conn.stream_finished(stream_id) {
1242 self.streams.remove(&stream_id);
1243 }
1244
1245 return Err(e.into());
1246 },
1247 };
1248
1249 // Make sure there is enough capacity to send the DATA frame header.
1250 if stream_cap < overhead {
1251 let _ = conn.stream_writable(stream_id, overhead + 1);
1252 return Err(Error::Done);
1253 }
1254
1255 // Cap the frame payload length to the stream's capacity.
1256 let body_len = std::cmp::min(body.len(), stream_cap - overhead);
1257
1258 // If we can't send the entire body, set the fin flag to false so the
1259 // application can try again later.
1260 let fin = if body_len != body.len() { false } else { fin };
1261
1262 // Again, avoid sending 0-length DATA frames when the fin flag is false.
1263 if body_len == 0 && !fin {
1264 let _ = conn.stream_writable(stream_id, overhead + 1);
1265 return Err(Error::Done);
1266 }
1267
1268 b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1269 b.put_varint(body_len as u64)?;
1270 let off = b.off();
1271 conn.stream_send(stream_id, &d[..off], false)?;
1272
1273 // Return how many bytes were written, excluding the frame header.
1274 // Sending body separately avoids unnecessary copy.
1275 let written = conn.stream_send(stream_id, &body[..body_len], fin)?;
1276
1277 trace!(
1278 "{} tx frm DATA stream={} len={} fin={}",
1279 conn.trace_id(),
1280 stream_id,
1281 written,
1282 fin
1283 );
1284
1285 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1286 let frame = Http3Frame::Data { raw: None };
1287 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1288 stream_id,
1289 length: Some(written as u64),
1290 frame,
1291 raw: None,
1292 });
1293
1294 q.add_event_data_now(ev_data).ok();
1295 });
1296
1297 if written < body.len() {
1298 // Ensure the peer is notified that the connection or stream is
1299 // blocked when the stream's capacity is limited by flow control.
1300 //
1301 // We only need enough capacity to send a few bytes, to make sure
1302 // the stream doesn't hang due to congestion window not growing
1303 // enough.
1304 let _ = conn.stream_writable(stream_id, overhead + 1);
1305 }
1306
1307 if fin && written == body.len() && conn.stream_finished(stream_id) {
1308 self.streams.remove(&stream_id);
1309 }
1310
1311 Ok(written)
1312 }
1313
1314 /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1315 ///
1316 /// Support is signalled by the peer's SETTINGS, so this method always
1317 /// returns false until they have been processed using the [`poll()`]
1318 /// method.
1319 ///
1320 /// [`poll()`]: struct.Connection.html#method.poll
dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool1321 pub fn dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool {
1322 self.peer_settings.h3_datagram == Some(1) &&
1323 conn.dgram_max_writable_len().is_some()
1324 }
1325
1326 /// Returns whether the peer enabled extended CONNECT support.
1327 ///
1328 /// Support is signalled by the peer's SETTINGS, so this method always
1329 /// returns false until they have been processed using the [`poll()`]
1330 /// method.
1331 ///
1332 /// [`poll()`]: struct.Connection.html#method.poll
extended_connect_enabled_by_peer(&self) -> bool1333 pub fn extended_connect_enabled_by_peer(&self) -> bool {
1334 self.peer_settings.connect_protocol_enabled == Some(1)
1335 }
1336
1337 /// Sends an HTTP/3 DATAGRAM with the specified flow ID.
send_dgram( &mut self, conn: &mut super::Connection, flow_id: u64, buf: &[u8], ) -> Result<()>1338 pub fn send_dgram(
1339 &mut self, conn: &mut super::Connection, flow_id: u64, buf: &[u8],
1340 ) -> Result<()> {
1341 let len = octets::varint_len(flow_id) + buf.len();
1342 let mut d = vec![0; len];
1343 let mut b = octets::OctetsMut::with_slice(&mut d);
1344
1345 b.put_varint(flow_id)?;
1346 b.put_bytes(buf)?;
1347
1348 conn.dgram_send_vec(d)?;
1349
1350 Ok(())
1351 }
1352
1353 /// Reads a DATAGRAM into the provided buffer.
1354 ///
1355 /// Applications should call this method whenever the [`poll()`] method
1356 /// returns a [`Datagram`] event.
1357 ///
1358 /// On success the DATAGRAM data is returned, with length and Flow ID and
1359 /// length of the Flow ID.
1360 ///
1361 /// [`Done`] is returned if there is no data to read.
1362 ///
1363 /// [`BufferTooShort`] is returned if the provided buffer is too small for
1364 /// the data.
1365 ///
1366 /// [`poll()`]: struct.Connection.html#method.poll
1367 /// [`Datagram`]: enum.Event.html#variant.Datagram
1368 /// [`Done`]: enum.Error.html#variant.Done
1369 /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
recv_dgram( &mut self, conn: &mut super::Connection, buf: &mut [u8], ) -> Result<(usize, u64, usize)>1370 pub fn recv_dgram(
1371 &mut self, conn: &mut super::Connection, buf: &mut [u8],
1372 ) -> Result<(usize, u64, usize)> {
1373 let len = conn.dgram_recv(buf)?;
1374 let mut b = octets::Octets::with_slice(buf);
1375 let flow_id = b.get_varint()?;
1376 Ok((len, flow_id, b.off()))
1377 }
1378
1379 /// Returns the maximum HTTP/3 DATAGRAM payload that can be sent.
dgram_max_writable_len( &self, conn: &super::Connection, flow_id: u64, ) -> Option<usize>1380 pub fn dgram_max_writable_len(
1381 &self, conn: &super::Connection, flow_id: u64,
1382 ) -> Option<usize> {
1383 let flow_id_len = octets::varint_len(flow_id);
1384 match conn.dgram_max_writable_len() {
1385 None => None,
1386 Some(len) => len.checked_sub(flow_id_len),
1387 }
1388 }
1389
1390 // A helper function for determining if there is a DATAGRAM event.
process_dgrams( &mut self, conn: &mut super::Connection, ) -> Result<(u64, Event)>1391 fn process_dgrams(
1392 &mut self, conn: &mut super::Connection,
1393 ) -> Result<(u64, Event)> {
1394 if conn.dgram_recv_queue_len() > 0 {
1395 if self.dgram_event_triggered {
1396 return Err(Error::Done);
1397 }
1398
1399 self.dgram_event_triggered = true;
1400
1401 return Ok((0, Event::Datagram));
1402 }
1403
1404 self.dgram_event_triggered = false;
1405
1406 Err(Error::Done)
1407 }
1408
1409 /// Reads request or response body data into the provided buffer.
1410 ///
1411 /// Applications should call this method whenever the [`poll()`] method
1412 /// returns a [`Data`] event.
1413 ///
1414 /// On success the amount of bytes read is returned, or [`Done`] if there
1415 /// is no data to read.
1416 ///
1417 /// [`poll()`]: struct.Connection.html#method.poll
1418 /// [`Data`]: enum.Event.html#variant.Data
1419 /// [`Done`]: enum.Error.html#variant.Done
recv_body( &mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8], ) -> Result<usize>1420 pub fn recv_body(
1421 &mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8],
1422 ) -> Result<usize> {
1423 let mut total = 0;
1424
1425 // Try to consume all buffered data for the stream, even across multiple
1426 // DATA frames.
1427 while total < out.len() {
1428 let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1429
1430 if stream.state() != stream::State::Data {
1431 break;
1432 }
1433
1434 let (read, fin) =
1435 match stream.try_consume_data(conn, &mut out[total..]) {
1436 Ok(v) => v,
1437
1438 Err(Error::Done) => break,
1439
1440 Err(e) => return Err(e),
1441 };
1442
1443 total += read;
1444
1445 // No more data to read, we are done.
1446 if read == 0 || fin {
1447 break;
1448 }
1449
1450 // Process incoming data from the stream. For example, if a whole
1451 // DATA frame was consumed, and another one is queued behind it,
1452 // this will ensure the additional data will also be returned to
1453 // the application.
1454 match self.process_readable_stream(conn, stream_id, false) {
1455 Ok(_) => unreachable!(),
1456
1457 Err(Error::Done) => (),
1458
1459 Err(e) => return Err(e),
1460 };
1461
1462 if conn.stream_finished(stream_id) {
1463 break;
1464 }
1465 }
1466
1467 // While body is being received, the stream is marked as finished only
1468 // when all data is read by the application.
1469 if conn.stream_finished(stream_id) {
1470 self.process_finished_stream(stream_id);
1471 }
1472
1473 if total == 0 {
1474 return Err(Error::Done);
1475 }
1476
1477 Ok(total)
1478 }
1479
1480 /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1481 /// request stream ID and priority.
1482 ///
1483 /// The `priority` parameter represents [Extensible Priority]
1484 /// parameters. If the urgency is outside the range 0-7, it will be clamped
1485 /// to 7.
1486 ///
1487 /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1488 /// doesn't have enough capacity for the operation to complete. When this
1489 /// happens the application should retry the operation once the stream is
1490 /// reported as writable again.
1491 ///
1492 /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1493 /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
send_priority_update_for_request( &mut self, conn: &mut super::Connection, stream_id: u64, priority: &Priority, ) -> Result<()>1494 pub fn send_priority_update_for_request(
1495 &mut self, conn: &mut super::Connection, stream_id: u64,
1496 priority: &Priority,
1497 ) -> Result<()> {
1498 let mut d = [42; 20];
1499 let mut b = octets::OctetsMut::with_slice(&mut d);
1500
1501 // Validate that it is sane to send PRIORITY_UPDATE.
1502 if self.is_server {
1503 return Err(Error::FrameUnexpected);
1504 }
1505
1506 if stream_id % 4 != 0 {
1507 return Err(Error::FrameUnexpected);
1508 }
1509
1510 let control_stream_id =
1511 self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1512
1513 let urgency = priority
1514 .urgency
1515 .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1516
1517 let mut field_value = format!("u={urgency}");
1518
1519 if priority.incremental {
1520 field_value.push_str(",i");
1521 }
1522
1523 let priority_field_value = field_value.as_bytes();
1524 let frame_payload_len =
1525 octets::varint_len(stream_id) + priority_field_value.len();
1526
1527 let overhead =
1528 octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1529 octets::varint_len(stream_id) +
1530 octets::varint_len(frame_payload_len as u64);
1531
1532 // Make sure the control stream has enough capacity.
1533 match conn.stream_writable(
1534 control_stream_id,
1535 overhead + priority_field_value.len(),
1536 ) {
1537 Ok(true) => (),
1538
1539 Ok(false) => return Err(Error::StreamBlocked),
1540
1541 Err(e) => {
1542 return Err(e.into());
1543 },
1544 }
1545
1546 b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1547 b.put_varint(frame_payload_len as u64)?;
1548 b.put_varint(stream_id)?;
1549 let off = b.off();
1550 conn.stream_send(control_stream_id, &d[..off], false)?;
1551
1552 // Sending field value separately avoids unnecessary copy.
1553 conn.stream_send(control_stream_id, priority_field_value, false)?;
1554
1555 trace!(
1556 "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1557 conn.trace_id(),
1558 stream_id,
1559 field_value,
1560 );
1561
1562 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1563 let frame = Http3Frame::PriorityUpdate {
1564 target_stream_type: H3PriorityTargetStreamType::Request,
1565 prioritized_element_id: stream_id,
1566 priority_field_value: field_value.clone(),
1567 };
1568
1569 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1570 stream_id,
1571 length: Some(priority_field_value.len() as u64),
1572 frame,
1573 raw: None,
1574 });
1575
1576 q.add_event_data_now(ev_data).ok();
1577 });
1578
1579 Ok(())
1580 }
1581
1582 /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1583 ///
1584 /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1585 /// prioritized element, the event has triggered and will not rearm until
1586 /// applications call this method. It is recommended that applications defer
1587 /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1588 ///
1589 /// On success the Priority Field Value is returned, or [`Done`] if there is
1590 /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1591 /// because the prioritized element does not exist).
1592 ///
1593 /// [`poll()`]: struct.Connection.html#method.poll
1594 /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1595 /// [`Done`]: enum.Error.html#variant.Done
take_last_priority_update( &mut self, prioritized_element_id: u64, ) -> Result<Vec<u8>>1596 pub fn take_last_priority_update(
1597 &mut self, prioritized_element_id: u64,
1598 ) -> Result<Vec<u8>> {
1599 if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1600 return stream.take_last_priority_update().ok_or(Error::Done);
1601 }
1602
1603 Err(Error::Done)
1604 }
1605
1606 /// Processes HTTP/3 data received from the peer.
1607 ///
1608 /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1609 /// no events to report.
1610 ///
1611 /// Note that all events are edge-triggered, meaning that once reported they
1612 /// will not be reported again by calling this method again, until the event
1613 /// is re-armed.
1614 ///
1615 /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1616 /// which is used in methods [`recv_body()`], [`send_response()`] or
1617 /// [`send_body()`].
1618 ///
1619 /// The event [`Datagram`] returns a dummy value of `0`, this should be
1620 /// ignored by the application.
1621 ///
1622 /// The event [`GoAway`] returns an ID that depends on the connection role.
1623 /// A client receives the largest processed stream ID. A server receives the
1624 /// the largest permitted push ID.
1625 ///
1626 /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1627 /// prioritized element ID that is used in the method
1628 /// [`take_last_priority_update()`], which rearms the event for that ID.
1629 ///
1630 /// If an error occurs while processing data, the connection is closed with
1631 /// the appropriate error code, using the transport's [`close()`] method.
1632 ///
1633 /// [`Event`]: enum.Event.html
1634 /// [`Done`]: enum.Error.html#variant.Done
1635 /// [`Headers`]: enum.Event.html#variant.Headers
1636 /// [`Data`]: enum.Event.html#variant.Data
1637 /// [`Finished`]: enum.Event.html#variant.Finished
1638 /// [`Datagram`]: enum.Event.html#variant.Datagram
1639 /// [`GoAway`]: enum.Event.html#variant.GoAWay
1640 /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1641 /// [`recv_body()`]: struct.Connection.html#method.recv_body
1642 /// [`send_response()`]: struct.Connection.html#method.send_response
1643 /// [`send_body()`]: struct.Connection.html#method.send_body
1644 /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1645 /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1646 /// [`close()`]: ../struct.Connection.html#method.close
poll(&mut self, conn: &mut super::Connection) -> Result<(u64, Event)>1647 pub fn poll(&mut self, conn: &mut super::Connection) -> Result<(u64, Event)> {
1648 // When connection close is initiated by the local application (e.g. due
1649 // to a protocol error), the connection itself might be in a broken
1650 // state, so return early.
1651 if conn.local_error.is_some() {
1652 return Err(Error::Done);
1653 }
1654
1655 // Process control streams first.
1656 if let Some(stream_id) = self.peer_control_stream_id {
1657 match self.process_control_stream(conn, stream_id) {
1658 Ok(ev) => return Ok(ev),
1659
1660 Err(Error::Done) => (),
1661
1662 Err(e) => return Err(e),
1663 };
1664 }
1665
1666 if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
1667 match self.process_control_stream(conn, stream_id) {
1668 Ok(ev) => return Ok(ev),
1669
1670 Err(Error::Done) => (),
1671
1672 Err(e) => return Err(e),
1673 };
1674 }
1675
1676 if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
1677 match self.process_control_stream(conn, stream_id) {
1678 Ok(ev) => return Ok(ev),
1679
1680 Err(Error::Done) => (),
1681
1682 Err(e) => return Err(e),
1683 };
1684 }
1685
1686 // Process finished streams list.
1687 if let Some(finished) = self.finished_streams.pop_front() {
1688 return Ok((finished, Event::Finished));
1689 }
1690
1691 // Process queued DATAGRAMs if the poll threshold allows it.
1692 match self.process_dgrams(conn) {
1693 Ok(v) => return Ok(v),
1694
1695 Err(Error::Done) => (),
1696
1697 Err(e) => return Err(e),
1698 };
1699
1700 // Process HTTP/3 data from readable streams.
1701 for s in conn.readable() {
1702 trace!("{} stream id {} is readable", conn.trace_id(), s);
1703
1704 let ev = match self.process_readable_stream(conn, s, true) {
1705 Ok(v) => Some(v),
1706
1707 Err(Error::Done) => None,
1708
1709 // Return early if the stream was reset, to avoid returning
1710 // a Finished event later as well.
1711 Err(Error::TransportError(crate::Error::StreamReset(e))) =>
1712 return Ok((s, Event::Reset(e))),
1713
1714 Err(e) => return Err(e),
1715 };
1716
1717 if conn.stream_finished(s) {
1718 self.process_finished_stream(s);
1719 }
1720
1721 // TODO: check if stream is completed so it can be freed
1722 if let Some(ev) = ev {
1723 return Ok(ev);
1724 }
1725 }
1726
1727 // Process finished streams list once again, to make sure `Finished`
1728 // events are returned when receiving empty stream frames with the fin
1729 // flag set.
1730 if let Some(finished) = self.finished_streams.pop_front() {
1731 return Ok((finished, Event::Finished));
1732 }
1733
1734 Err(Error::Done)
1735 }
1736
1737 /// Sends a GOAWAY frame to initiate graceful connection closure.
1738 ///
1739 /// When quiche is used in the server role, the `id` parameter is the stream
1740 /// ID of the highest processed request. This can be any valid ID between 0
1741 /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
1742 /// these conditions will return an error.
1743 ///
1744 /// This method does not close the QUIC connection. Applications are
1745 /// required to call [`close()`] themselves.
1746 ///
1747 /// [`close()`]: ../struct.Connection.html#method.close
send_goaway( &mut self, conn: &mut super::Connection, id: u64, ) -> Result<()>1748 pub fn send_goaway(
1749 &mut self, conn: &mut super::Connection, id: u64,
1750 ) -> Result<()> {
1751 let mut id = id;
1752
1753 // TODO: server push
1754 //
1755 // In the meantime always send 0 from client.
1756 if !self.is_server {
1757 id = 0;
1758 }
1759
1760 if self.is_server && id % 4 != 0 {
1761 return Err(Error::IdError);
1762 }
1763
1764 if let Some(sent_id) = self.local_goaway_id {
1765 if id > sent_id {
1766 return Err(Error::IdError);
1767 }
1768 }
1769
1770 if let Some(stream_id) = self.control_stream_id {
1771 let mut d = [42; 10];
1772 let mut b = octets::OctetsMut::with_slice(&mut d);
1773
1774 let frame = frame::Frame::GoAway { id };
1775
1776 let wire_len = frame.to_bytes(&mut b)?;
1777 let stream_cap = conn.stream_capacity(stream_id)?;
1778
1779 if stream_cap < wire_len {
1780 return Err(Error::StreamBlocked);
1781 }
1782
1783 trace!("{} tx frm {:?}", conn.trace_id(), frame);
1784
1785 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1786 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1787 stream_id,
1788 length: Some(octets::varint_len(id) as u64),
1789 frame: frame.to_qlog(),
1790 raw: None,
1791 });
1792
1793 q.add_event_data_now(ev_data).ok();
1794 });
1795
1796 let off = b.off();
1797 conn.stream_send(stream_id, &d[..off], false)?;
1798
1799 self.local_goaway_id = Some(id);
1800 }
1801
1802 Ok(())
1803 }
1804
1805 /// Gets the raw settings from peer including unknown and reserved types.
1806 ///
1807 /// The order of settings is the same as received in the SETTINGS frame.
peer_settings_raw(&self) -> Option<&[(u64, u64)]>1808 pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
1809 self.peer_settings.raw.as_deref()
1810 }
1811
open_uni_stream( &mut self, conn: &mut super::Connection, ty: u64, ) -> Result<u64>1812 fn open_uni_stream(
1813 &mut self, conn: &mut super::Connection, ty: u64,
1814 ) -> Result<u64> {
1815 let stream_id = self.next_uni_stream_id;
1816
1817 let mut d = [0; 8];
1818 let mut b = octets::OctetsMut::with_slice(&mut d);
1819
1820 match ty {
1821 // Control and QPACK streams are the most important to schedule.
1822 stream::HTTP3_CONTROL_STREAM_TYPE_ID |
1823 stream::QPACK_ENCODER_STREAM_TYPE_ID |
1824 stream::QPACK_DECODER_STREAM_TYPE_ID => {
1825 conn.stream_priority(stream_id, 0, true)?;
1826 },
1827
1828 // TODO: Server push
1829 stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
1830
1831 // Anything else is a GREASE stream, so make it the least important.
1832 _ => {
1833 conn.stream_priority(stream_id, 255, true)?;
1834 },
1835 }
1836
1837 conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
1838
1839 // To avoid skipping stream IDs, we only calculate the next available
1840 // stream ID when data has been successfully buffered.
1841 self.next_uni_stream_id = self
1842 .next_uni_stream_id
1843 .checked_add(4)
1844 .ok_or(Error::IdError)?;
1845
1846 Ok(stream_id)
1847 }
1848
open_qpack_encoder_stream( &mut self, conn: &mut super::Connection, ) -> Result<()>1849 fn open_qpack_encoder_stream(
1850 &mut self, conn: &mut super::Connection,
1851 ) -> Result<()> {
1852 let stream_id =
1853 self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
1854
1855 self.local_qpack_streams.encoder_stream_id = Some(stream_id);
1856
1857 qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
1858 let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
1859 stream_id,
1860 owner: Some(H3Owner::Local),
1861 stream_type: H3StreamType::QpackEncode,
1862 associated_push_id: None,
1863 });
1864
1865 q.add_event_data_now(ev_data).ok();
1866 });
1867
1868 Ok(())
1869 }
1870
open_qpack_decoder_stream( &mut self, conn: &mut super::Connection, ) -> Result<()>1871 fn open_qpack_decoder_stream(
1872 &mut self, conn: &mut super::Connection,
1873 ) -> Result<()> {
1874 let stream_id =
1875 self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
1876
1877 self.local_qpack_streams.decoder_stream_id = Some(stream_id);
1878
1879 qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
1880 let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
1881 stream_id,
1882 owner: Some(H3Owner::Local),
1883 stream_type: H3StreamType::QpackDecode,
1884 associated_push_id: None,
1885 });
1886
1887 q.add_event_data_now(ev_data).ok();
1888 });
1889
1890 Ok(())
1891 }
1892
1893 /// Send GREASE frames on the provided stream ID.
send_grease_frames( &mut self, conn: &mut super::Connection, stream_id: u64, ) -> Result<()>1894 fn send_grease_frames(
1895 &mut self, conn: &mut super::Connection, stream_id: u64,
1896 ) -> Result<()> {
1897 let mut d = [0; 8];
1898
1899 let stream_cap = match conn.stream_capacity(stream_id) {
1900 Ok(v) => v,
1901
1902 Err(e) => {
1903 if conn.stream_finished(stream_id) {
1904 self.streams.remove(&stream_id);
1905 }
1906
1907 return Err(e.into());
1908 },
1909 };
1910
1911 let grease_frame1 = grease_value();
1912 let grease_frame2 = grease_value();
1913 let grease_payload = b"GREASE is the word";
1914
1915 let overhead = octets::varint_len(grease_frame1) + // frame type
1916 1 + // payload len
1917 octets::varint_len(grease_frame2) + // frame type
1918 1 + // payload len
1919 grease_payload.len(); // payload
1920
1921 // Don't send GREASE if there is not enough capacity for it. Greasing
1922 // will _not_ be attempted again later on.
1923 if stream_cap < overhead {
1924 return Ok(());
1925 }
1926
1927 // Empty GREASE frame.
1928 let mut b = octets::OctetsMut::with_slice(&mut d);
1929 conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
1930
1931 let mut b = octets::OctetsMut::with_slice(&mut d);
1932 conn.stream_send(stream_id, b.put_varint(0)?, false)?;
1933
1934 trace!(
1935 "{} tx frm GREASE stream={} len=0",
1936 conn.trace_id(),
1937 stream_id
1938 );
1939
1940 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1941 let frame = Http3Frame::Reserved { length: Some(0) };
1942 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1943 stream_id,
1944 length: Some(0),
1945 frame,
1946 raw: None,
1947 });
1948
1949 q.add_event_data_now(ev_data).ok();
1950 });
1951
1952 // GREASE frame with payload.
1953 let mut b = octets::OctetsMut::with_slice(&mut d);
1954 conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
1955
1956 let mut b = octets::OctetsMut::with_slice(&mut d);
1957 conn.stream_send(stream_id, b.put_varint(18)?, false)?;
1958
1959 conn.stream_send(stream_id, grease_payload, false)?;
1960
1961 trace!(
1962 "{} tx frm GREASE stream={} len={}",
1963 conn.trace_id(),
1964 stream_id,
1965 grease_payload.len()
1966 );
1967
1968 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1969 let frame = Http3Frame::Reserved {
1970 length: Some(grease_payload.len() as u64),
1971 };
1972 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1973 stream_id,
1974 length: Some(grease_payload.len() as u64),
1975 frame,
1976 raw: None,
1977 });
1978
1979 q.add_event_data_now(ev_data).ok();
1980 });
1981
1982 Ok(())
1983 }
1984
1985 /// Opens a new unidirectional stream with a GREASE type and sends some
1986 /// unframed payload.
open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()>1987 fn open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()> {
1988 match self.open_uni_stream(conn, grease_value()) {
1989 Ok(stream_id) => {
1990 conn.stream_send(stream_id, b"GREASE is the word", true)?;
1991
1992 trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
1993
1994 qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
1995 let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
1996 stream_id,
1997 owner: Some(H3Owner::Local),
1998 stream_type: H3StreamType::Unknown,
1999 associated_push_id: None,
2000 });
2001
2002 q.add_event_data_now(ev_data).ok();
2003 });
2004 },
2005
2006 Err(Error::IdError) => {
2007 trace!("{} GREASE stream blocked", conn.trace_id(),);
2008
2009 return Ok(());
2010 },
2011
2012 Err(e) => return Err(e),
2013 };
2014
2015 Ok(())
2016 }
2017
2018 /// Sends SETTINGS frame based on HTTP/3 configuration.
send_settings(&mut self, conn: &mut super::Connection) -> Result<()>2019 fn send_settings(&mut self, conn: &mut super::Connection) -> Result<()> {
2020 let stream_id = match self
2021 .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2022 {
2023 Ok(v) => v,
2024
2025 Err(e) => {
2026 trace!("{} Control stream blocked", conn.trace_id(),);
2027
2028 if e == Error::Done {
2029 return Err(Error::InternalError);
2030 }
2031
2032 return Err(e);
2033 },
2034 };
2035
2036 self.control_stream_id = Some(stream_id);
2037
2038 qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2039 let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2040 stream_id,
2041 owner: Some(H3Owner::Local),
2042 stream_type: H3StreamType::Control,
2043 associated_push_id: None,
2044 });
2045
2046 q.add_event_data_now(ev_data).ok();
2047 });
2048
2049 let grease = if conn.grease {
2050 Some((grease_value(), grease_value()))
2051 } else {
2052 None
2053 };
2054
2055 let frame = frame::Frame::Settings {
2056 max_field_section_size: self.local_settings.max_field_section_size,
2057 qpack_max_table_capacity: self
2058 .local_settings
2059 .qpack_max_table_capacity,
2060 qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2061 connect_protocol_enabled: self
2062 .local_settings
2063 .connect_protocol_enabled,
2064 h3_datagram: self.local_settings.h3_datagram,
2065 grease,
2066 raw: Default::default(),
2067 };
2068
2069 let mut d = [42; 128];
2070 let mut b = octets::OctetsMut::with_slice(&mut d);
2071
2072 frame.to_bytes(&mut b)?;
2073
2074 let off = b.off();
2075
2076 if let Some(id) = self.control_stream_id {
2077 conn.stream_send(id, &d[..off], false)?;
2078
2079 trace!(
2080 "{} tx frm SETTINGS stream={} len={}",
2081 conn.trace_id(),
2082 id,
2083 off
2084 );
2085
2086 qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2087 let frame = frame.to_qlog();
2088 let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2089 stream_id: id,
2090 length: Some(off as u64),
2091 frame,
2092 raw: None,
2093 });
2094
2095 q.add_event_data_now(ev_data).ok();
2096 });
2097 }
2098
2099 Ok(())
2100 }
2101
process_control_stream( &mut self, conn: &mut super::Connection, stream_id: u64, ) -> Result<(u64, Event)>2102 fn process_control_stream(
2103 &mut self, conn: &mut super::Connection, stream_id: u64,
2104 ) -> Result<(u64, Event)> {
2105 if conn.stream_finished(stream_id) {
2106 conn.close(
2107 true,
2108 Error::ClosedCriticalStream.to_wire(),
2109 b"Critical stream closed.",
2110 )?;
2111
2112 return Err(Error::ClosedCriticalStream);
2113 }
2114
2115 match self.process_readable_stream(conn, stream_id, true) {
2116 Ok(ev) => return Ok(ev),
2117
2118 Err(Error::Done) => (),
2119
2120 Err(e) => return Err(e),
2121 };
2122
2123 if conn.stream_finished(stream_id) {
2124 conn.close(
2125 true,
2126 Error::ClosedCriticalStream.to_wire(),
2127 b"Critical stream closed.",
2128 )?;
2129
2130 return Err(Error::ClosedCriticalStream);
2131 }
2132
2133 Err(Error::Done)
2134 }
2135
process_readable_stream( &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool, ) -> Result<(u64, Event)>2136 fn process_readable_stream(
2137 &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool,
2138 ) -> Result<(u64, Event)> {
2139 self.streams
2140 .entry(stream_id)
2141 .or_insert_with(|| stream::Stream::new(stream_id, false));
2142
2143 // We need to get a fresh reference to the stream for each
2144 // iteration, to avoid borrowing `self` for the entire duration
2145 // of the loop, because we'll need to borrow it again in the
2146 // `State::FramePayload` case below.
2147 while let Some(stream) = self.streams.get_mut(&stream_id) {
2148 match stream.state() {
2149 stream::State::StreamType => {
2150 stream.try_fill_buffer(conn)?;
2151
2152 let varint = match stream.try_consume_varint() {
2153 Ok(v) => v,
2154
2155 Err(_) => continue,
2156 };
2157
2158 let ty = stream::Type::deserialize(varint)?;
2159
2160 if let Err(e) = stream.set_ty(ty) {
2161 conn.close(true, e.to_wire(), b"")?;
2162 return Err(e);
2163 }
2164
2165 qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2166 let ev_data =
2167 EventData::H3StreamTypeSet(H3StreamTypeSet {
2168 stream_id,
2169 owner: Some(H3Owner::Remote),
2170 stream_type: ty.to_qlog(),
2171 associated_push_id: None,
2172 });
2173
2174 q.add_event_data_now(ev_data).ok();
2175 });
2176
2177 match &ty {
2178 stream::Type::Control => {
2179 // Only one control stream allowed.
2180 if self.peer_control_stream_id.is_some() {
2181 conn.close(
2182 true,
2183 Error::StreamCreationError.to_wire(),
2184 b"Received multiple control streams",
2185 )?;
2186
2187 return Err(Error::StreamCreationError);
2188 }
2189
2190 trace!(
2191 "{} open peer's control stream {}",
2192 conn.trace_id(),
2193 stream_id
2194 );
2195
2196 self.peer_control_stream_id = Some(stream_id);
2197 },
2198
2199 stream::Type::Push => {
2200 // Only clients can receive push stream.
2201 if self.is_server {
2202 conn.close(
2203 true,
2204 Error::StreamCreationError.to_wire(),
2205 b"Server received push stream.",
2206 )?;
2207
2208 return Err(Error::StreamCreationError);
2209 }
2210 },
2211
2212 stream::Type::QpackEncoder => {
2213 // Only one qpack encoder stream allowed.
2214 if self.peer_qpack_streams.encoder_stream_id.is_some()
2215 {
2216 conn.close(
2217 true,
2218 Error::StreamCreationError.to_wire(),
2219 b"Received multiple QPACK encoder streams",
2220 )?;
2221
2222 return Err(Error::StreamCreationError);
2223 }
2224
2225 self.peer_qpack_streams.encoder_stream_id =
2226 Some(stream_id);
2227 },
2228
2229 stream::Type::QpackDecoder => {
2230 // Only one qpack decoder allowed.
2231 if self.peer_qpack_streams.decoder_stream_id.is_some()
2232 {
2233 conn.close(
2234 true,
2235 Error::StreamCreationError.to_wire(),
2236 b"Received multiple QPACK decoder streams",
2237 )?;
2238
2239 return Err(Error::StreamCreationError);
2240 }
2241
2242 self.peer_qpack_streams.decoder_stream_id =
2243 Some(stream_id);
2244 },
2245
2246 stream::Type::Unknown => {
2247 // Unknown stream types are ignored.
2248 // TODO: we MAY send STOP_SENDING
2249 },
2250
2251 stream::Type::Request => unreachable!(),
2252 }
2253 },
2254
2255 stream::State::PushId => {
2256 stream.try_fill_buffer(conn)?;
2257
2258 let varint = match stream.try_consume_varint() {
2259 Ok(v) => v,
2260
2261 Err(_) => continue,
2262 };
2263
2264 if let Err(e) = stream.set_push_id(varint) {
2265 conn.close(true, e.to_wire(), b"")?;
2266 return Err(e);
2267 }
2268 },
2269
2270 stream::State::FrameType => {
2271 stream.try_fill_buffer(conn)?;
2272
2273 let varint = match stream.try_consume_varint() {
2274 Ok(v) => v,
2275
2276 Err(_) => continue,
2277 };
2278
2279 match stream.set_frame_type(varint) {
2280 Err(Error::FrameUnexpected) => {
2281 let msg = format!("Unexpected frame type {varint}");
2282
2283 conn.close(
2284 true,
2285 Error::FrameUnexpected.to_wire(),
2286 msg.as_bytes(),
2287 )?;
2288
2289 return Err(Error::FrameUnexpected);
2290 },
2291
2292 Err(e) => {
2293 conn.close(
2294 true,
2295 e.to_wire(),
2296 b"Error handling frame.",
2297 )?;
2298
2299 return Err(e);
2300 },
2301
2302 _ => (),
2303 }
2304 },
2305
2306 stream::State::FramePayloadLen => {
2307 stream.try_fill_buffer(conn)?;
2308
2309 let payload_len = match stream.try_consume_varint() {
2310 Ok(v) => v,
2311
2312 Err(_) => continue,
2313 };
2314
2315 // DATA frames are handled uniquely. After this point we lose
2316 // visibility of DATA framing, so just log here.
2317 if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
2318 trace!(
2319 "{} rx frm DATA stream={} wire_payload_len={}",
2320 conn.trace_id(),
2321 stream_id,
2322 payload_len
2323 );
2324
2325 qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2326 let frame = Http3Frame::Data { raw: None };
2327
2328 let ev_data =
2329 EventData::H3FrameParsed(H3FrameParsed {
2330 stream_id,
2331 length: Some(payload_len),
2332 frame,
2333 raw: None,
2334 });
2335
2336 q.add_event_data_now(ev_data).ok();
2337 });
2338 }
2339
2340 if let Err(e) = stream.set_frame_payload_len(payload_len) {
2341 conn.close(true, e.to_wire(), b"")?;
2342 return Err(e);
2343 }
2344 },
2345
2346 stream::State::FramePayload => {
2347 // Do not emit events when not polling.
2348 if !polling {
2349 break;
2350 }
2351
2352 stream.try_fill_buffer(conn)?;
2353
2354 let (frame, payload_len) = match stream.try_consume_frame() {
2355 Ok(frame) => frame,
2356
2357 Err(Error::Done) => return Err(Error::Done),
2358
2359 Err(e) => {
2360 conn.close(
2361 true,
2362 e.to_wire(),
2363 b"Error handling frame.",
2364 )?;
2365
2366 return Err(e);
2367 },
2368 };
2369
2370 match self.process_frame(conn, stream_id, frame, payload_len)
2371 {
2372 Ok(ev) => return Ok(ev),
2373
2374 Err(Error::Done) => {
2375 // This might be a frame that is processed internally
2376 // without needing to bubble up to the user as an
2377 // event. Check whether the frame has FIN'd by QUIC
2378 // to prevent trying to read again on a closed stream.
2379 if conn.stream_finished(stream_id) {
2380 break;
2381 }
2382 },
2383
2384 Err(e) => return Err(e),
2385 };
2386 },
2387
2388 stream::State::Data => {
2389 // Do not emit events when not polling.
2390 if !polling {
2391 break;
2392 }
2393
2394 if !stream.try_trigger_data_event() {
2395 break;
2396 }
2397
2398 return Ok((stream_id, Event::Data));
2399 },
2400
2401 stream::State::QpackInstruction => {
2402 let mut d = [0; 4096];
2403
2404 // Read data from the stream and discard immediately.
2405 loop {
2406 conn.stream_recv(stream_id, &mut d)?;
2407 }
2408 },
2409
2410 stream::State::Drain => {
2411 // Discard incoming data on the stream.
2412 conn.stream_shutdown(
2413 stream_id,
2414 crate::Shutdown::Read,
2415 0x100,
2416 )?;
2417
2418 break;
2419 },
2420
2421 stream::State::Finished => break,
2422 }
2423 }
2424
2425 Err(Error::Done)
2426 }
2427
process_finished_stream(&mut self, stream_id: u64)2428 fn process_finished_stream(&mut self, stream_id: u64) {
2429 let stream = match self.streams.get_mut(&stream_id) {
2430 Some(v) => v,
2431
2432 None => return,
2433 };
2434
2435 if stream.state() == stream::State::Finished {
2436 return;
2437 }
2438
2439 match stream.ty() {
2440 Some(stream::Type::Request) | Some(stream::Type::Push) => {
2441 stream.finished();
2442
2443 self.finished_streams.push_back(stream_id);
2444 },
2445
2446 _ => (),
2447 };
2448 }
2449
process_frame( &mut self, conn: &mut super::Connection, stream_id: u64, frame: frame::Frame, payload_len: u64, ) -> Result<(u64, Event)>2450 fn process_frame(
2451 &mut self, conn: &mut super::Connection, stream_id: u64,
2452 frame: frame::Frame, payload_len: u64,
2453 ) -> Result<(u64, Event)> {
2454 trace!(
2455 "{} rx frm {:?} stream={} payload_len={}",
2456 conn.trace_id(),
2457 frame,
2458 stream_id,
2459 payload_len
2460 );
2461
2462 qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2463 // HEADERS frames are special case and will be logged below.
2464 if !matches!(frame, frame::Frame::Headers { .. }) {
2465 let frame = frame.to_qlog();
2466 let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2467 stream_id,
2468 length: Some(payload_len),
2469 frame,
2470 raw: None,
2471 });
2472
2473 q.add_event_data_now(ev_data).ok();
2474 }
2475 });
2476
2477 match frame {
2478 frame::Frame::Settings {
2479 max_field_section_size,
2480 qpack_max_table_capacity,
2481 qpack_blocked_streams,
2482 connect_protocol_enabled,
2483 h3_datagram,
2484 raw,
2485 ..
2486 } => {
2487 self.peer_settings = ConnectionSettings {
2488 max_field_section_size,
2489 qpack_max_table_capacity,
2490 qpack_blocked_streams,
2491 connect_protocol_enabled,
2492 h3_datagram,
2493 raw,
2494 };
2495
2496 if let Some(1) = h3_datagram {
2497 // The peer MUST have also enabled DATAGRAM with a TP
2498 if conn.dgram_max_writable_len().is_none() {
2499 conn.close(
2500 true,
2501 Error::SettingsError.to_wire(),
2502 b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2503 )?;
2504
2505 return Err(Error::SettingsError);
2506 }
2507 }
2508 },
2509
2510 frame::Frame::Headers { header_block } => {
2511 if Some(stream_id) == self.peer_control_stream_id {
2512 conn.close(
2513 true,
2514 Error::FrameUnexpected.to_wire(),
2515 b"HEADERS received on control stream",
2516 )?;
2517
2518 return Err(Error::FrameUnexpected);
2519 }
2520
2521 // Use "infinite" as default value for max_field_section_size if
2522 // it is not configured by the application.
2523 let max_size = self
2524 .local_settings
2525 .max_field_section_size
2526 .unwrap_or(u64::MAX);
2527
2528 let headers = match self
2529 .qpack_decoder
2530 .decode(&header_block[..], max_size)
2531 {
2532 Ok(v) => v,
2533
2534 Err(e) => {
2535 let e = match e {
2536 qpack::Error::HeaderListTooLarge =>
2537 Error::ExcessiveLoad,
2538
2539 _ => Error::QpackDecompressionFailed,
2540 };
2541
2542 conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2543
2544 return Err(e);
2545 },
2546 };
2547
2548 qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2549 let qlog_headers = headers
2550 .iter()
2551 .map(|h| qlog::events::h3::HttpHeader {
2552 name: String::from_utf8_lossy(h.name()).into_owned(),
2553 value: String::from_utf8_lossy(h.value())
2554 .into_owned(),
2555 })
2556 .collect();
2557
2558 let frame = Http3Frame::Headers {
2559 headers: qlog_headers,
2560 };
2561
2562 let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2563 stream_id,
2564 length: Some(payload_len),
2565 frame,
2566 raw: None,
2567 });
2568
2569 q.add_event_data_now(ev_data).ok();
2570 });
2571
2572 let has_body = !conn.stream_finished(stream_id);
2573
2574 return Ok((stream_id, Event::Headers {
2575 list: headers,
2576 has_body,
2577 }));
2578 },
2579
2580 frame::Frame::Data { .. } => {
2581 if Some(stream_id) == self.peer_control_stream_id {
2582 conn.close(
2583 true,
2584 Error::FrameUnexpected.to_wire(),
2585 b"DATA received on control stream",
2586 )?;
2587
2588 return Err(Error::FrameUnexpected);
2589 }
2590
2591 // Do nothing. The Data event is returned separately.
2592 },
2593
2594 frame::Frame::GoAway { id } => {
2595 if Some(stream_id) != self.peer_control_stream_id {
2596 conn.close(
2597 true,
2598 Error::FrameUnexpected.to_wire(),
2599 b"GOAWAY received on non-control stream",
2600 )?;
2601
2602 return Err(Error::FrameUnexpected);
2603 }
2604
2605 if !self.is_server && id % 4 != 0 {
2606 conn.close(
2607 true,
2608 Error::FrameUnexpected.to_wire(),
2609 b"GOAWAY received with ID of non-request stream",
2610 )?;
2611
2612 return Err(Error::IdError);
2613 }
2614
2615 if let Some(received_id) = self.peer_goaway_id {
2616 if id > received_id {
2617 conn.close(
2618 true,
2619 Error::IdError.to_wire(),
2620 b"GOAWAY received with ID larger than previously received",
2621 )?;
2622
2623 return Err(Error::IdError);
2624 }
2625 }
2626
2627 self.peer_goaway_id = Some(id);
2628
2629 return Ok((id, Event::GoAway));
2630 },
2631
2632 frame::Frame::MaxPushId { push_id } => {
2633 if Some(stream_id) != self.peer_control_stream_id {
2634 conn.close(
2635 true,
2636 Error::FrameUnexpected.to_wire(),
2637 b"MAX_PUSH_ID received on non-control stream",
2638 )?;
2639
2640 return Err(Error::FrameUnexpected);
2641 }
2642
2643 if !self.is_server {
2644 conn.close(
2645 true,
2646 Error::FrameUnexpected.to_wire(),
2647 b"MAX_PUSH_ID received by client",
2648 )?;
2649
2650 return Err(Error::FrameUnexpected);
2651 }
2652
2653 if push_id < self.max_push_id {
2654 conn.close(
2655 true,
2656 Error::IdError.to_wire(),
2657 b"MAX_PUSH_ID reduced limit",
2658 )?;
2659
2660 return Err(Error::IdError);
2661 }
2662
2663 self.max_push_id = push_id;
2664 },
2665
2666 frame::Frame::PushPromise { .. } => {
2667 if self.is_server {
2668 conn.close(
2669 true,
2670 Error::FrameUnexpected.to_wire(),
2671 b"PUSH_PROMISE received by server",
2672 )?;
2673
2674 return Err(Error::FrameUnexpected);
2675 }
2676
2677 if stream_id % 4 != 0 {
2678 conn.close(
2679 true,
2680 Error::FrameUnexpected.to_wire(),
2681 b"PUSH_PROMISE received on non-request stream",
2682 )?;
2683
2684 return Err(Error::FrameUnexpected);
2685 }
2686
2687 // TODO: implement more checks and PUSH_PROMISE event
2688 },
2689
2690 frame::Frame::CancelPush { .. } => {
2691 if Some(stream_id) != self.peer_control_stream_id {
2692 conn.close(
2693 true,
2694 Error::FrameUnexpected.to_wire(),
2695 b"CANCEL_PUSH received on non-control stream",
2696 )?;
2697
2698 return Err(Error::FrameUnexpected);
2699 }
2700
2701 // TODO: implement CANCEL_PUSH frame
2702 },
2703
2704 frame::Frame::PriorityUpdateRequest {
2705 prioritized_element_id,
2706 priority_field_value,
2707 } => {
2708 if !self.is_server {
2709 conn.close(
2710 true,
2711 Error::FrameUnexpected.to_wire(),
2712 b"PRIORITY_UPDATE received by client",
2713 )?;
2714
2715 return Err(Error::FrameUnexpected);
2716 }
2717
2718 if Some(stream_id) != self.peer_control_stream_id {
2719 conn.close(
2720 true,
2721 Error::FrameUnexpected.to_wire(),
2722 b"PRIORITY_UPDATE received on non-control stream",
2723 )?;
2724
2725 return Err(Error::FrameUnexpected);
2726 }
2727
2728 if prioritized_element_id % 4 != 0 {
2729 conn.close(
2730 true,
2731 Error::FrameUnexpected.to_wire(),
2732 b"PRIORITY_UPDATE for request stream type with wrong ID",
2733 )?;
2734
2735 return Err(Error::FrameUnexpected);
2736 }
2737
2738 if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
2739 conn.close(
2740 true,
2741 Error::IdError.to_wire(),
2742 b"PRIORITY_UPDATE for request stream beyond max streams limit",
2743 )?;
2744
2745 return Err(Error::IdError);
2746 }
2747
2748 // If the PRIORITY_UPDATE is valid, consider storing the latest
2749 // contents. Due to reordering, it is possible that we might
2750 // receive frames that reference streams that have not yet to
2751 // been opened and that's OK because it's within our concurrency
2752 // limit. However, we discard PRIORITY_UPDATE that refers to
2753 // streams that we know have been collected.
2754 if conn.streams.is_collected(prioritized_element_id) {
2755 return Err(Error::Done);
2756 }
2757
2758 // If the stream did not yet exist, create it and store.
2759 let stream =
2760 self.streams.entry(prioritized_element_id).or_insert_with(
2761 || stream::Stream::new(prioritized_element_id, false),
2762 );
2763
2764 let had_priority_update = stream.has_last_priority_update();
2765 stream.set_last_priority_update(Some(priority_field_value));
2766
2767 // Only trigger the event when there wasn't already a stored
2768 // PRIORITY_UPDATE.
2769 if !had_priority_update {
2770 return Ok((prioritized_element_id, Event::PriorityUpdate));
2771 } else {
2772 return Err(Error::Done);
2773 }
2774 },
2775
2776 frame::Frame::PriorityUpdatePush {
2777 prioritized_element_id,
2778 ..
2779 } => {
2780 if !self.is_server {
2781 conn.close(
2782 true,
2783 Error::FrameUnexpected.to_wire(),
2784 b"PRIORITY_UPDATE received by client",
2785 )?;
2786
2787 return Err(Error::FrameUnexpected);
2788 }
2789
2790 if Some(stream_id) != self.peer_control_stream_id {
2791 conn.close(
2792 true,
2793 Error::FrameUnexpected.to_wire(),
2794 b"PRIORITY_UPDATE received on non-control stream",
2795 )?;
2796
2797 return Err(Error::FrameUnexpected);
2798 }
2799
2800 if prioritized_element_id % 3 != 0 {
2801 conn.close(
2802 true,
2803 Error::FrameUnexpected.to_wire(),
2804 b"PRIORITY_UPDATE for push stream type with wrong ID",
2805 )?;
2806
2807 return Err(Error::FrameUnexpected);
2808 }
2809
2810 // TODO: we only implement this if we implement server push
2811 },
2812
2813 frame::Frame::Unknown { .. } => (),
2814 }
2815
2816 Err(Error::Done)
2817 }
2818 }
2819
2820 /// Generates an HTTP/3 GREASE variable length integer.
grease_value() -> u642821 fn grease_value() -> u64 {
2822 let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
2823 31 * n + 33
2824 }
2825
2826 #[doc(hidden)]
2827 pub mod testing {
2828 use super::*;
2829
2830 use crate::testing;
2831
2832 /// Session is an HTTP/3 test helper structure. It holds a client, server
2833 /// and pipe that allows them to communicate.
2834 ///
2835 /// `default()` creates a session with some sensible default
2836 /// configuration. `with_configs()` allows for providing a specific
2837 /// configuration.
2838 ///
2839 /// `handshake()` performs all the steps needed to establish an HTTP/3
2840 /// connection.
2841 ///
2842 /// Some utility functions are provided that make it less verbose to send
2843 /// request, responses and individual headers. The full quiche API remains
2844 /// available for any test that need to do unconventional things (such as
2845 /// bad behaviour that triggers errors).
2846 pub struct Session {
2847 pub pipe: testing::Pipe,
2848 pub client: Connection,
2849 pub server: Connection,
2850 }
2851
2852 impl Session {
new() -> Result<Session>2853 pub fn new() -> Result<Session> {
2854 let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
2855 config.load_cert_chain_from_pem_file("examples/cert.crt")?;
2856 config.load_priv_key_from_pem_file("examples/cert.key")?;
2857 config.set_application_protos(&[b"h3"])?;
2858 config.set_initial_max_data(1500);
2859 config.set_initial_max_stream_data_bidi_local(150);
2860 config.set_initial_max_stream_data_bidi_remote(150);
2861 config.set_initial_max_stream_data_uni(150);
2862 config.set_initial_max_streams_bidi(5);
2863 config.set_initial_max_streams_uni(5);
2864 config.verify_peer(false);
2865 config.enable_dgram(true, 3, 3);
2866 config.set_ack_delay_exponent(8);
2867
2868 let h3_config = Config::new()?;
2869 Session::with_configs(&mut config, &h3_config)
2870 }
2871
with_configs( config: &mut crate::Config, h3_config: &Config, ) -> Result<Session>2872 pub fn with_configs(
2873 config: &mut crate::Config, h3_config: &Config,
2874 ) -> Result<Session> {
2875 let pipe = testing::Pipe::with_config(config)?;
2876 let client_dgram = pipe.client.dgram_enabled();
2877 let server_dgram = pipe.server.dgram_enabled();
2878 Ok(Session {
2879 pipe,
2880 client: Connection::new(h3_config, false, client_dgram)?,
2881 server: Connection::new(h3_config, true, server_dgram)?,
2882 })
2883 }
2884
2885 /// Do the HTTP/3 handshake so both ends are in sane initial state.
handshake(&mut self) -> Result<()>2886 pub fn handshake(&mut self) -> Result<()> {
2887 self.pipe.handshake()?;
2888
2889 // Client streams.
2890 self.client.send_settings(&mut self.pipe.client)?;
2891 self.pipe.advance().ok();
2892
2893 self.client
2894 .open_qpack_encoder_stream(&mut self.pipe.client)?;
2895 self.pipe.advance().ok();
2896
2897 self.client
2898 .open_qpack_decoder_stream(&mut self.pipe.client)?;
2899 self.pipe.advance().ok();
2900
2901 if self.pipe.client.grease {
2902 self.client.open_grease_stream(&mut self.pipe.client)?;
2903 }
2904
2905 self.pipe.advance().ok();
2906
2907 // Server streams.
2908 self.server.send_settings(&mut self.pipe.server)?;
2909 self.pipe.advance().ok();
2910
2911 self.server
2912 .open_qpack_encoder_stream(&mut self.pipe.server)?;
2913 self.pipe.advance().ok();
2914
2915 self.server
2916 .open_qpack_decoder_stream(&mut self.pipe.server)?;
2917 self.pipe.advance().ok();
2918
2919 if self.pipe.server.grease {
2920 self.server.open_grease_stream(&mut self.pipe.server)?;
2921 }
2922
2923 self.advance().ok();
2924
2925 while self.client.poll(&mut self.pipe.client).is_ok() {
2926 // Do nothing.
2927 }
2928
2929 while self.server.poll(&mut self.pipe.server).is_ok() {
2930 // Do nothing.
2931 }
2932
2933 Ok(())
2934 }
2935
2936 /// Advances the session pipe over the buffer.
advance(&mut self) -> crate::Result<()>2937 pub fn advance(&mut self) -> crate::Result<()> {
2938 self.pipe.advance()
2939 }
2940
2941 /// Polls the client for events.
poll_client(&mut self) -> Result<(u64, Event)>2942 pub fn poll_client(&mut self) -> Result<(u64, Event)> {
2943 self.client.poll(&mut self.pipe.client)
2944 }
2945
2946 /// Polls the server for events.
poll_server(&mut self) -> Result<(u64, Event)>2947 pub fn poll_server(&mut self) -> Result<(u64, Event)> {
2948 self.server.poll(&mut self.pipe.server)
2949 }
2950
2951 /// Sends a request from client with default headers.
2952 ///
2953 /// On success it returns the newly allocated stream and the headers.
send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)>2954 pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
2955 let req = vec![
2956 Header::new(b":method", b"GET"),
2957 Header::new(b":scheme", b"https"),
2958 Header::new(b":authority", b"quic.tech"),
2959 Header::new(b":path", b"/test"),
2960 Header::new(b"user-agent", b"quiche-test"),
2961 ];
2962
2963 let stream =
2964 self.client.send_request(&mut self.pipe.client, &req, fin)?;
2965
2966 self.advance().ok();
2967
2968 Ok((stream, req))
2969 }
2970
2971 /// Sends a response from server with default headers.
2972 ///
2973 /// On success it returns the headers.
send_response( &mut self, stream: u64, fin: bool, ) -> Result<Vec<Header>>2974 pub fn send_response(
2975 &mut self, stream: u64, fin: bool,
2976 ) -> Result<Vec<Header>> {
2977 let resp = vec![
2978 Header::new(b":status", b"200"),
2979 Header::new(b"server", b"quiche-test"),
2980 ];
2981
2982 self.server.send_response(
2983 &mut self.pipe.server,
2984 stream,
2985 &resp,
2986 fin,
2987 )?;
2988
2989 self.advance().ok();
2990
2991 Ok(resp)
2992 }
2993
2994 /// Sends some default payload from client.
2995 ///
2996 /// On success it returns the payload.
send_body_client( &mut self, stream: u64, fin: bool, ) -> Result<Vec<u8>>2997 pub fn send_body_client(
2998 &mut self, stream: u64, fin: bool,
2999 ) -> Result<Vec<u8>> {
3000 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3001
3002 self.client
3003 .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3004
3005 self.advance().ok();
3006
3007 Ok(bytes)
3008 }
3009
3010 /// Fetches DATA payload from the server.
3011 ///
3012 /// On success it returns the number of bytes received.
recv_body_client( &mut self, stream: u64, buf: &mut [u8], ) -> Result<usize>3013 pub fn recv_body_client(
3014 &mut self, stream: u64, buf: &mut [u8],
3015 ) -> Result<usize> {
3016 self.client.recv_body(&mut self.pipe.client, stream, buf)
3017 }
3018
3019 /// Sends some default payload from server.
3020 ///
3021 /// On success it returns the payload.
send_body_server( &mut self, stream: u64, fin: bool, ) -> Result<Vec<u8>>3022 pub fn send_body_server(
3023 &mut self, stream: u64, fin: bool,
3024 ) -> Result<Vec<u8>> {
3025 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3026
3027 self.server
3028 .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3029
3030 self.advance().ok();
3031
3032 Ok(bytes)
3033 }
3034
3035 /// Fetches DATA payload from the client.
3036 ///
3037 /// On success it returns the number of bytes received.
recv_body_server( &mut self, stream: u64, buf: &mut [u8], ) -> Result<usize>3038 pub fn recv_body_server(
3039 &mut self, stream: u64, buf: &mut [u8],
3040 ) -> Result<usize> {
3041 self.server.recv_body(&mut self.pipe.server, stream, buf)
3042 }
3043
3044 /// Sends a single HTTP/3 frame from the client.
send_frame_client( &mut self, frame: frame::Frame, stream_id: u64, fin: bool, ) -> Result<()>3045 pub fn send_frame_client(
3046 &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3047 ) -> Result<()> {
3048 let mut d = [42; 65535];
3049
3050 let mut b = octets::OctetsMut::with_slice(&mut d);
3051
3052 frame.to_bytes(&mut b)?;
3053
3054 let off = b.off();
3055 self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3056
3057 self.advance().ok();
3058
3059 Ok(())
3060 }
3061
3062 /// Send an HTTP/3 DATAGRAM with default data from the client.
3063 ///
3064 /// On success it returns the data.
send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>>3065 pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3066 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3067
3068 self.client
3069 .send_dgram(&mut self.pipe.client, flow_id, &bytes)?;
3070
3071 self.advance().ok();
3072
3073 Ok(bytes)
3074 }
3075
3076 /// Receives an HTTP/3 DATAGRAM from the server.
3077 ///
3078 /// On success it returns the DATAGRAM length, flow ID and flow ID
3079 /// length.
recv_dgram_client( &mut self, buf: &mut [u8], ) -> Result<(usize, u64, usize)>3080 pub fn recv_dgram_client(
3081 &mut self, buf: &mut [u8],
3082 ) -> Result<(usize, u64, usize)> {
3083 self.client.recv_dgram(&mut self.pipe.client, buf)
3084 }
3085
3086 /// Send an HTTP/3 DATAGRAM with default data from the server
3087 ///
3088 /// On success it returns the data.
send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>>3089 pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3090 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3091
3092 self.server
3093 .send_dgram(&mut self.pipe.server, flow_id, &bytes)?;
3094
3095 self.advance().ok();
3096
3097 Ok(bytes)
3098 }
3099
3100 /// Receives an HTTP/3 DATAGRAM from the client.
3101 ///
3102 /// On success it returns the DATAGRAM length, flow ID and flow ID
3103 /// length.
recv_dgram_server( &mut self, buf: &mut [u8], ) -> Result<(usize, u64, usize)>3104 pub fn recv_dgram_server(
3105 &mut self, buf: &mut [u8],
3106 ) -> Result<(usize, u64, usize)> {
3107 self.server.recv_dgram(&mut self.pipe.server, buf)
3108 }
3109
3110 /// Sends a single HTTP/3 frame from the server.
send_frame_server( &mut self, frame: frame::Frame, stream_id: u64, fin: bool, ) -> Result<()>3111 pub fn send_frame_server(
3112 &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3113 ) -> Result<()> {
3114 let mut d = [42; 65535];
3115
3116 let mut b = octets::OctetsMut::with_slice(&mut d);
3117
3118 frame.to_bytes(&mut b)?;
3119
3120 let off = b.off();
3121 self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3122
3123 self.advance().ok();
3124
3125 Ok(())
3126 }
3127
3128 /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
send_arbitrary_stream_data_client( &mut self, data: &[u8], stream_id: u64, fin: bool, ) -> Result<()>3129 pub fn send_arbitrary_stream_data_client(
3130 &mut self, data: &[u8], stream_id: u64, fin: bool,
3131 ) -> Result<()> {
3132 self.pipe.client.stream_send(stream_id, data, fin)?;
3133
3134 self.advance().ok();
3135
3136 Ok(())
3137 }
3138
3139 /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
send_arbitrary_stream_data_server( &mut self, data: &[u8], stream_id: u64, fin: bool, ) -> Result<()>3140 pub fn send_arbitrary_stream_data_server(
3141 &mut self, data: &[u8], stream_id: u64, fin: bool,
3142 ) -> Result<()> {
3143 self.pipe.server.stream_send(stream_id, data, fin)?;
3144
3145 self.advance().ok();
3146
3147 Ok(())
3148 }
3149 }
3150 }
3151
3152 #[cfg(test)]
3153 mod tests {
3154 use super::*;
3155
3156 use super::testing::*;
3157
3158 #[test]
3159 /// Make sure that random GREASE values is within the specified limit.
grease_value_in_varint_limit()3160 fn grease_value_in_varint_limit() {
3161 assert!(grease_value() < 2u64.pow(62) - 1);
3162 }
3163
3164 #[test]
h3_handshake_0rtt()3165 fn h3_handshake_0rtt() {
3166 let mut buf = [0; 65535];
3167
3168 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3169 config
3170 .load_cert_chain_from_pem_file("examples/cert.crt")
3171 .unwrap();
3172 config
3173 .load_priv_key_from_pem_file("examples/cert.key")
3174 .unwrap();
3175 config
3176 .set_application_protos(&[b"proto1", b"proto2"])
3177 .unwrap();
3178 config.set_initial_max_data(30);
3179 config.set_initial_max_stream_data_bidi_local(15);
3180 config.set_initial_max_stream_data_bidi_remote(15);
3181 config.set_initial_max_stream_data_uni(15);
3182 config.set_initial_max_streams_bidi(3);
3183 config.set_initial_max_streams_uni(3);
3184 config.enable_early_data();
3185 config.verify_peer(false);
3186
3187 let h3_config = Config::new().unwrap();
3188
3189 // Perform initial handshake.
3190 let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3191 assert_eq!(pipe.handshake(), Ok(()));
3192
3193 // Extract session,
3194 let session = pipe.client.session().unwrap();
3195
3196 // Configure session on new connection.
3197 let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3198 assert_eq!(pipe.client.set_session(session), Ok(()));
3199
3200 // Can't create an H3 connection until the QUIC connection is determined
3201 // to have made sufficient early data progress.
3202 assert!(matches!(
3203 Connection::with_transport(&mut pipe.client, &h3_config),
3204 Err(Error::InternalError)
3205 ));
3206
3207 // Client sends initial flight.
3208 let (len, _) = pipe.client.send(&mut buf).unwrap();
3209
3210 // Now an H3 connection can be created.
3211 assert!(matches!(
3212 Connection::with_transport(&mut pipe.client, &h3_config),
3213 Ok(_)
3214 ));
3215 assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3216
3217 // Client sends 0-RTT packet.
3218 let pkt_type = crate::packet::Type::ZeroRTT;
3219
3220 let frames = [crate::frame::Frame::Stream {
3221 stream_id: 6,
3222 data: crate::stream::RangeBuf::from(b"aaaaa", 0, true),
3223 }];
3224
3225 assert_eq!(
3226 pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3227 Ok(1200)
3228 );
3229
3230 assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3231
3232 // 0-RTT stream data is readable.
3233 let mut r = pipe.server.readable();
3234 assert_eq!(r.next(), Some(6));
3235 assert_eq!(r.next(), None);
3236
3237 let mut b = [0; 15];
3238 assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3239 assert_eq!(&b[..5], b"aaaaa");
3240 }
3241
3242 #[test]
3243 /// Send a request with no body, get a response with no body.
request_no_body_response_no_body()3244 fn request_no_body_response_no_body() {
3245 let mut s = Session::new().unwrap();
3246 s.handshake().unwrap();
3247
3248 let (stream, req) = s.send_request(true).unwrap();
3249
3250 assert_eq!(stream, 0);
3251
3252 let ev_headers = Event::Headers {
3253 list: req,
3254 has_body: false,
3255 };
3256
3257 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3258 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3259
3260 let resp = s.send_response(stream, true).unwrap();
3261
3262 let ev_headers = Event::Headers {
3263 list: resp,
3264 has_body: false,
3265 };
3266
3267 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3268 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3269 assert_eq!(s.poll_client(), Err(Error::Done));
3270 }
3271
3272 #[test]
3273 /// Send a request with no body, get a response with one DATA frame.
request_no_body_response_one_chunk()3274 fn request_no_body_response_one_chunk() {
3275 let mut s = Session::new().unwrap();
3276 s.handshake().unwrap();
3277
3278 let (stream, req) = s.send_request(true).unwrap();
3279 assert_eq!(stream, 0);
3280
3281 let ev_headers = Event::Headers {
3282 list: req,
3283 has_body: false,
3284 };
3285
3286 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3287
3288 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3289
3290 let resp = s.send_response(stream, false).unwrap();
3291
3292 let body = s.send_body_server(stream, true).unwrap();
3293
3294 let mut recv_buf = vec![0; body.len()];
3295
3296 let ev_headers = Event::Headers {
3297 list: resp,
3298 has_body: true,
3299 };
3300
3301 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3302
3303 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3304 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3305
3306 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3307 assert_eq!(s.poll_client(), Err(Error::Done));
3308 }
3309
3310 #[test]
3311 /// Send a request with no body, get a response with multiple DATA frames.
request_no_body_response_many_chunks()3312 fn request_no_body_response_many_chunks() {
3313 let mut s = Session::new().unwrap();
3314 s.handshake().unwrap();
3315
3316 let (stream, req) = s.send_request(true).unwrap();
3317
3318 let ev_headers = Event::Headers {
3319 list: req,
3320 has_body: false,
3321 };
3322
3323 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3324 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3325
3326 let total_data_frames = 4;
3327
3328 let resp = s.send_response(stream, false).unwrap();
3329
3330 for _ in 0..total_data_frames - 1 {
3331 s.send_body_server(stream, false).unwrap();
3332 }
3333
3334 let body = s.send_body_server(stream, true).unwrap();
3335
3336 let mut recv_buf = vec![0; body.len()];
3337
3338 let ev_headers = Event::Headers {
3339 list: resp,
3340 has_body: true,
3341 };
3342
3343 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3344 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3345 assert_eq!(s.poll_client(), Err(Error::Done));
3346
3347 for _ in 0..total_data_frames {
3348 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3349 }
3350
3351 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3352 assert_eq!(s.poll_client(), Err(Error::Done));
3353 }
3354
3355 #[test]
3356 /// Send a request with one DATA frame, get a response with no body.
request_one_chunk_response_no_body()3357 fn request_one_chunk_response_no_body() {
3358 let mut s = Session::new().unwrap();
3359 s.handshake().unwrap();
3360
3361 let (stream, req) = s.send_request(false).unwrap();
3362
3363 let body = s.send_body_client(stream, true).unwrap();
3364
3365 let mut recv_buf = vec![0; body.len()];
3366
3367 let ev_headers = Event::Headers {
3368 list: req,
3369 has_body: true,
3370 };
3371
3372 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3373
3374 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3375 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3376
3377 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3378
3379 let resp = s.send_response(stream, true).unwrap();
3380
3381 let ev_headers = Event::Headers {
3382 list: resp,
3383 has_body: false,
3384 };
3385
3386 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3387 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3388 }
3389
3390 #[test]
3391 /// Send a request with multiple DATA frames, get a response with no body.
request_many_chunks_response_no_body()3392 fn request_many_chunks_response_no_body() {
3393 let mut s = Session::new().unwrap();
3394 s.handshake().unwrap();
3395
3396 let (stream, req) = s.send_request(false).unwrap();
3397
3398 let total_data_frames = 4;
3399
3400 for _ in 0..total_data_frames - 1 {
3401 s.send_body_client(stream, false).unwrap();
3402 }
3403
3404 let body = s.send_body_client(stream, true).unwrap();
3405
3406 let mut recv_buf = vec![0; body.len()];
3407
3408 let ev_headers = Event::Headers {
3409 list: req,
3410 has_body: true,
3411 };
3412
3413 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3414 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3415 assert_eq!(s.poll_server(), Err(Error::Done));
3416
3417 for _ in 0..total_data_frames {
3418 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3419 }
3420
3421 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3422
3423 let resp = s.send_response(stream, true).unwrap();
3424
3425 let ev_headers = Event::Headers {
3426 list: resp,
3427 has_body: false,
3428 };
3429
3430 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3431 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3432 }
3433
3434 #[test]
3435 /// Send a request with multiple DATA frames, get a response with one DATA
3436 /// frame.
many_requests_many_chunks_response_one_chunk()3437 fn many_requests_many_chunks_response_one_chunk() {
3438 let mut s = Session::new().unwrap();
3439 s.handshake().unwrap();
3440
3441 let mut reqs = Vec::new();
3442
3443 let (stream1, req1) = s.send_request(false).unwrap();
3444 assert_eq!(stream1, 0);
3445 reqs.push(req1);
3446
3447 let (stream2, req2) = s.send_request(false).unwrap();
3448 assert_eq!(stream2, 4);
3449 reqs.push(req2);
3450
3451 let (stream3, req3) = s.send_request(false).unwrap();
3452 assert_eq!(stream3, 8);
3453 reqs.push(req3);
3454
3455 let body = s.send_body_client(stream1, false).unwrap();
3456 s.send_body_client(stream2, false).unwrap();
3457 s.send_body_client(stream3, false).unwrap();
3458
3459 let mut recv_buf = vec![0; body.len()];
3460
3461 // Reverse order of writes.
3462
3463 s.send_body_client(stream3, true).unwrap();
3464 s.send_body_client(stream2, true).unwrap();
3465 s.send_body_client(stream1, true).unwrap();
3466
3467 for _ in 0..reqs.len() {
3468 let (stream, ev) = s.poll_server().unwrap();
3469 let ev_headers = Event::Headers {
3470 list: reqs[(stream / 4) as usize].clone(),
3471 has_body: true,
3472 };
3473 assert_eq!(ev, ev_headers);
3474 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3475 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3476 assert_eq!(s.poll_client(), Err(Error::Done));
3477
3478 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3479 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3480 }
3481
3482 assert_eq!(s.poll_server(), Err(Error::Done));
3483
3484 let mut resps = Vec::new();
3485
3486 let resp1 = s.send_response(stream1, true).unwrap();
3487 resps.push(resp1);
3488
3489 let resp2 = s.send_response(stream2, true).unwrap();
3490 resps.push(resp2);
3491
3492 let resp3 = s.send_response(stream3, true).unwrap();
3493 resps.push(resp3);
3494
3495 for _ in 0..resps.len() {
3496 let (stream, ev) = s.poll_client().unwrap();
3497 let ev_headers = Event::Headers {
3498 list: resps[(stream / 4) as usize].clone(),
3499 has_body: false,
3500 };
3501 assert_eq!(ev, ev_headers);
3502 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3503 }
3504
3505 assert_eq!(s.poll_client(), Err(Error::Done));
3506 }
3507
3508 #[test]
3509 /// Send a request with no body, get a response with one DATA frame and an
3510 /// empty FIN after reception from the client.
request_no_body_response_one_chunk_empty_fin()3511 fn request_no_body_response_one_chunk_empty_fin() {
3512 let mut s = Session::new().unwrap();
3513 s.handshake().unwrap();
3514
3515 let (stream, req) = s.send_request(true).unwrap();
3516
3517 let ev_headers = Event::Headers {
3518 list: req,
3519 has_body: false,
3520 };
3521
3522 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3523 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3524
3525 let resp = s.send_response(stream, false).unwrap();
3526
3527 let body = s.send_body_server(stream, false).unwrap();
3528
3529 let mut recv_buf = vec![0; body.len()];
3530
3531 let ev_headers = Event::Headers {
3532 list: resp,
3533 has_body: true,
3534 };
3535
3536 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3537
3538 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3539 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3540
3541 assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3542 s.advance().ok();
3543
3544 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3545 assert_eq!(s.poll_client(), Err(Error::Done));
3546 }
3547
3548 #[test]
3549 /// Send a request with no body, get a response with no body followed by
3550 /// GREASE that is STREAM frame with a FIN.
request_no_body_response_no_body_with_grease()3551 fn request_no_body_response_no_body_with_grease() {
3552 let mut s = Session::new().unwrap();
3553 s.handshake().unwrap();
3554
3555 let (stream, req) = s.send_request(true).unwrap();
3556
3557 assert_eq!(stream, 0);
3558
3559 let ev_headers = Event::Headers {
3560 list: req,
3561 has_body: false,
3562 };
3563
3564 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3565 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3566
3567 let resp = s.send_response(stream, false).unwrap();
3568
3569 // Note that "has_body" is a misnomer, there will never be a body in
3570 // this test. There's other work that will fix this, once it lands
3571 // remove this comment.
3572 let ev_headers = Event::Headers {
3573 list: resp,
3574 has_body: true,
3575 };
3576
3577 // Inject a GREASE frame
3578 let mut d = [42; 10];
3579 let mut b = octets::OctetsMut::with_slice(&mut d);
3580
3581 let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3582 s.pipe.server.stream_send(0, frame_type, false).unwrap();
3583
3584 let frame_len = b.put_varint(10).unwrap();
3585 s.pipe.server.stream_send(0, frame_len, false).unwrap();
3586
3587 s.pipe.server.stream_send(0, &d, true).unwrap();
3588
3589 s.advance().ok();
3590
3591 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3592 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3593 assert_eq!(s.poll_client(), Err(Error::Done));
3594 }
3595
3596 #[test]
3597 /// Try to send DATA frames before HEADERS.
body_response_before_headers()3598 fn body_response_before_headers() {
3599 let mut s = Session::new().unwrap();
3600 s.handshake().unwrap();
3601
3602 let (stream, req) = s.send_request(true).unwrap();
3603 assert_eq!(stream, 0);
3604
3605 let ev_headers = Event::Headers {
3606 list: req,
3607 has_body: false,
3608 };
3609
3610 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3611
3612 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3613
3614 assert_eq!(
3615 s.send_body_server(stream, true),
3616 Err(Error::FrameUnexpected)
3617 );
3618
3619 assert_eq!(s.poll_client(), Err(Error::Done));
3620 }
3621
3622 #[test]
3623 /// Try to send DATA frames on wrong streams, ensure the API returns an
3624 /// error before anything hits the transport layer.
send_body_invalid_client_stream()3625 fn send_body_invalid_client_stream() {
3626 let mut s = Session::new().unwrap();
3627 s.handshake().unwrap();
3628
3629 assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
3630
3631 assert_eq!(
3632 s.send_body_client(s.client.control_stream_id.unwrap(), true),
3633 Err(Error::FrameUnexpected)
3634 );
3635
3636 assert_eq!(
3637 s.send_body_client(
3638 s.client.local_qpack_streams.encoder_stream_id.unwrap(),
3639 true
3640 ),
3641 Err(Error::FrameUnexpected)
3642 );
3643
3644 assert_eq!(
3645 s.send_body_client(
3646 s.client.local_qpack_streams.decoder_stream_id.unwrap(),
3647 true
3648 ),
3649 Err(Error::FrameUnexpected)
3650 );
3651
3652 assert_eq!(
3653 s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
3654 Err(Error::FrameUnexpected)
3655 );
3656
3657 assert_eq!(
3658 s.send_body_client(
3659 s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
3660 true
3661 ),
3662 Err(Error::FrameUnexpected)
3663 );
3664
3665 assert_eq!(
3666 s.send_body_client(
3667 s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
3668 true
3669 ),
3670 Err(Error::FrameUnexpected)
3671 );
3672 }
3673
3674 #[test]
3675 /// Try to send DATA frames on wrong streams, ensure the API returns an
3676 /// error before anything hits the transport layer.
send_body_invalid_server_stream()3677 fn send_body_invalid_server_stream() {
3678 let mut s = Session::new().unwrap();
3679 s.handshake().unwrap();
3680
3681 assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
3682
3683 assert_eq!(
3684 s.send_body_server(s.server.control_stream_id.unwrap(), true),
3685 Err(Error::FrameUnexpected)
3686 );
3687
3688 assert_eq!(
3689 s.send_body_server(
3690 s.server.local_qpack_streams.encoder_stream_id.unwrap(),
3691 true
3692 ),
3693 Err(Error::FrameUnexpected)
3694 );
3695
3696 assert_eq!(
3697 s.send_body_server(
3698 s.server.local_qpack_streams.decoder_stream_id.unwrap(),
3699 true
3700 ),
3701 Err(Error::FrameUnexpected)
3702 );
3703
3704 assert_eq!(
3705 s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
3706 Err(Error::FrameUnexpected)
3707 );
3708
3709 assert_eq!(
3710 s.send_body_server(
3711 s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
3712 true
3713 ),
3714 Err(Error::FrameUnexpected)
3715 );
3716
3717 assert_eq!(
3718 s.send_body_server(
3719 s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
3720 true
3721 ),
3722 Err(Error::FrameUnexpected)
3723 );
3724 }
3725
3726 #[test]
3727 /// Send a MAX_PUSH_ID frame from the client on a valid stream.
max_push_id_from_client_good()3728 fn max_push_id_from_client_good() {
3729 let mut s = Session::new().unwrap();
3730 s.handshake().unwrap();
3731
3732 s.send_frame_client(
3733 frame::Frame::MaxPushId { push_id: 1 },
3734 s.client.control_stream_id.unwrap(),
3735 false,
3736 )
3737 .unwrap();
3738
3739 assert_eq!(s.poll_server(), Err(Error::Done));
3740 }
3741
3742 #[test]
3743 /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
max_push_id_from_client_bad_stream()3744 fn max_push_id_from_client_bad_stream() {
3745 let mut s = Session::new().unwrap();
3746 s.handshake().unwrap();
3747
3748 let (stream, req) = s.send_request(false).unwrap();
3749
3750 s.send_frame_client(
3751 frame::Frame::MaxPushId { push_id: 2 },
3752 stream,
3753 false,
3754 )
3755 .unwrap();
3756
3757 let ev_headers = Event::Headers {
3758 list: req,
3759 has_body: true,
3760 };
3761
3762 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3763 assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
3764 }
3765
3766 #[test]
3767 /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
3768 /// reduce the limit.
max_push_id_from_client_limit_reduction()3769 fn max_push_id_from_client_limit_reduction() {
3770 let mut s = Session::new().unwrap();
3771 s.handshake().unwrap();
3772
3773 s.send_frame_client(
3774 frame::Frame::MaxPushId { push_id: 2 },
3775 s.client.control_stream_id.unwrap(),
3776 false,
3777 )
3778 .unwrap();
3779
3780 s.send_frame_client(
3781 frame::Frame::MaxPushId { push_id: 1 },
3782 s.client.control_stream_id.unwrap(),
3783 false,
3784 )
3785 .unwrap();
3786
3787 assert_eq!(s.poll_server(), Err(Error::IdError));
3788 }
3789
3790 #[test]
3791 /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
max_push_id_from_server()3792 fn max_push_id_from_server() {
3793 let mut s = Session::new().unwrap();
3794 s.handshake().unwrap();
3795
3796 s.send_frame_server(
3797 frame::Frame::MaxPushId { push_id: 1 },
3798 s.server.control_stream_id.unwrap(),
3799 false,
3800 )
3801 .unwrap();
3802
3803 assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
3804 }
3805
3806 #[test]
3807 /// Send a PUSH_PROMISE frame from the client, which is forbidden.
push_promise_from_client()3808 fn push_promise_from_client() {
3809 let mut s = Session::new().unwrap();
3810 s.handshake().unwrap();
3811
3812 let (stream, req) = s.send_request(false).unwrap();
3813
3814 let header_block = s.client.encode_header_block(&req).unwrap();
3815
3816 s.send_frame_client(
3817 frame::Frame::PushPromise {
3818 push_id: 1,
3819 header_block,
3820 },
3821 stream,
3822 false,
3823 )
3824 .unwrap();
3825
3826 let ev_headers = Event::Headers {
3827 list: req,
3828 has_body: true,
3829 };
3830
3831 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3832 assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
3833 }
3834
3835 #[test]
3836 /// Send a CANCEL_PUSH frame from the client.
cancel_push_from_client()3837 fn cancel_push_from_client() {
3838 let mut s = Session::new().unwrap();
3839 s.handshake().unwrap();
3840
3841 s.send_frame_client(
3842 frame::Frame::CancelPush { push_id: 1 },
3843 s.client.control_stream_id.unwrap(),
3844 false,
3845 )
3846 .unwrap();
3847
3848 assert_eq!(s.poll_server(), Err(Error::Done));
3849 }
3850
3851 #[test]
3852 /// Send a CANCEL_PUSH frame from the client on an invalid stream.
cancel_push_from_client_bad_stream()3853 fn cancel_push_from_client_bad_stream() {
3854 let mut s = Session::new().unwrap();
3855 s.handshake().unwrap();
3856
3857 let (stream, req) = s.send_request(false).unwrap();
3858
3859 s.send_frame_client(
3860 frame::Frame::CancelPush { push_id: 2 },
3861 stream,
3862 false,
3863 )
3864 .unwrap();
3865
3866 let ev_headers = Event::Headers {
3867 list: req,
3868 has_body: true,
3869 };
3870
3871 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3872 assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
3873 }
3874
3875 #[test]
3876 /// Send a CANCEL_PUSH frame from the client.
cancel_push_from_server()3877 fn cancel_push_from_server() {
3878 let mut s = Session::new().unwrap();
3879 s.handshake().unwrap();
3880
3881 s.send_frame_server(
3882 frame::Frame::CancelPush { push_id: 1 },
3883 s.server.control_stream_id.unwrap(),
3884 false,
3885 )
3886 .unwrap();
3887
3888 assert_eq!(s.poll_client(), Err(Error::Done));
3889 }
3890
3891 #[test]
3892 /// Send a GOAWAY frame from the client.
goaway_from_client_good()3893 fn goaway_from_client_good() {
3894 let mut s = Session::new().unwrap();
3895 s.handshake().unwrap();
3896
3897 s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
3898
3899 s.advance().ok();
3900
3901 // TODO: server push
3902 assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
3903 }
3904
3905 #[test]
3906 /// Send a GOAWAY frame from the server.
goaway_from_server_good()3907 fn goaway_from_server_good() {
3908 let mut s = Session::new().unwrap();
3909 s.handshake().unwrap();
3910
3911 s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
3912
3913 s.advance().ok();
3914
3915 assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
3916 }
3917
3918 #[test]
3919 /// A client MUST NOT send a request after it receives GOAWAY.
client_request_after_goaway()3920 fn client_request_after_goaway() {
3921 let mut s = Session::new().unwrap();
3922 s.handshake().unwrap();
3923
3924 s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
3925
3926 s.advance().ok();
3927
3928 assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
3929
3930 assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
3931 }
3932
3933 #[test]
3934 /// Send a GOAWAY frame from the server, using an invalid goaway ID.
goaway_from_server_invalid_id()3935 fn goaway_from_server_invalid_id() {
3936 let mut s = Session::new().unwrap();
3937 s.handshake().unwrap();
3938
3939 s.send_frame_server(
3940 frame::Frame::GoAway { id: 1 },
3941 s.server.control_stream_id.unwrap(),
3942 false,
3943 )
3944 .unwrap();
3945
3946 assert_eq!(s.poll_client(), Err(Error::IdError));
3947 }
3948
3949 #[test]
3950 /// Send multiple GOAWAY frames from the server, that increase the goaway
3951 /// ID.
goaway_from_server_increase_id()3952 fn goaway_from_server_increase_id() {
3953 let mut s = Session::new().unwrap();
3954 s.handshake().unwrap();
3955
3956 s.send_frame_server(
3957 frame::Frame::GoAway { id: 0 },
3958 s.server.control_stream_id.unwrap(),
3959 false,
3960 )
3961 .unwrap();
3962
3963 s.send_frame_server(
3964 frame::Frame::GoAway { id: 4 },
3965 s.server.control_stream_id.unwrap(),
3966 false,
3967 )
3968 .unwrap();
3969
3970 assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
3971
3972 assert_eq!(s.poll_client(), Err(Error::IdError));
3973 }
3974
3975 #[test]
3976 #[cfg(feature = "sfv")]
parse_priority_field_value()3977 fn parse_priority_field_value() {
3978 // Legal dicts
3979 assert_eq!(
3980 Ok(Priority::new(0, false)),
3981 Priority::try_from(b"u=0".as_slice())
3982 );
3983 assert_eq!(
3984 Ok(Priority::new(3, false)),
3985 Priority::try_from(b"u=3".as_slice())
3986 );
3987 assert_eq!(
3988 Ok(Priority::new(7, false)),
3989 Priority::try_from(b"u=7".as_slice())
3990 );
3991
3992 assert_eq!(
3993 Ok(Priority::new(0, true)),
3994 Priority::try_from(b"u=0, i".as_slice())
3995 );
3996 assert_eq!(
3997 Ok(Priority::new(3, true)),
3998 Priority::try_from(b"u=3, i".as_slice())
3999 );
4000 assert_eq!(
4001 Ok(Priority::new(7, true)),
4002 Priority::try_from(b"u=7, i".as_slice())
4003 );
4004
4005 assert_eq!(
4006 Ok(Priority::new(0, true)),
4007 Priority::try_from(b"u=0, i=?1".as_slice())
4008 );
4009 assert_eq!(
4010 Ok(Priority::new(3, true)),
4011 Priority::try_from(b"u=3, i=?1".as_slice())
4012 );
4013 assert_eq!(
4014 Ok(Priority::new(7, true)),
4015 Priority::try_from(b"u=7, i=?1".as_slice())
4016 );
4017
4018 assert_eq!(
4019 Ok(Priority::new(3, false)),
4020 Priority::try_from(b"".as_slice())
4021 );
4022
4023 assert_eq!(
4024 Ok(Priority::new(0, true)),
4025 Priority::try_from(b"u=0;foo, i;bar".as_slice())
4026 );
4027 assert_eq!(
4028 Ok(Priority::new(3, true)),
4029 Priority::try_from(b"u=3;hello, i;world".as_slice())
4030 );
4031 assert_eq!(
4032 Ok(Priority::new(7, true)),
4033 Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4034 );
4035
4036 assert_eq!(
4037 Ok(Priority::new(0, true)),
4038 Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4039 );
4040
4041 // Illegal formats
4042 assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4043 assert_eq!(
4044 Ok(Priority::new(7, false)),
4045 Priority::try_from(b"u=-1".as_slice())
4046 );
4047 assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4048 assert_eq!(
4049 Ok(Priority::new(7, false)),
4050 Priority::try_from(b"u=100".as_slice())
4051 );
4052 assert_eq!(
4053 Err(Error::Done),
4054 Priority::try_from(b"u=3, i=true".as_slice())
4055 );
4056
4057 // Trailing comma in dict is malformed
4058 assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4059 }
4060
4061 #[test]
4062 /// Send a PRIORITY_UPDATE for request stream from the client.
priority_update_request()4063 fn priority_update_request() {
4064 let mut s = Session::new().unwrap();
4065 s.handshake().unwrap();
4066
4067 s.client
4068 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4069 urgency: 3,
4070 incremental: false,
4071 })
4072 .unwrap();
4073 s.advance().ok();
4074
4075 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4076 assert_eq!(s.poll_server(), Err(Error::Done));
4077 }
4078
4079 #[test]
4080 /// Send a PRIORITY_UPDATE for request stream from the client.
priority_update_single_stream_rearm()4081 fn priority_update_single_stream_rearm() {
4082 let mut s = Session::new().unwrap();
4083 s.handshake().unwrap();
4084
4085 s.client
4086 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4087 urgency: 3,
4088 incremental: false,
4089 })
4090 .unwrap();
4091 s.advance().ok();
4092
4093 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4094 assert_eq!(s.poll_server(), Err(Error::Done));
4095
4096 s.client
4097 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4098 urgency: 5,
4099 incremental: false,
4100 })
4101 .unwrap();
4102 s.advance().ok();
4103
4104 assert_eq!(s.poll_server(), Err(Error::Done));
4105
4106 // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4107 // will rearm ready for more.
4108 assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4109 assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4110
4111 s.client
4112 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4113 urgency: 7,
4114 incremental: false,
4115 })
4116 .unwrap();
4117 s.advance().ok();
4118
4119 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4120 assert_eq!(s.poll_server(), Err(Error::Done));
4121
4122 assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4123 assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4124 }
4125
4126 #[test]
4127 /// Send multiple PRIORITY_UPDATE frames for different streams from the
4128 /// client across multiple flights of exchange.
priority_update_request_multiple_stream_arm_multiple_flights()4129 fn priority_update_request_multiple_stream_arm_multiple_flights() {
4130 let mut s = Session::new().unwrap();
4131 s.handshake().unwrap();
4132
4133 s.client
4134 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4135 urgency: 3,
4136 incremental: false,
4137 })
4138 .unwrap();
4139 s.advance().ok();
4140
4141 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4142 assert_eq!(s.poll_server(), Err(Error::Done));
4143
4144 s.client
4145 .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4146 urgency: 1,
4147 incremental: false,
4148 })
4149 .unwrap();
4150 s.advance().ok();
4151
4152 assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4153 assert_eq!(s.poll_server(), Err(Error::Done));
4154
4155 s.client
4156 .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4157 urgency: 2,
4158 incremental: false,
4159 })
4160 .unwrap();
4161 s.advance().ok();
4162
4163 assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4164 assert_eq!(s.poll_server(), Err(Error::Done));
4165
4166 assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4167 assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4168 assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4169 assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4170 }
4171
4172 #[test]
4173 /// Send multiple PRIORITY_UPDATE frames for different streams from the
4174 /// client across a single flight.
priority_update_request_multiple_stream_arm_single_flight()4175 fn priority_update_request_multiple_stream_arm_single_flight() {
4176 let mut s = Session::new().unwrap();
4177 s.handshake().unwrap();
4178
4179 let mut d = [42; 65535];
4180
4181 let mut b = octets::OctetsMut::with_slice(&mut d);
4182
4183 let p1 = frame::Frame::PriorityUpdateRequest {
4184 prioritized_element_id: 0,
4185 priority_field_value: b"u=3".to_vec(),
4186 };
4187
4188 let p2 = frame::Frame::PriorityUpdateRequest {
4189 prioritized_element_id: 4,
4190 priority_field_value: b"u=3".to_vec(),
4191 };
4192
4193 let p3 = frame::Frame::PriorityUpdateRequest {
4194 prioritized_element_id: 8,
4195 priority_field_value: b"u=3".to_vec(),
4196 };
4197
4198 p1.to_bytes(&mut b).unwrap();
4199 p2.to_bytes(&mut b).unwrap();
4200 p3.to_bytes(&mut b).unwrap();
4201
4202 let off = b.off();
4203 s.pipe
4204 .client
4205 .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4206 .unwrap();
4207
4208 s.advance().ok();
4209
4210 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4211 assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4212 assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4213 assert_eq!(s.poll_server(), Err(Error::Done));
4214
4215 assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4216 assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4217 assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4218
4219 assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4220 }
4221
4222 #[test]
4223 /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4224 /// has been completed.
priority_update_request_collected_completed()4225 fn priority_update_request_collected_completed() {
4226 let mut s = Session::new().unwrap();
4227 s.handshake().unwrap();
4228
4229 s.client
4230 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4231 urgency: 3,
4232 incremental: false,
4233 })
4234 .unwrap();
4235 s.advance().ok();
4236
4237 let (stream, req) = s.send_request(true).unwrap();
4238 let ev_headers = Event::Headers {
4239 list: req,
4240 has_body: false,
4241 };
4242
4243 // Priority event is generated before request headers.
4244 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4245 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4246 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4247 assert_eq!(s.poll_server(), Err(Error::Done));
4248
4249 assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4250 assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4251
4252 let resp = s.send_response(stream, true).unwrap();
4253
4254 let ev_headers = Event::Headers {
4255 list: resp,
4256 has_body: false,
4257 };
4258
4259 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4260 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4261 assert_eq!(s.poll_client(), Err(Error::Done));
4262
4263 // Now send a PRIORITY_UPDATE for the completed request stream.
4264 s.client
4265 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4266 urgency: 3,
4267 incremental: false,
4268 })
4269 .unwrap();
4270 s.advance().ok();
4271
4272 // No event generated at server
4273 assert_eq!(s.poll_server(), Err(Error::Done));
4274 }
4275
4276 #[test]
4277 /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4278 /// has been stopped.
priority_update_request_collected_stopped()4279 fn priority_update_request_collected_stopped() {
4280 let mut s = Session::new().unwrap();
4281 s.handshake().unwrap();
4282
4283 s.client
4284 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4285 urgency: 3,
4286 incremental: false,
4287 })
4288 .unwrap();
4289 s.advance().ok();
4290
4291 let (stream, req) = s.send_request(false).unwrap();
4292 let ev_headers = Event::Headers {
4293 list: req,
4294 has_body: true,
4295 };
4296
4297 // Priority event is generated before request headers.
4298 assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4299 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4300 assert_eq!(s.poll_server(), Err(Error::Done));
4301
4302 assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4303 assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4304
4305 s.pipe
4306 .client
4307 .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
4308 .unwrap();
4309 s.pipe
4310 .client
4311 .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
4312 .unwrap();
4313
4314 s.advance().ok();
4315
4316 assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
4317 assert_eq!(s.poll_server(), Err(Error::Done));
4318
4319 // Now send a PRIORITY_UPDATE for the closed request stream.
4320 s.client
4321 .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4322 urgency: 3,
4323 incremental: false,
4324 })
4325 .unwrap();
4326 s.advance().ok();
4327
4328 // No event generated at server
4329 assert_eq!(s.poll_server(), Err(Error::Done));
4330 }
4331
4332 #[test]
4333 /// Send a PRIORITY_UPDATE for push stream from the client.
priority_update_push()4334 fn priority_update_push() {
4335 let mut s = Session::new().unwrap();
4336 s.handshake().unwrap();
4337
4338 s.send_frame_client(
4339 frame::Frame::PriorityUpdatePush {
4340 prioritized_element_id: 3,
4341 priority_field_value: b"u=3".to_vec(),
4342 },
4343 s.client.control_stream_id.unwrap(),
4344 false,
4345 )
4346 .unwrap();
4347
4348 assert_eq!(s.poll_server(), Err(Error::Done));
4349 }
4350
4351 #[test]
4352 /// Send a PRIORITY_UPDATE for request stream from the client but for an
4353 /// incorrect stream type.
priority_update_request_bad_stream()4354 fn priority_update_request_bad_stream() {
4355 let mut s = Session::new().unwrap();
4356 s.handshake().unwrap();
4357
4358 s.send_frame_client(
4359 frame::Frame::PriorityUpdateRequest {
4360 prioritized_element_id: 5,
4361 priority_field_value: b"u=3".to_vec(),
4362 },
4363 s.client.control_stream_id.unwrap(),
4364 false,
4365 )
4366 .unwrap();
4367
4368 assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4369 }
4370
4371 #[test]
4372 /// Send a PRIORITY_UPDATE for push stream from the client but for an
4373 /// incorrect stream type.
priority_update_push_bad_stream()4374 fn priority_update_push_bad_stream() {
4375 let mut s = Session::new().unwrap();
4376 s.handshake().unwrap();
4377
4378 s.send_frame_client(
4379 frame::Frame::PriorityUpdatePush {
4380 prioritized_element_id: 5,
4381 priority_field_value: b"u=3".to_vec(),
4382 },
4383 s.client.control_stream_id.unwrap(),
4384 false,
4385 )
4386 .unwrap();
4387
4388 assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4389 }
4390
4391 #[test]
4392 /// Send a PRIORITY_UPDATE for request stream from the server.
priority_update_request_from_server()4393 fn priority_update_request_from_server() {
4394 let mut s = Session::new().unwrap();
4395 s.handshake().unwrap();
4396
4397 s.send_frame_server(
4398 frame::Frame::PriorityUpdateRequest {
4399 prioritized_element_id: 0,
4400 priority_field_value: b"u=3".to_vec(),
4401 },
4402 s.server.control_stream_id.unwrap(),
4403 false,
4404 )
4405 .unwrap();
4406
4407 assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4408 }
4409
4410 #[test]
4411 /// Send a PRIORITY_UPDATE for request stream from the server.
priority_update_push_from_server()4412 fn priority_update_push_from_server() {
4413 let mut s = Session::new().unwrap();
4414 s.handshake().unwrap();
4415
4416 s.send_frame_server(
4417 frame::Frame::PriorityUpdatePush {
4418 prioritized_element_id: 0,
4419 priority_field_value: b"u=3".to_vec(),
4420 },
4421 s.server.control_stream_id.unwrap(),
4422 false,
4423 )
4424 .unwrap();
4425
4426 assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4427 }
4428
4429 #[test]
4430 /// Ensure quiche allocates streams for client and server roles as expected.
uni_stream_local_counting()4431 fn uni_stream_local_counting() {
4432 let config = Config::new().unwrap();
4433
4434 let h3_cln = Connection::new(&config, false, false).unwrap();
4435 assert_eq!(h3_cln.next_uni_stream_id, 2);
4436
4437 let h3_srv = Connection::new(&config, true, false).unwrap();
4438 assert_eq!(h3_srv.next_uni_stream_id, 3);
4439 }
4440
4441 #[test]
4442 /// Client opens multiple control streams, which is forbidden.
open_multiple_control_streams()4443 fn open_multiple_control_streams() {
4444 let mut s = Session::new().unwrap();
4445 s.handshake().unwrap();
4446
4447 let stream_id = s.client.next_uni_stream_id;
4448
4449 let mut d = [42; 8];
4450 let mut b = octets::OctetsMut::with_slice(&mut d);
4451
4452 s.pipe
4453 .client
4454 .stream_send(
4455 stream_id,
4456 b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
4457 false,
4458 )
4459 .unwrap();
4460
4461 s.advance().ok();
4462
4463 assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
4464 }
4465
4466 #[test]
4467 /// Client closes the control stream, which is forbidden.
close_control_stream()4468 fn close_control_stream() {
4469 let mut s = Session::new().unwrap();
4470 s.handshake().unwrap();
4471
4472 let mut control_stream_closed = false;
4473
4474 s.send_frame_client(
4475 frame::Frame::MaxPushId { push_id: 1 },
4476 s.client.control_stream_id.unwrap(),
4477 true,
4478 )
4479 .unwrap();
4480
4481 loop {
4482 match s.server.poll(&mut s.pipe.server) {
4483 Ok(_) => (),
4484
4485 Err(Error::Done) => {
4486 break;
4487 },
4488
4489 Err(Error::ClosedCriticalStream) => {
4490 control_stream_closed = true;
4491 break;
4492 },
4493
4494 Err(_) => (),
4495 }
4496 }
4497
4498 assert!(control_stream_closed);
4499 }
4500
4501 #[test]
4502 /// Client closes QPACK stream, which is forbidden.
close_qpack_stream()4503 fn close_qpack_stream() {
4504 let mut s = Session::new().unwrap();
4505 s.handshake().unwrap();
4506
4507 let mut qpack_stream_closed = false;
4508
4509 let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
4510 let d = [0; 1];
4511
4512 s.pipe.client.stream_send(stream_id, &d, false).unwrap();
4513 s.pipe.client.stream_send(stream_id, &d, true).unwrap();
4514
4515 s.advance().ok();
4516
4517 loop {
4518 match s.server.poll(&mut s.pipe.server) {
4519 Ok(_) => (),
4520
4521 Err(Error::Done) => {
4522 break;
4523 },
4524
4525 Err(Error::ClosedCriticalStream) => {
4526 qpack_stream_closed = true;
4527 break;
4528 },
4529
4530 Err(_) => (),
4531 }
4532 }
4533
4534 assert!(qpack_stream_closed);
4535 }
4536
4537 #[test]
4538 /// Client sends QPACK data.
qpack_data()4539 fn qpack_data() {
4540 // TODO: QPACK instructions are ignored until dynamic table support is
4541 // added so we just test that the data is safely ignored.
4542 let mut s = Session::new().unwrap();
4543 s.handshake().unwrap();
4544
4545 let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
4546 let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
4547 let d = [0; 20];
4548
4549 s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
4550 s.advance().ok();
4551
4552 s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
4553 s.advance().ok();
4554
4555 loop {
4556 match s.server.poll(&mut s.pipe.server) {
4557 Ok(_) => (),
4558
4559 Err(Error::Done) => {
4560 break;
4561 },
4562
4563 Err(_) => {
4564 panic!();
4565 },
4566 }
4567 }
4568 }
4569
4570 #[test]
4571 /// Tests limits for the stream state buffer maximum size.
max_state_buf_size()4572 fn max_state_buf_size() {
4573 let mut s = Session::new().unwrap();
4574 s.handshake().unwrap();
4575
4576 let req = vec![
4577 Header::new(b":method", b"GET"),
4578 Header::new(b":scheme", b"https"),
4579 Header::new(b":authority", b"quic.tech"),
4580 Header::new(b":path", b"/test"),
4581 Header::new(b"user-agent", b"quiche-test"),
4582 ];
4583
4584 assert_eq!(
4585 s.client.send_request(&mut s.pipe.client, &req, false),
4586 Ok(0)
4587 );
4588
4589 s.advance().ok();
4590
4591 let ev_headers = Event::Headers {
4592 list: req,
4593 has_body: true,
4594 };
4595
4596 assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
4597
4598 // DATA frames don't consume the state buffer, so can be of any size.
4599 let mut d = [42; 128];
4600 let mut b = octets::OctetsMut::with_slice(&mut d);
4601
4602 let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
4603 s.pipe.client.stream_send(0, frame_type, false).unwrap();
4604
4605 let frame_len = b.put_varint(1 << 24).unwrap();
4606 s.pipe.client.stream_send(0, frame_len, false).unwrap();
4607
4608 s.pipe.client.stream_send(0, &d, false).unwrap();
4609
4610 s.advance().ok();
4611
4612 assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
4613
4614 // GREASE frames consume the state buffer, so need to be limited.
4615 let mut s = Session::new().unwrap();
4616 s.handshake().unwrap();
4617
4618 let mut d = [42; 128];
4619 let mut b = octets::OctetsMut::with_slice(&mut d);
4620
4621 let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
4622 s.pipe.client.stream_send(0, frame_type, false).unwrap();
4623
4624 let frame_len = b.put_varint(1 << 24).unwrap();
4625 s.pipe.client.stream_send(0, frame_len, false).unwrap();
4626
4627 s.pipe.client.stream_send(0, &d, false).unwrap();
4628
4629 s.advance().ok();
4630
4631 assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
4632 }
4633
4634 #[test]
4635 /// Tests that DATA frames are properly truncated depending on the request
4636 /// stream's outgoing flow control capacity.
stream_backpressure()4637 fn stream_backpressure() {
4638 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
4639
4640 let mut s = Session::new().unwrap();
4641 s.handshake().unwrap();
4642
4643 let (stream, req) = s.send_request(false).unwrap();
4644
4645 let total_data_frames = 6;
4646
4647 for _ in 0..total_data_frames {
4648 assert_eq!(
4649 s.client
4650 .send_body(&mut s.pipe.client, stream, &bytes, false),
4651 Ok(bytes.len())
4652 );
4653
4654 s.advance().ok();
4655 }
4656
4657 assert_eq!(
4658 s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
4659 Ok(bytes.len() - 2)
4660 );
4661
4662 s.advance().ok();
4663
4664 let mut recv_buf = vec![0; bytes.len()];
4665
4666 let ev_headers = Event::Headers {
4667 list: req,
4668 has_body: true,
4669 };
4670
4671 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4672 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4673 assert_eq!(s.poll_server(), Err(Error::Done));
4674
4675 for _ in 0..total_data_frames {
4676 assert_eq!(
4677 s.recv_body_server(stream, &mut recv_buf),
4678 Ok(bytes.len())
4679 );
4680 }
4681
4682 assert_eq!(
4683 s.recv_body_server(stream, &mut recv_buf),
4684 Ok(bytes.len() - 2)
4685 );
4686
4687 // Fin flag from last send_body() call was not sent as the buffer was
4688 // only partially written.
4689 assert_eq!(s.poll_server(), Err(Error::Done));
4690 }
4691
4692 #[test]
4693 /// Tests that the max header list size setting is enforced.
request_max_header_size_limit()4694 fn request_max_header_size_limit() {
4695 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
4696 config
4697 .load_cert_chain_from_pem_file("examples/cert.crt")
4698 .unwrap();
4699 config
4700 .load_priv_key_from_pem_file("examples/cert.key")
4701 .unwrap();
4702 config.set_application_protos(&[b"h3"]).unwrap();
4703 config.set_initial_max_data(1500);
4704 config.set_initial_max_stream_data_bidi_local(150);
4705 config.set_initial_max_stream_data_bidi_remote(150);
4706 config.set_initial_max_stream_data_uni(150);
4707 config.set_initial_max_streams_bidi(5);
4708 config.set_initial_max_streams_uni(5);
4709 config.verify_peer(false);
4710
4711 let mut h3_config = Config::new().unwrap();
4712 h3_config.set_max_field_section_size(65);
4713
4714 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
4715
4716 s.handshake().unwrap();
4717
4718 let req = vec![
4719 Header::new(b":method", b"GET"),
4720 Header::new(b":scheme", b"https"),
4721 Header::new(b":authority", b"quic.tech"),
4722 Header::new(b":path", b"/test"),
4723 Header::new(b"aaaaaaa", b"aaaaaaaa"),
4724 ];
4725
4726 let stream = s
4727 .client
4728 .send_request(&mut s.pipe.client, &req, true)
4729 .unwrap();
4730
4731 s.advance().ok();
4732
4733 assert_eq!(stream, 0);
4734
4735 assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
4736
4737 assert_eq!(
4738 s.pipe.server.local_error.as_ref().unwrap().error_code,
4739 Error::to_wire(Error::ExcessiveLoad)
4740 );
4741 }
4742
4743 #[test]
4744 /// Tests that Error::TransportError contains a transport error.
transport_error()4745 fn transport_error() {
4746 let mut s = Session::new().unwrap();
4747 s.handshake().unwrap();
4748
4749 let req = vec![
4750 Header::new(b":method", b"GET"),
4751 Header::new(b":scheme", b"https"),
4752 Header::new(b":authority", b"quic.tech"),
4753 Header::new(b":path", b"/test"),
4754 Header::new(b"user-agent", b"quiche-test"),
4755 ];
4756
4757 // We need to open all streams in the same flight, so we can't use the
4758 // Session::send_request() method because it also calls advance(),
4759 // otherwise the server would send a MAX_STREAMS frame and the client
4760 // wouldn't hit the streams limit.
4761 assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
4762 assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
4763 assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
4764 assert_eq!(
4765 s.client.send_request(&mut s.pipe.client, &req, true),
4766 Ok(12)
4767 );
4768 assert_eq!(
4769 s.client.send_request(&mut s.pipe.client, &req, true),
4770 Ok(16)
4771 );
4772
4773 assert_eq!(
4774 s.client.send_request(&mut s.pipe.client, &req, true),
4775 Err(Error::TransportError(crate::Error::StreamLimit))
4776 );
4777 }
4778
4779 #[test]
4780 /// Tests that sending DATA before HEADERS causes an error.
data_before_headers()4781 fn data_before_headers() {
4782 let mut s = Session::new().unwrap();
4783 s.handshake().unwrap();
4784
4785 let mut d = [42; 128];
4786 let mut b = octets::OctetsMut::with_slice(&mut d);
4787
4788 let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
4789 s.pipe.client.stream_send(0, frame_type, false).unwrap();
4790
4791 let frame_len = b.put_varint(5).unwrap();
4792 s.pipe.client.stream_send(0, frame_len, false).unwrap();
4793
4794 s.pipe.client.stream_send(0, b"hello", false).unwrap();
4795
4796 s.advance().ok();
4797
4798 assert_eq!(
4799 s.server.poll(&mut s.pipe.server),
4800 Err(Error::FrameUnexpected)
4801 );
4802 }
4803
4804 #[test]
4805 /// Tests that calling poll() after an error occurred does nothing.
poll_after_error()4806 fn poll_after_error() {
4807 let mut s = Session::new().unwrap();
4808 s.handshake().unwrap();
4809
4810 let mut d = [42; 128];
4811 let mut b = octets::OctetsMut::with_slice(&mut d);
4812
4813 let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
4814 s.pipe.client.stream_send(0, frame_type, false).unwrap();
4815
4816 let frame_len = b.put_varint(1 << 24).unwrap();
4817 s.pipe.client.stream_send(0, frame_len, false).unwrap();
4818
4819 s.pipe.client.stream_send(0, &d, false).unwrap();
4820
4821 s.advance().ok();
4822
4823 assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
4824
4825 // Try to call poll() again after an error occurred.
4826 assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
4827 }
4828
4829 #[test]
4830 /// Tests that we limit sending HEADERS based on the stream capacity.
headers_blocked()4831 fn headers_blocked() {
4832 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
4833 config
4834 .load_cert_chain_from_pem_file("examples/cert.crt")
4835 .unwrap();
4836 config
4837 .load_priv_key_from_pem_file("examples/cert.key")
4838 .unwrap();
4839 config.set_application_protos(&[b"h3"]).unwrap();
4840 config.set_initial_max_data(70);
4841 config.set_initial_max_stream_data_bidi_local(150);
4842 config.set_initial_max_stream_data_bidi_remote(150);
4843 config.set_initial_max_stream_data_uni(150);
4844 config.set_initial_max_streams_bidi(100);
4845 config.set_initial_max_streams_uni(5);
4846 config.verify_peer(false);
4847
4848 let mut h3_config = Config::new().unwrap();
4849
4850 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
4851
4852 s.handshake().unwrap();
4853
4854 let req = vec![
4855 Header::new(b":method", b"GET"),
4856 Header::new(b":scheme", b"https"),
4857 Header::new(b":authority", b"quic.tech"),
4858 Header::new(b":path", b"/test"),
4859 ];
4860
4861 assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
4862
4863 assert_eq!(
4864 s.client.send_request(&mut s.pipe.client, &req, true),
4865 Err(Error::StreamBlocked)
4866 );
4867
4868 // Clear the writable stream queue.
4869 assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
4870 assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
4871 assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
4872 assert_eq!(s.pipe.client.stream_writable_next(), None);
4873
4874 s.advance().ok();
4875
4876 // Once the server gives flow control credits back, we can send the
4877 // request.
4878 assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
4879 assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
4880 }
4881
4882 #[test]
4883 /// Ensure StreamBlocked when connection flow control prevents headers.
headers_blocked_on_conn()4884 fn headers_blocked_on_conn() {
4885 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
4886 config
4887 .load_cert_chain_from_pem_file("examples/cert.crt")
4888 .unwrap();
4889 config
4890 .load_priv_key_from_pem_file("examples/cert.key")
4891 .unwrap();
4892 config.set_application_protos(&[b"h3"]).unwrap();
4893 config.set_initial_max_data(70);
4894 config.set_initial_max_stream_data_bidi_local(150);
4895 config.set_initial_max_stream_data_bidi_remote(150);
4896 config.set_initial_max_stream_data_uni(150);
4897 config.set_initial_max_streams_bidi(100);
4898 config.set_initial_max_streams_uni(5);
4899 config.verify_peer(false);
4900
4901 let mut h3_config = Config::new().unwrap();
4902
4903 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
4904
4905 s.handshake().unwrap();
4906
4907 // After the HTTP handshake, some bytes of connection flow control have
4908 // been consumed. Fill the connection with more grease data on the control
4909 // stream.
4910 let d = [42; 28];
4911 assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
4912
4913 let req = vec![
4914 Header::new(b":method", b"GET"),
4915 Header::new(b":scheme", b"https"),
4916 Header::new(b":authority", b"quic.tech"),
4917 Header::new(b":path", b"/test"),
4918 ];
4919
4920 // There is 0 connection-level flow control, so sending a request is
4921 // blocked.
4922 assert_eq!(
4923 s.client.send_request(&mut s.pipe.client, &req, true),
4924 Err(Error::StreamBlocked)
4925 );
4926 assert_eq!(s.pipe.client.stream_writable_next(), None);
4927
4928 // Emit the control stream data and drain it at the server via poll() to
4929 // consumes it via poll() and gives back flow control.
4930 s.advance().ok();
4931 assert_eq!(s.poll_server(), Err(Error::Done));
4932 s.advance().ok();
4933
4934 // Now we can send the request.
4935 assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
4936 assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
4937 }
4938
4939 #[test]
4940 /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
4941 /// offset when trying to send large bodies.
send_body_truncation_stream_blocked()4942 fn send_body_truncation_stream_blocked() {
4943 use crate::testing::decode_pkt;
4944
4945 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
4946 config
4947 .load_cert_chain_from_pem_file("examples/cert.crt")
4948 .unwrap();
4949 config
4950 .load_priv_key_from_pem_file("examples/cert.key")
4951 .unwrap();
4952 config.set_application_protos(&[b"h3"]).unwrap();
4953 config.set_initial_max_data(10000); // large connection-level flow control
4954 config.set_initial_max_stream_data_bidi_local(80);
4955 config.set_initial_max_stream_data_bidi_remote(80);
4956 config.set_initial_max_stream_data_uni(150);
4957 config.set_initial_max_streams_bidi(100);
4958 config.set_initial_max_streams_uni(5);
4959 config.verify_peer(false);
4960
4961 let mut h3_config = Config::new().unwrap();
4962
4963 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
4964
4965 s.handshake().unwrap();
4966
4967 let (stream, req) = s.send_request(true).unwrap();
4968
4969 let ev_headers = Event::Headers {
4970 list: req,
4971 has_body: false,
4972 };
4973
4974 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4975 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4976
4977 let _ = s.send_response(stream, false).unwrap();
4978
4979 assert_eq!(s.pipe.server.streams.blocked().len(), 0);
4980
4981 // The body must be larger than the stream window would allow
4982 let d = [42; 500];
4983 let mut off = 0;
4984
4985 let sent = s
4986 .server
4987 .send_body(&mut s.pipe.server, stream, &d, true)
4988 .unwrap();
4989 assert_eq!(sent, 25);
4990 off += sent;
4991
4992 // send_body wrote as much as it could (sent < size of buff).
4993 assert_eq!(s.pipe.server.streams.blocked().len(), 1);
4994 assert_eq!(
4995 s.server
4996 .send_body(&mut s.pipe.server, stream, &d[off..], true),
4997 Err(Error::Done)
4998 );
4999 assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5000
5001 // Now read raw frames to see what the QUIC layer did
5002 let mut buf = [0; 65535];
5003 let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5004
5005 let frames = decode_pkt(&mut s.pipe.client, &mut buf, len).unwrap();
5006
5007 let mut iter = frames.iter();
5008
5009 assert_eq!(
5010 iter.next(),
5011 Some(&crate::frame::Frame::StreamDataBlocked {
5012 stream_id: 0,
5013 limit: 80,
5014 })
5015 );
5016
5017 // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
5018 // the mark.
5019 assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5020
5021 // Don't read any data from the client, so stream flow control is never
5022 // given back in the form of changing the stream's max offset.
5023 // Subsequent body send operations will still fail but no more
5024 // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
5025 // change. No frames means no packet to send.
5026 assert_eq!(
5027 s.server
5028 .send_body(&mut s.pipe.server, stream, &d[off..], true),
5029 Err(Error::Done)
5030 );
5031 assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5032 assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
5033
5034 // Now update the client's max offset manually.
5035 let frames = [crate::frame::Frame::MaxStreamData {
5036 stream_id: 0,
5037 max: 100,
5038 }];
5039
5040 let pkt_type = crate::packet::Type::Short;
5041 assert_eq!(
5042 s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5043 Ok(39),
5044 );
5045
5046 let sent = s
5047 .server
5048 .send_body(&mut s.pipe.server, stream, &d[off..], true)
5049 .unwrap();
5050 assert_eq!(sent, 18);
5051
5052 // Same thing here...
5053 assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5054 assert_eq!(
5055 s.server
5056 .send_body(&mut s.pipe.server, stream, &d[off..], true),
5057 Err(Error::Done)
5058 );
5059 assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5060
5061 let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5062
5063 let frames = decode_pkt(&mut s.pipe.client, &mut buf, len).unwrap();
5064
5065 let mut iter = frames.iter();
5066
5067 assert_eq!(
5068 iter.next(),
5069 Some(&crate::frame::Frame::StreamDataBlocked {
5070 stream_id: 0,
5071 limit: 100,
5072 })
5073 );
5074 }
5075
5076 #[test]
5077 /// Ensure stream doesn't hang due to small cwnd.
send_body_stream_blocked_by_small_cwnd()5078 fn send_body_stream_blocked_by_small_cwnd() {
5079 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5080 config
5081 .load_cert_chain_from_pem_file("examples/cert.crt")
5082 .unwrap();
5083 config
5084 .load_priv_key_from_pem_file("examples/cert.key")
5085 .unwrap();
5086 config.set_application_protos(&[b"h3"]).unwrap();
5087 config.set_initial_max_data(100000); // large connection-level flow control
5088 config.set_initial_max_stream_data_bidi_local(100000);
5089 config.set_initial_max_stream_data_bidi_remote(50000);
5090 config.set_initial_max_stream_data_uni(150);
5091 config.set_initial_max_streams_bidi(100);
5092 config.set_initial_max_streams_uni(5);
5093 config.verify_peer(false);
5094
5095 let mut h3_config = Config::new().unwrap();
5096
5097 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5098
5099 s.handshake().unwrap();
5100
5101 let (stream, req) = s.send_request(true).unwrap();
5102
5103 let ev_headers = Event::Headers {
5104 list: req,
5105 has_body: false,
5106 };
5107
5108 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5109 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5110
5111 let _ = s.send_response(stream, false).unwrap();
5112
5113 // Clear the writable stream queue.
5114 assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5115 assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5116 assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5117 assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5118 assert_eq!(s.pipe.server.stream_writable_next(), None);
5119
5120 // The body must be larger than the cwnd would allow.
5121 let send_buf = [42; 80000];
5122
5123 let sent = s
5124 .server
5125 .send_body(&mut s.pipe.server, stream, &send_buf, true)
5126 .unwrap();
5127
5128 // send_body wrote as much as it could (sent < size of buff).
5129 assert_eq!(sent, 11995);
5130
5131 s.advance().ok();
5132
5133 // Client reads received headers and body.
5134 let mut recv_buf = [42; 80000];
5135 assert!(s.poll_client().is_ok());
5136 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5137 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
5138
5139 s.advance().ok();
5140
5141 // Server send cap is smaller than remaining body buffer.
5142 assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
5143
5144 // Once the server cwnd opens up, we can send more body.
5145 assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5146 }
5147
5148 #[test]
5149 /// Ensure stream doesn't hang due to small cwnd.
send_body_stream_blocked_zero_length()5150 fn send_body_stream_blocked_zero_length() {
5151 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5152 config
5153 .load_cert_chain_from_pem_file("examples/cert.crt")
5154 .unwrap();
5155 config
5156 .load_priv_key_from_pem_file("examples/cert.key")
5157 .unwrap();
5158 config.set_application_protos(&[b"h3"]).unwrap();
5159 config.set_initial_max_data(100000); // large connection-level flow control
5160 config.set_initial_max_stream_data_bidi_local(100000);
5161 config.set_initial_max_stream_data_bidi_remote(50000);
5162 config.set_initial_max_stream_data_uni(150);
5163 config.set_initial_max_streams_bidi(100);
5164 config.set_initial_max_streams_uni(5);
5165 config.verify_peer(false);
5166
5167 let mut h3_config = Config::new().unwrap();
5168
5169 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5170
5171 s.handshake().unwrap();
5172
5173 let (stream, req) = s.send_request(true).unwrap();
5174
5175 let ev_headers = Event::Headers {
5176 list: req,
5177 has_body: false,
5178 };
5179
5180 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5181 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5182
5183 let _ = s.send_response(stream, false).unwrap();
5184
5185 // Clear the writable stream queue.
5186 assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5187 assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5188 assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5189 assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5190 assert_eq!(s.pipe.server.stream_writable_next(), None);
5191
5192 // The body is large enough to fill the cwnd, except for enough bytes
5193 // for another DATA frame header (but no payload).
5194 let send_buf = [42; 11994];
5195
5196 let sent = s
5197 .server
5198 .send_body(&mut s.pipe.server, stream, &send_buf, false)
5199 .unwrap();
5200
5201 assert_eq!(sent, 11994);
5202
5203 // There is only enough capacity left for the DATA frame header, but
5204 // no payload.
5205 assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
5206 assert_eq!(
5207 s.server
5208 .send_body(&mut s.pipe.server, stream, &send_buf, false),
5209 Err(Error::Done)
5210 );
5211
5212 s.advance().ok();
5213
5214 // Client reads received headers and body.
5215 let mut recv_buf = [42; 80000];
5216 assert!(s.poll_client().is_ok());
5217 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5218 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
5219
5220 s.advance().ok();
5221
5222 // Once the server cwnd opens up, we can send more body.
5223 assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5224 }
5225
5226 #[test]
5227 /// Test handling of 0-length DATA writes with and without fin.
zero_length_data()5228 fn zero_length_data() {
5229 let mut s = Session::new().unwrap();
5230 s.handshake().unwrap();
5231
5232 let (stream, req) = s.send_request(false).unwrap();
5233
5234 assert_eq!(
5235 s.client.send_body(&mut s.pipe.client, 0, b"", false),
5236 Err(Error::Done)
5237 );
5238 assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
5239
5240 s.advance().ok();
5241
5242 let mut recv_buf = vec![0; 100];
5243
5244 let ev_headers = Event::Headers {
5245 list: req,
5246 has_body: true,
5247 };
5248
5249 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5250
5251 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5252 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
5253
5254 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5255 assert_eq!(s.poll_server(), Err(Error::Done));
5256
5257 let resp = s.send_response(stream, false).unwrap();
5258
5259 assert_eq!(
5260 s.server.send_body(&mut s.pipe.server, 0, b"", false),
5261 Err(Error::Done)
5262 );
5263 assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
5264
5265 s.advance().ok();
5266
5267 let ev_headers = Event::Headers {
5268 list: resp,
5269 has_body: true,
5270 };
5271
5272 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5273
5274 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5275 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
5276
5277 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5278 assert_eq!(s.poll_client(), Err(Error::Done));
5279 }
5280
5281 #[test]
5282 /// Tests that blocked 0-length DATA writes are reported correctly.
zero_length_data_blocked()5283 fn zero_length_data_blocked() {
5284 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5285 config
5286 .load_cert_chain_from_pem_file("examples/cert.crt")
5287 .unwrap();
5288 config
5289 .load_priv_key_from_pem_file("examples/cert.key")
5290 .unwrap();
5291 config.set_application_protos(&[b"h3"]).unwrap();
5292 config.set_initial_max_data(69);
5293 config.set_initial_max_stream_data_bidi_local(150);
5294 config.set_initial_max_stream_data_bidi_remote(150);
5295 config.set_initial_max_stream_data_uni(150);
5296 config.set_initial_max_streams_bidi(100);
5297 config.set_initial_max_streams_uni(5);
5298 config.verify_peer(false);
5299
5300 let mut h3_config = Config::new().unwrap();
5301
5302 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5303
5304 s.handshake().unwrap();
5305
5306 let req = vec![
5307 Header::new(b":method", b"GET"),
5308 Header::new(b":scheme", b"https"),
5309 Header::new(b":authority", b"quic.tech"),
5310 Header::new(b":path", b"/test"),
5311 ];
5312
5313 assert_eq!(
5314 s.client.send_request(&mut s.pipe.client, &req, false),
5315 Ok(0)
5316 );
5317
5318 assert_eq!(
5319 s.client.send_body(&mut s.pipe.client, 0, b"", true),
5320 Err(Error::Done)
5321 );
5322
5323 // Clear the writable stream queue.
5324 assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5325 assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5326 assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5327 assert_eq!(s.pipe.client.stream_writable_next(), None);
5328
5329 s.advance().ok();
5330
5331 // Once the server gives flow control credits back, we can send the body.
5332 assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
5333 assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
5334 }
5335
5336 #[test]
5337 /// Tests that receiving an empty SETTINGS frame is handled and reported.
empty_settings()5338 fn empty_settings() {
5339 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5340 config
5341 .load_cert_chain_from_pem_file("examples/cert.crt")
5342 .unwrap();
5343 config
5344 .load_priv_key_from_pem_file("examples/cert.key")
5345 .unwrap();
5346 config.set_application_protos(&[b"h3"]).unwrap();
5347 config.set_initial_max_data(1500);
5348 config.set_initial_max_stream_data_bidi_local(150);
5349 config.set_initial_max_stream_data_bidi_remote(150);
5350 config.set_initial_max_stream_data_uni(150);
5351 config.set_initial_max_streams_bidi(5);
5352 config.set_initial_max_streams_uni(5);
5353 config.verify_peer(false);
5354 config.set_ack_delay_exponent(8);
5355 config.grease(false);
5356
5357 let h3_config = Config::new().unwrap();
5358 let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5359
5360 s.handshake().unwrap();
5361
5362 assert!(s.client.peer_settings_raw().is_some());
5363 assert!(s.server.peer_settings_raw().is_some());
5364 }
5365
5366 #[test]
5367 /// Tests that receiving a H3_DATAGRAM setting is ok.
dgram_setting()5368 fn dgram_setting() {
5369 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5370 config
5371 .load_cert_chain_from_pem_file("examples/cert.crt")
5372 .unwrap();
5373 config
5374 .load_priv_key_from_pem_file("examples/cert.key")
5375 .unwrap();
5376 config.set_application_protos(&[b"h3"]).unwrap();
5377 config.set_initial_max_data(70);
5378 config.set_initial_max_stream_data_bidi_local(150);
5379 config.set_initial_max_stream_data_bidi_remote(150);
5380 config.set_initial_max_stream_data_uni(150);
5381 config.set_initial_max_streams_bidi(100);
5382 config.set_initial_max_streams_uni(5);
5383 config.enable_dgram(true, 1000, 1000);
5384 config.verify_peer(false);
5385
5386 let h3_config = Config::new().unwrap();
5387
5388 let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5389 assert_eq!(s.pipe.handshake(), Ok(()));
5390
5391 s.client.send_settings(&mut s.pipe.client).unwrap();
5392 assert_eq!(s.pipe.advance(), Ok(()));
5393
5394 // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
5395 // enabled.
5396 assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
5397
5398 // When everything is ok, poll returns Done and DATAGRAM is enabled.
5399 assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5400 assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
5401
5402 // Now detect things on the client
5403 s.server.send_settings(&mut s.pipe.server).unwrap();
5404 assert_eq!(s.pipe.advance(), Ok(()));
5405 assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
5406 assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
5407 assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
5408 }
5409
5410 #[test]
5411 /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
5412 /// an error.
dgram_setting_no_tp()5413 fn dgram_setting_no_tp() {
5414 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5415 config
5416 .load_cert_chain_from_pem_file("examples/cert.crt")
5417 .unwrap();
5418 config
5419 .load_priv_key_from_pem_file("examples/cert.key")
5420 .unwrap();
5421 config.set_application_protos(&[b"h3"]).unwrap();
5422 config.set_initial_max_data(70);
5423 config.set_initial_max_stream_data_bidi_local(150);
5424 config.set_initial_max_stream_data_bidi_remote(150);
5425 config.set_initial_max_stream_data_uni(150);
5426 config.set_initial_max_streams_bidi(100);
5427 config.set_initial_max_streams_uni(5);
5428 config.verify_peer(false);
5429
5430 let h3_config = Config::new().unwrap();
5431
5432 let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5433 assert_eq!(s.pipe.handshake(), Ok(()));
5434
5435 s.client.control_stream_id = Some(
5436 s.client
5437 .open_uni_stream(
5438 &mut s.pipe.client,
5439 stream::HTTP3_CONTROL_STREAM_TYPE_ID,
5440 )
5441 .unwrap(),
5442 );
5443
5444 let settings = frame::Frame::Settings {
5445 max_field_section_size: None,
5446 qpack_max_table_capacity: None,
5447 qpack_blocked_streams: None,
5448 connect_protocol_enabled: None,
5449 h3_datagram: Some(1),
5450 grease: None,
5451 raw: Default::default(),
5452 };
5453
5454 s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
5455 .unwrap();
5456
5457 assert_eq!(s.pipe.advance(), Ok(()));
5458
5459 assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
5460 }
5461
5462 #[test]
5463 /// Tests that receiving SETTINGS with prohibited values generates an error.
settings_h2_prohibited()5464 fn settings_h2_prohibited() {
5465 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5466 config
5467 .load_cert_chain_from_pem_file("examples/cert.crt")
5468 .unwrap();
5469 config
5470 .load_priv_key_from_pem_file("examples/cert.key")
5471 .unwrap();
5472 config.set_application_protos(&[b"h3"]).unwrap();
5473 config.set_initial_max_data(70);
5474 config.set_initial_max_stream_data_bidi_local(150);
5475 config.set_initial_max_stream_data_bidi_remote(150);
5476 config.set_initial_max_stream_data_uni(150);
5477 config.set_initial_max_streams_bidi(100);
5478 config.set_initial_max_streams_uni(5);
5479 config.verify_peer(false);
5480
5481 let h3_config = Config::new().unwrap();
5482
5483 let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5484 assert_eq!(s.pipe.handshake(), Ok(()));
5485
5486 s.client.control_stream_id = Some(
5487 s.client
5488 .open_uni_stream(
5489 &mut s.pipe.client,
5490 stream::HTTP3_CONTROL_STREAM_TYPE_ID,
5491 )
5492 .unwrap(),
5493 );
5494
5495 s.server.control_stream_id = Some(
5496 s.server
5497 .open_uni_stream(
5498 &mut s.pipe.server,
5499 stream::HTTP3_CONTROL_STREAM_TYPE_ID,
5500 )
5501 .unwrap(),
5502 );
5503
5504 let frame_payload_len = 2u64;
5505 let settings = [
5506 frame::SETTINGS_FRAME_TYPE_ID as u8,
5507 frame_payload_len as u8,
5508 0x2, // 0x2 is a reserved setting type
5509 1,
5510 ];
5511
5512 s.send_arbitrary_stream_data_client(
5513 &settings,
5514 s.client.control_stream_id.unwrap(),
5515 false,
5516 )
5517 .unwrap();
5518
5519 s.send_arbitrary_stream_data_server(
5520 &settings,
5521 s.server.control_stream_id.unwrap(),
5522 false,
5523 )
5524 .unwrap();
5525
5526 assert_eq!(s.pipe.advance(), Ok(()));
5527
5528 assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
5529
5530 assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
5531 }
5532
5533 #[test]
5534 /// Send a single DATAGRAM.
single_dgram()5535 fn single_dgram() {
5536 let mut buf = [0; 65535];
5537 let mut s = Session::new().unwrap();
5538 s.handshake().unwrap();
5539
5540 // We'll send default data of 10 bytes on flow ID 0.
5541 let result = (11, 0, 1);
5542
5543 s.send_dgram_client(0).unwrap();
5544
5545 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5546 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5547 assert_eq!(s.poll_server(), Err(Error::Done));
5548
5549 s.send_dgram_server(0).unwrap();
5550 assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
5551 assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
5552 }
5553
5554 #[test]
5555 /// Send multiple DATAGRAMs.
multiple_dgram()5556 fn multiple_dgram() {
5557 let mut buf = [0; 65535];
5558 let mut s = Session::new().unwrap();
5559 s.handshake().unwrap();
5560
5561 // We'll send default data of 10 bytes on flow ID 0.
5562 let result = (11, 0, 1);
5563
5564 s.send_dgram_client(0).unwrap();
5565 s.send_dgram_client(0).unwrap();
5566 s.send_dgram_client(0).unwrap();
5567
5568 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5569 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5570 assert_eq!(s.poll_server(), Err(Error::Done));
5571 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5572 assert_eq!(s.poll_server(), Err(Error::Done));
5573 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5574 assert_eq!(s.poll_server(), Err(Error::Done));
5575 assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
5576 assert_eq!(s.poll_server(), Err(Error::Done));
5577
5578 s.send_dgram_server(0).unwrap();
5579 s.send_dgram_server(0).unwrap();
5580 s.send_dgram_server(0).unwrap();
5581
5582 assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
5583 assert_eq!(s.poll_server(), Err(Error::Done));
5584 assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
5585 assert_eq!(s.poll_client(), Err(Error::Done));
5586 assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
5587 assert_eq!(s.poll_client(), Err(Error::Done));
5588 assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
5589 assert_eq!(s.poll_client(), Err(Error::Done));
5590 assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
5591 assert_eq!(s.poll_client(), Err(Error::Done));
5592 }
5593
5594 #[test]
5595 /// Send more DATAGRAMs than the send queue allows.
multiple_dgram_overflow()5596 fn multiple_dgram_overflow() {
5597 let mut buf = [0; 65535];
5598 let mut s = Session::new().unwrap();
5599 s.handshake().unwrap();
5600
5601 // We'll send default data of 10 bytes on flow ID 0.
5602 let result = (11, 0, 1);
5603
5604 // Five DATAGRAMs
5605 s.send_dgram_client(0).unwrap();
5606 s.send_dgram_client(0).unwrap();
5607 s.send_dgram_client(0).unwrap();
5608 s.send_dgram_client(0).unwrap();
5609 s.send_dgram_client(0).unwrap();
5610
5611 // Only 3 independent DATAGRAM events will fire.
5612 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5613 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5614 assert_eq!(s.poll_server(), Err(Error::Done));
5615 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5616 assert_eq!(s.poll_server(), Err(Error::Done));
5617 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5618 assert_eq!(s.poll_server(), Err(Error::Done));
5619 assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
5620 assert_eq!(s.poll_server(), Err(Error::Done));
5621 }
5622
5623 #[test]
5624 /// Send a single DATAGRAM and request. Ensure that poll continuously cycles
5625 /// between the two types if the data is not read.
poll_yield_cycling()5626 fn poll_yield_cycling() {
5627 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5628 config
5629 .load_cert_chain_from_pem_file("examples/cert.crt")
5630 .unwrap();
5631 config
5632 .load_priv_key_from_pem_file("examples/cert.key")
5633 .unwrap();
5634 config.set_application_protos(&[b"h3"]).unwrap();
5635 config.set_initial_max_data(1500);
5636 config.set_initial_max_stream_data_bidi_local(150);
5637 config.set_initial_max_stream_data_bidi_remote(150);
5638 config.set_initial_max_stream_data_uni(150);
5639 config.set_initial_max_streams_bidi(100);
5640 config.set_initial_max_streams_uni(5);
5641 config.verify_peer(false);
5642 config.enable_dgram(true, 100, 100);
5643
5644 let mut h3_config = Config::new().unwrap();
5645 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5646 s.handshake().unwrap();
5647
5648 // Send request followed by DATAGRAM on client side.
5649 let (stream, req) = s.send_request(false).unwrap();
5650
5651 s.send_body_client(stream, true).unwrap();
5652
5653 let ev_headers = Event::Headers {
5654 list: req,
5655 has_body: true,
5656 };
5657
5658 s.send_dgram_client(0).unwrap();
5659
5660 // Now let's test the poll counts and yielding.
5661 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5662
5663 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5664 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5665
5666 assert_eq!(s.poll_server(), Err(Error::Done));
5667 }
5668
5669 #[test]
5670 /// Send a single DATAGRAM and request. Ensure that poll
5671 /// yield cycles and cleanly exits if data is read.
poll_yield_single_read()5672 fn poll_yield_single_read() {
5673 let mut buf = [0; 65535];
5674
5675 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5676 config
5677 .load_cert_chain_from_pem_file("examples/cert.crt")
5678 .unwrap();
5679 config
5680 .load_priv_key_from_pem_file("examples/cert.key")
5681 .unwrap();
5682 config.set_application_protos(&[b"h3"]).unwrap();
5683 config.set_initial_max_data(1500);
5684 config.set_initial_max_stream_data_bidi_local(150);
5685 config.set_initial_max_stream_data_bidi_remote(150);
5686 config.set_initial_max_stream_data_uni(150);
5687 config.set_initial_max_streams_bidi(100);
5688 config.set_initial_max_streams_uni(5);
5689 config.verify_peer(false);
5690 config.enable_dgram(true, 100, 100);
5691
5692 let mut h3_config = Config::new().unwrap();
5693 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5694 s.handshake().unwrap();
5695
5696 // We'll send default data of 10 bytes on flow ID 0.
5697 let result = (11, 0, 1);
5698
5699 // Send request followed by DATAGRAM on client side.
5700 let (stream, req) = s.send_request(false).unwrap();
5701
5702 let body = s.send_body_client(stream, true).unwrap();
5703
5704 let mut recv_buf = vec![0; body.len()];
5705
5706 let ev_headers = Event::Headers {
5707 list: req,
5708 has_body: true,
5709 };
5710
5711 s.send_dgram_client(0).unwrap();
5712
5713 // Now let's test the poll counts and yielding.
5714 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5715
5716 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5717 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5718
5719 assert_eq!(s.poll_server(), Err(Error::Done));
5720
5721 assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
5722
5723 assert_eq!(s.poll_server(), Err(Error::Done));
5724
5725 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
5726 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5727 assert_eq!(s.poll_server(), Err(Error::Done));
5728
5729 // Send response followed by DATAGRAM on server side
5730 let resp = s.send_response(stream, false).unwrap();
5731
5732 let body = s.send_body_server(stream, true).unwrap();
5733
5734 let mut recv_buf = vec![0; body.len()];
5735
5736 let ev_headers = Event::Headers {
5737 list: resp,
5738 has_body: true,
5739 };
5740
5741 s.send_dgram_server(0).unwrap();
5742
5743 // Now let's test the poll counts and yielding.
5744 assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
5745
5746 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5747 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5748
5749 assert_eq!(s.poll_client(), Err(Error::Done));
5750
5751 assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
5752
5753 assert_eq!(s.poll_client(), Err(Error::Done));
5754
5755 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
5756
5757 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5758 assert_eq!(s.poll_client(), Err(Error::Done));
5759 }
5760
5761 #[test]
5762 /// Send a multiple DATAGRAMs and requests. Ensure that poll
5763 /// yield cycles and cleanly exits if data is read.
poll_yield_multi_read()5764 fn poll_yield_multi_read() {
5765 let mut buf = [0; 65535];
5766
5767 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5768 config
5769 .load_cert_chain_from_pem_file("examples/cert.crt")
5770 .unwrap();
5771 config
5772 .load_priv_key_from_pem_file("examples/cert.key")
5773 .unwrap();
5774 config.set_application_protos(&[b"h3"]).unwrap();
5775 config.set_initial_max_data(1500);
5776 config.set_initial_max_stream_data_bidi_local(150);
5777 config.set_initial_max_stream_data_bidi_remote(150);
5778 config.set_initial_max_stream_data_uni(150);
5779 config.set_initial_max_streams_bidi(100);
5780 config.set_initial_max_streams_uni(5);
5781 config.verify_peer(false);
5782 config.enable_dgram(true, 100, 100);
5783
5784 let mut h3_config = Config::new().unwrap();
5785 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
5786 s.handshake().unwrap();
5787
5788 // 10 bytes on flow ID 0 and 2.
5789 let flow_0_result = (11, 0, 1);
5790 let flow_2_result = (11, 2, 1);
5791
5792 // Send requests followed by DATAGRAMs on client side.
5793 let (stream, req) = s.send_request(false).unwrap();
5794
5795 let body = s.send_body_client(stream, true).unwrap();
5796
5797 let mut recv_buf = vec![0; body.len()];
5798
5799 let ev_headers = Event::Headers {
5800 list: req,
5801 has_body: true,
5802 };
5803
5804 s.send_dgram_client(0).unwrap();
5805 s.send_dgram_client(0).unwrap();
5806 s.send_dgram_client(0).unwrap();
5807 s.send_dgram_client(0).unwrap();
5808 s.send_dgram_client(0).unwrap();
5809 s.send_dgram_client(2).unwrap();
5810 s.send_dgram_client(2).unwrap();
5811 s.send_dgram_client(2).unwrap();
5812 s.send_dgram_client(2).unwrap();
5813 s.send_dgram_client(2).unwrap();
5814
5815 // Now let's test the poll counts and yielding.
5816 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
5817
5818 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5819 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5820
5821 assert_eq!(s.poll_server(), Err(Error::Done));
5822
5823 // Second cycle, start to read
5824 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5825 assert_eq!(s.poll_server(), Err(Error::Done));
5826 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5827 assert_eq!(s.poll_server(), Err(Error::Done));
5828 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5829 assert_eq!(s.poll_server(), Err(Error::Done));
5830
5831 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
5832 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5833
5834 assert_eq!(s.poll_server(), Err(Error::Done));
5835
5836 // Third cycle.
5837 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5838 assert_eq!(s.poll_server(), Err(Error::Done));
5839 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
5840 assert_eq!(s.poll_server(), Err(Error::Done));
5841 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5842 assert_eq!(s.poll_server(), Err(Error::Done));
5843 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5844 assert_eq!(s.poll_server(), Err(Error::Done));
5845 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5846 assert_eq!(s.poll_server(), Err(Error::Done));
5847 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5848 assert_eq!(s.poll_server(), Err(Error::Done));
5849 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
5850 assert_eq!(s.poll_server(), Err(Error::Done));
5851
5852 // Send response followed by DATAGRAM on server side
5853 let resp = s.send_response(stream, false).unwrap();
5854
5855 let body = s.send_body_server(stream, true).unwrap();
5856
5857 let mut recv_buf = vec![0; body.len()];
5858
5859 let ev_headers = Event::Headers {
5860 list: resp,
5861 has_body: true,
5862 };
5863
5864 s.send_dgram_server(0).unwrap();
5865 s.send_dgram_server(0).unwrap();
5866 s.send_dgram_server(0).unwrap();
5867 s.send_dgram_server(0).unwrap();
5868 s.send_dgram_server(0).unwrap();
5869 s.send_dgram_server(2).unwrap();
5870 s.send_dgram_server(2).unwrap();
5871 s.send_dgram_server(2).unwrap();
5872 s.send_dgram_server(2).unwrap();
5873 s.send_dgram_server(2).unwrap();
5874
5875 assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
5876
5877 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5878 assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5879
5880 assert_eq!(s.poll_client(), Err(Error::Done));
5881
5882 // Second cycle, start to read
5883 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5884 assert_eq!(s.poll_client(), Err(Error::Done));
5885 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5886 assert_eq!(s.poll_client(), Err(Error::Done));
5887 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5888 assert_eq!(s.poll_client(), Err(Error::Done));
5889
5890 assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
5891 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5892
5893 assert_eq!(s.poll_client(), Err(Error::Done));
5894
5895 // Third cycle.
5896 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5897 assert_eq!(s.poll_client(), Err(Error::Done));
5898 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
5899 assert_eq!(s.poll_client(), Err(Error::Done));
5900 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5901 assert_eq!(s.poll_client(), Err(Error::Done));
5902 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5903 assert_eq!(s.poll_client(), Err(Error::Done));
5904 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5905 assert_eq!(s.poll_client(), Err(Error::Done));
5906 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5907 assert_eq!(s.poll_client(), Err(Error::Done));
5908 assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
5909 assert_eq!(s.poll_client(), Err(Error::Done));
5910 }
5911
5912 #[test]
5913 /// Tests that the Finished event is not issued for streams of unknown type
5914 /// (e.g. GREASE).
finished_is_for_requests()5915 fn finished_is_for_requests() {
5916 let mut s = Session::new().unwrap();
5917 s.handshake().unwrap();
5918
5919 assert_eq!(s.poll_client(), Err(Error::Done));
5920 assert_eq!(s.poll_server(), Err(Error::Done));
5921
5922 assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
5923 assert_eq!(s.pipe.advance(), Ok(()));
5924
5925 assert_eq!(s.poll_client(), Err(Error::Done));
5926 assert_eq!(s.poll_server(), Err(Error::Done));
5927 }
5928
5929 #[test]
5930 /// Tests that streams are marked as finished only once.
finished_once()5931 fn finished_once() {
5932 let mut s = Session::new().unwrap();
5933 s.handshake().unwrap();
5934
5935 let (stream, req) = s.send_request(false).unwrap();
5936 let body = s.send_body_client(stream, true).unwrap();
5937
5938 let mut recv_buf = vec![0; body.len()];
5939
5940 let ev_headers = Event::Headers {
5941 list: req,
5942 has_body: true,
5943 };
5944
5945 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5946 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5947
5948 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
5949 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5950
5951 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
5952 assert_eq!(s.poll_server(), Err(Error::Done));
5953 }
5954
5955 #[test]
5956 /// Tests that the Data event is properly re-armed.
data_event_rearm()5957 fn data_event_rearm() {
5958 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5959
5960 let mut s = Session::new().unwrap();
5961 s.handshake().unwrap();
5962
5963 let (stream, req) = s.send_request(false).unwrap();
5964
5965 let mut recv_buf = vec![0; bytes.len()];
5966
5967 let ev_headers = Event::Headers {
5968 list: req,
5969 has_body: true,
5970 };
5971
5972 // Manually send an incomplete DATA frame (i.e. the frame size is longer
5973 // than the actual data sent).
5974 {
5975 let mut d = [42; 10];
5976 let mut b = octets::OctetsMut::with_slice(&mut d);
5977
5978 b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5979 b.put_varint(bytes.len() as u64).unwrap();
5980 let off = b.off();
5981 s.pipe.client.stream_send(stream, &d[..off], false).unwrap();
5982
5983 assert_eq!(
5984 s.pipe.client.stream_send(stream, &bytes[..5], false),
5985 Ok(5)
5986 );
5987
5988 s.advance().ok();
5989 }
5990
5991 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5992 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5993 assert_eq!(s.poll_server(), Err(Error::Done));
5994
5995 // Read the available body data.
5996 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
5997
5998 // Send the remaining DATA payload.
5999 assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5));
6000 s.advance().ok();
6001
6002 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6003 assert_eq!(s.poll_server(), Err(Error::Done));
6004
6005 // Read the rest of the body data.
6006 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
6007 assert_eq!(s.poll_server(), Err(Error::Done));
6008
6009 // Send more data.
6010 let body = s.send_body_client(stream, false).unwrap();
6011
6012 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6013 assert_eq!(s.poll_server(), Err(Error::Done));
6014
6015 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6016
6017 // Send more data, then HEADERS, then more data.
6018 let body = s.send_body_client(stream, false).unwrap();
6019
6020 let trailers = vec![Header::new(b"hello", b"world")];
6021
6022 s.client
6023 .send_headers(&mut s.pipe.client, stream, &trailers, false)
6024 .unwrap();
6025
6026 let ev_trailers = Event::Headers {
6027 list: trailers,
6028 has_body: true,
6029 };
6030
6031 s.advance().ok();
6032
6033 s.send_body_client(stream, false).unwrap();
6034
6035 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6036 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6037
6038 assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
6039
6040 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6041 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6042
6043 let (stream, req) = s.send_request(false).unwrap();
6044
6045 let ev_headers = Event::Headers {
6046 list: req,
6047 has_body: true,
6048 };
6049
6050 // Manually send an incomplete DATA frame (i.e. only the header is sent).
6051 {
6052 let mut d = [42; 10];
6053 let mut b = octets::OctetsMut::with_slice(&mut d);
6054
6055 b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6056 b.put_varint(bytes.len() as u64).unwrap();
6057 let off = b.off();
6058 s.pipe.client.stream_send(stream, &d[..off], false).unwrap();
6059
6060 s.advance().ok();
6061 }
6062
6063 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6064 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6065 assert_eq!(s.poll_server(), Err(Error::Done));
6066
6067 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6068
6069 assert_eq!(s.pipe.client.stream_send(stream, &bytes[..5], false), Ok(5));
6070
6071 s.advance().ok();
6072
6073 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6074 assert_eq!(s.poll_server(), Err(Error::Done));
6075
6076 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
6077
6078 assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5));
6079 s.advance().ok();
6080
6081 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6082 assert_eq!(s.poll_server(), Err(Error::Done));
6083
6084 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
6085
6086 // Buffer multiple data frames.
6087 let body = s.send_body_client(stream, false).unwrap();
6088 s.send_body_client(stream, false).unwrap();
6089 s.send_body_client(stream, false).unwrap();
6090
6091 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6092 assert_eq!(s.poll_server(), Err(Error::Done));
6093
6094 {
6095 let mut d = [42; 10];
6096 let mut b = octets::OctetsMut::with_slice(&mut d);
6097
6098 b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6099 b.put_varint(0).unwrap();
6100 let off = b.off();
6101 s.pipe.client.stream_send(stream, &d[..off], true).unwrap();
6102
6103 s.advance().ok();
6104 }
6105
6106 let mut recv_buf = vec![0; bytes.len() * 3];
6107
6108 assert_eq!(
6109 s.recv_body_server(stream, &mut recv_buf),
6110 Ok(body.len() * 3)
6111 );
6112 }
6113
6114 #[test]
6115 /// Tests that the Datagram event is properly re-armed.
dgram_event_rearm()6116 fn dgram_event_rearm() {
6117 let mut buf = [0; 65535];
6118
6119 let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6120 config
6121 .load_cert_chain_from_pem_file("examples/cert.crt")
6122 .unwrap();
6123 config
6124 .load_priv_key_from_pem_file("examples/cert.key")
6125 .unwrap();
6126 config.set_application_protos(&[b"h3"]).unwrap();
6127 config.set_initial_max_data(1500);
6128 config.set_initial_max_stream_data_bidi_local(150);
6129 config.set_initial_max_stream_data_bidi_remote(150);
6130 config.set_initial_max_stream_data_uni(150);
6131 config.set_initial_max_streams_bidi(100);
6132 config.set_initial_max_streams_uni(5);
6133 config.verify_peer(false);
6134 config.enable_dgram(true, 100, 100);
6135
6136 let mut h3_config = Config::new().unwrap();
6137 let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
6138 s.handshake().unwrap();
6139
6140 // 10 bytes on flow ID 0 and 2.
6141 let flow_0_result = (11, 0, 1);
6142 let flow_2_result = (11, 2, 1);
6143
6144 // Send requests followed by DATAGRAMs on client side.
6145 let (stream, req) = s.send_request(false).unwrap();
6146
6147 let body = s.send_body_client(stream, true).unwrap();
6148
6149 let mut recv_buf = vec![0; body.len()];
6150
6151 let ev_headers = Event::Headers {
6152 list: req,
6153 has_body: true,
6154 };
6155
6156 s.send_dgram_client(0).unwrap();
6157 s.send_dgram_client(0).unwrap();
6158 s.send_dgram_client(2).unwrap();
6159 s.send_dgram_client(2).unwrap();
6160
6161 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
6162
6163 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6164 assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6165
6166 assert_eq!(s.poll_server(), Err(Error::Done));
6167 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6168
6169 assert_eq!(s.poll_server(), Err(Error::Done));
6170 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6171
6172 assert_eq!(s.poll_server(), Err(Error::Done));
6173 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6174
6175 assert_eq!(s.poll_server(), Err(Error::Done));
6176 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6177
6178 assert_eq!(s.poll_server(), Err(Error::Done));
6179
6180 s.send_dgram_client(0).unwrap();
6181 s.send_dgram_client(2).unwrap();
6182
6183 assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
6184 assert_eq!(s.poll_server(), Err(Error::Done));
6185
6186 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6187 assert_eq!(s.poll_server(), Err(Error::Done));
6188
6189 assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6190 assert_eq!(s.poll_server(), Err(Error::Done));
6191
6192 assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6193 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6194 }
6195
6196 #[test]
reset_stream()6197 fn reset_stream() {
6198 let mut buf = [0; 65535];
6199
6200 let mut s = Session::new().unwrap();
6201 s.handshake().unwrap();
6202
6203 // Client sends request.
6204 let (stream, req) = s.send_request(false).unwrap();
6205
6206 let ev_headers = Event::Headers {
6207 list: req,
6208 has_body: true,
6209 };
6210
6211 // Server sends response and closes stream.
6212 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6213 assert_eq!(s.poll_server(), Err(Error::Done));
6214
6215 let resp = s.send_response(stream, true).unwrap();
6216
6217 let ev_headers = Event::Headers {
6218 list: resp,
6219 has_body: false,
6220 };
6221
6222 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6223 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6224 assert_eq!(s.poll_client(), Err(Error::Done));
6225
6226 // Client sends RESET_STREAM, closing stream.
6227 let frames = [crate::frame::Frame::ResetStream {
6228 stream_id: stream,
6229 error_code: 42,
6230 final_size: 68,
6231 }];
6232
6233 let pkt_type = crate::packet::Type::Short;
6234 assert_eq!(
6235 s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6236 Ok(39)
6237 );
6238
6239 // Server issues Reset event for the stream.
6240 assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
6241 assert_eq!(s.poll_server(), Err(Error::Done));
6242
6243 // Sending RESET_STREAM again shouldn't trigger another Reset event.
6244 assert_eq!(
6245 s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6246 Ok(39)
6247 );
6248
6249 assert_eq!(s.poll_server(), Err(Error::Done));
6250 }
6251
6252 #[test]
reset_finished_at_server()6253 fn reset_finished_at_server() {
6254 let mut s = Session::new().unwrap();
6255 s.handshake().unwrap();
6256
6257 // Client sends HEADERS and doesn't fin
6258 let (stream, _req) = s.send_request(false).unwrap();
6259
6260 // ..then Client sends RESET_STREAM
6261 assert_eq!(
6262 s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
6263 Ok(())
6264 );
6265
6266 assert_eq!(s.pipe.advance(), Ok(()));
6267
6268 // Server receives just a reset
6269 assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
6270 assert_eq!(s.poll_server(), Err(Error::Done));
6271
6272 // Client sends HEADERS and fin
6273 let (stream, req) = s.send_request(true).unwrap();
6274
6275 // ..then Client sends RESET_STREAM
6276 assert_eq!(
6277 s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
6278 Ok(())
6279 );
6280
6281 let ev_headers = Event::Headers {
6282 list: req,
6283 has_body: false,
6284 };
6285
6286 // Server receives headers and fin.
6287 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6288 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6289 assert_eq!(s.poll_server(), Err(Error::Done));
6290 }
6291
6292 #[test]
reset_finished_at_client()6293 fn reset_finished_at_client() {
6294 let mut buf = [0; 65535];
6295 let mut s = Session::new().unwrap();
6296 s.handshake().unwrap();
6297
6298 // Client sends HEADERS and doesn't fin
6299 let (stream, req) = s.send_request(false).unwrap();
6300
6301 let ev_headers = Event::Headers {
6302 list: req,
6303 has_body: true,
6304 };
6305
6306 // Server receives headers.
6307 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6308 assert_eq!(s.poll_server(), Err(Error::Done));
6309
6310 // Server sends response and doesn't fin
6311 s.send_response(stream, false).unwrap();
6312
6313 assert_eq!(s.pipe.advance(), Ok(()));
6314
6315 // .. then Server sends RESET_STREAM
6316 assert_eq!(
6317 s.pipe
6318 .server
6319 .stream_shutdown(stream, crate::Shutdown::Write, 0),
6320 Ok(())
6321 );
6322
6323 assert_eq!(s.pipe.advance(), Ok(()));
6324
6325 // Client receives Reset only
6326 assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
6327 assert_eq!(s.poll_server(), Err(Error::Done));
6328
6329 // Client sends headers and fin.
6330 let (stream, req) = s.send_request(true).unwrap();
6331
6332 let ev_headers = Event::Headers {
6333 list: req,
6334 has_body: false,
6335 };
6336
6337 // Server receives headers and fin.
6338 assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6339 assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6340 assert_eq!(s.poll_server(), Err(Error::Done));
6341
6342 // Server sends response and fin
6343 let resp = s.send_response(stream, true).unwrap();
6344
6345 assert_eq!(s.pipe.advance(), Ok(()));
6346
6347 // ..then Server sends RESET_STREAM
6348 let frames = [crate::frame::Frame::ResetStream {
6349 stream_id: stream,
6350 error_code: 42,
6351 final_size: 68,
6352 }];
6353
6354 let pkt_type = crate::packet::Type::Short;
6355 assert_eq!(
6356 s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6357 Ok(39)
6358 );
6359
6360 assert_eq!(s.pipe.advance(), Ok(()));
6361
6362 let ev_headers = Event::Headers {
6363 list: resp,
6364 has_body: false,
6365 };
6366
6367 // Client receives headers and fin.
6368 assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6369 assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6370 assert_eq!(s.poll_client(), Err(Error::Done));
6371 }
6372 }
6373
6374 #[cfg(feature = "ffi")]
6375 mod ffi;
6376 mod frame;
6377 #[doc(hidden)]
6378 pub mod qpack;
6379 mod stream;
6380