1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 use std::cmp;
28
29 use std::sync::Arc;
30
31 use std::collections::hash_map;
32 use std::collections::BTreeMap;
33 use std::collections::BinaryHeap;
34 use std::collections::HashMap;
35 use std::collections::HashSet;
36 use std::collections::VecDeque;
37
38 use std::time;
39
40 use smallvec::SmallVec;
41
42 use crate::Error;
43 use crate::Result;
44
45 use crate::flowcontrol;
46 use crate::ranges;
47
/// Urgency given to streams created through `Stream::new()`. Streams with a
/// lower urgency value are scheduled first.
const DEFAULT_URGENCY: u8 = 127;

// Size of the send buffer. Test builds use a deliberately tiny value.
// NOTE(review): presumably so tests can hit buffer boundaries with little
// data -- the users of this constant are not visible here, confirm.
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;

#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;

/// The default size of the receiver stream flow control window. The window
/// passed to the flow controller by `RecvBuf::new()` is capped at this value.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
61
/// A pass-through hasher for stream IDs.
///
/// The QUIC protocol and the quiche library guarantee that stream IDs are
/// unique, so the ID itself is used as the hash value instead of running it
/// through a more expensive hashing algorithm.
#[derive(Default)]
pub struct StreamIdHasher {
    id: u64,
}

impl std::hash::Hasher for StreamIdHasher {
    /// Records `id` verbatim; it becomes the value returned by `finish()`.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Unsupported entry point for arbitrary byte input.
    ///
    /// # Panics
    ///
    /// Always panics.
    #[inline]
    fn write(&mut self, _bytes: &[u8]) {
        // The trait requires a `write()` implementation, but stream IDs are
        // always `u64` values that are hashed through `write_u64()`, so
        // nothing is expected to reach this method.
        unimplemented!()
    }

    /// Returns the most recently written stream ID as the hash value.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }
}
89
/// Hasher factory producing default-constructed `StreamIdHasher` values.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that hashes keys with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;

/// A `HashSet` of stream IDs that hashes entries with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
94
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit.
    local_max_streams_bidi: u64,

    /// Pending value for the local bidirectional stream count limit. It grows
    /// by one each time a peer-initiated bidirectional stream is collected,
    /// and is copied into `local_max_streams_bidi` by
    /// `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Local maximum unidirectional stream count limit.
    local_max_streams_uni: u64,

    /// Pending value for the local unidirectional stream count limit. It
    /// grows by one each time a peer-initiated unidirectional stream is
    /// collected, and is copied into `local_max_streams_uni` by
    /// `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Queue of stream IDs corresponding to streams that have buffered data
    /// ready to be sent to the peer. This also implies that the stream has
    /// enough flow control credits to send at least some of that data.
    ///
    /// Streams are grouped by their priority, where each urgency level has two
    /// queues, one for non-incremental streams and one for incremental ones.
    ///
    /// Streams with a lower urgency level are scheduled first. Within the
    /// same urgency level, non-incremental streams are scheduled first, in the
    /// order of their stream IDs, and incremental streams are scheduled in a
    /// round-robin fashion after all non-incremental streams have been flushed.
    flushable: BTreeMap<u8, (BinaryHeap<std::cmp::Reverse<u64>>, VecDeque<u64>)>,

    /// Set of stream IDs corresponding to streams that have outstanding data
    /// to read. This is used to generate a `StreamIter` of streams without
    /// having to iterate over the full list of streams.
    pub readable: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that have enough flow control
    /// capacity to be written to, and are not finished. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    pub writable: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Set of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Set of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window. Passed to every stream created
    /// by `get_or_create()`.
    max_stream_window: u64,
}
182
183 impl StreamMap {
new( max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64, ) -> StreamMap184 pub fn new(
185 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
186 ) -> StreamMap {
187 StreamMap {
188 local_max_streams_bidi: max_streams_bidi,
189 local_max_streams_bidi_next: max_streams_bidi,
190
191 local_max_streams_uni: max_streams_uni,
192 local_max_streams_uni_next: max_streams_uni,
193
194 max_stream_window,
195
196 ..StreamMap::default()
197 }
198 }
199
200 /// Returns the stream with the given ID if it exists.
get(&self, id: u64) -> Option<&Stream>201 pub fn get(&self, id: u64) -> Option<&Stream> {
202 self.streams.get(&id)
203 }
204
205 /// Returns the mutable stream with the given ID if it exists.
get_mut(&mut self, id: u64) -> Option<&mut Stream>206 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
207 self.streams.get_mut(&id)
208 }
209
210 /// Returns the mutable stream with the given ID if it exists, or creates
211 /// a new one otherwise.
212 ///
213 /// The `local` parameter indicates whether the stream's creation was
214 /// requested by the local application rather than the peer, and is
215 /// used to validate the requested stream ID, and to select the initial
216 /// flow control values from the local and remote transport parameters
217 /// (also passed as arguments).
218 ///
219 /// This also takes care of enforcing both local and the peer's stream
220 /// count limits. If one of these limits is violated, the `StreamLimit`
221 /// error is returned.
get_or_create( &mut self, id: u64, local_params: &crate::TransportParams, peer_params: &crate::TransportParams, local: bool, is_server: bool, ) -> Result<&mut Stream>222 pub(crate) fn get_or_create(
223 &mut self, id: u64, local_params: &crate::TransportParams,
224 peer_params: &crate::TransportParams, local: bool, is_server: bool,
225 ) -> Result<&mut Stream> {
226 let (stream, is_new_and_writable) = match self.streams.entry(id) {
227 hash_map::Entry::Vacant(v) => {
228 // Stream has already been closed and garbage collected.
229 if self.collected.contains(&id) {
230 return Err(Error::Done);
231 }
232
233 if local != is_local(id, is_server) {
234 return Err(Error::InvalidStreamState(id));
235 }
236
237 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
238 // Locally-initiated bidirectional stream.
239 (true, true) => (
240 local_params.initial_max_stream_data_bidi_local,
241 peer_params.initial_max_stream_data_bidi_remote,
242 ),
243
244 // Locally-initiated unidirectional stream.
245 (true, false) => (0, peer_params.initial_max_stream_data_uni),
246
247 // Remotely-initiated bidirectional stream.
248 (false, true) => (
249 local_params.initial_max_stream_data_bidi_remote,
250 peer_params.initial_max_stream_data_bidi_local,
251 ),
252
253 // Remotely-initiated unidirectional stream.
254 (false, false) =>
255 (local_params.initial_max_stream_data_uni, 0),
256 };
257
258 // The two least significant bits from a stream id identify the
259 // type of stream. Truncate those bits to get the sequence for
260 // that stream type.
261 let stream_sequence = id >> 2;
262
263 // Enforce stream count limits.
264 match (is_local(id, is_server), is_bidi(id)) {
265 (true, true) => {
266 let n = std::cmp::max(
267 self.local_opened_streams_bidi,
268 stream_sequence + 1,
269 );
270
271 if n > self.peer_max_streams_bidi {
272 return Err(Error::StreamLimit);
273 }
274
275 self.local_opened_streams_bidi = n;
276 },
277
278 (true, false) => {
279 let n = std::cmp::max(
280 self.local_opened_streams_uni,
281 stream_sequence + 1,
282 );
283
284 if n > self.peer_max_streams_uni {
285 return Err(Error::StreamLimit);
286 }
287
288 self.local_opened_streams_uni = n;
289 },
290
291 (false, true) => {
292 let n = std::cmp::max(
293 self.peer_opened_streams_bidi,
294 stream_sequence + 1,
295 );
296
297 if n > self.local_max_streams_bidi {
298 return Err(Error::StreamLimit);
299 }
300
301 self.peer_opened_streams_bidi = n;
302 },
303
304 (false, false) => {
305 let n = std::cmp::max(
306 self.peer_opened_streams_uni,
307 stream_sequence + 1,
308 );
309
310 if n > self.local_max_streams_uni {
311 return Err(Error::StreamLimit);
312 }
313
314 self.peer_opened_streams_uni = n;
315 },
316 };
317
318 let s = Stream::new(
319 max_rx_data,
320 max_tx_data,
321 is_bidi(id),
322 local,
323 self.max_stream_window,
324 );
325
326 let is_writable = s.is_writable();
327
328 (v.insert(s), is_writable)
329 },
330
331 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
332 };
333
334 // Newly created stream might already be writable due to initial flow
335 // control limits.
336 if is_new_and_writable {
337 self.writable.insert(id);
338 }
339
340 Ok(stream)
341 }
342
343 /// Pushes the stream ID to the back of the flushable streams queue with
344 /// the specified urgency.
345 ///
346 /// Note that the caller is responsible for checking that the specified
347 /// stream ID was not in the queue already before calling this.
348 ///
349 /// Queueing a stream multiple times simultaneously means that it might be
350 /// unfairly scheduled more often than other streams, and might also cause
351 /// spurious cycles through the queue, so it should be avoided.
push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool)352 pub fn push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool) {
353 // Push the element to the back of the queue corresponding to the given
354 // urgency. If the queue doesn't exist yet, create it first.
355 let queues = self
356 .flushable
357 .entry(urgency)
358 .or_insert_with(|| (BinaryHeap::new(), VecDeque::new()));
359
360 if !incr {
361 // Non-incremental streams are scheduled in order of their stream ID.
362 queues.0.push(std::cmp::Reverse(stream_id))
363 } else {
364 // Incremental streams are scheduled in a round-robin fashion.
365 queues.1.push_back(stream_id)
366 };
367 }
368
369 /// Returns the first stream ID from the flushable streams
370 /// queue with the highest urgency.
371 ///
372 /// Note that if the stream is no longer flushable after sending some of its
373 /// outstanding data, it needs to be removed from the queue.
peek_flushable(&mut self) -> Option<u64>374 pub fn peek_flushable(&mut self) -> Option<u64> {
375 self.flushable.iter_mut().next().and_then(|(_, queues)| {
376 queues.0.peek().map(|x| x.0).or_else(|| {
377 // When peeking incremental streams, make sure to move the current
378 // stream to the end of the queue so they are pocesses in a round
379 // robin fashion
380 if let Some(current_incremental) = queues.1.pop_front() {
381 queues.1.push_back(current_incremental);
382 Some(current_incremental)
383 } else {
384 None
385 }
386 })
387 })
388 }
389
390 /// Remove the last peeked stream
remove_flushable(&mut self)391 pub fn remove_flushable(&mut self) {
392 let mut top_urgency = self
393 .flushable
394 .first_entry()
395 .expect("Remove previously peeked stream");
396
397 let queues = top_urgency.get_mut();
398 queues.0.pop().map(|x| x.0).or_else(|| queues.1.pop_back());
399 // Remove the queue from the list of queues if it is now empty, so that
400 // the next time `pop_flushable()` is called the next queue with elements
401 // is used.
402 if queues.0.is_empty() && queues.1.is_empty() {
403 top_urgency.remove();
404 }
405 }
406
407 /// Adds or removes the stream ID to/from the readable streams set.
408 ///
409 /// If the stream was already in the list, this does nothing.
mark_readable(&mut self, stream_id: u64, readable: bool)410 pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
411 if readable {
412 self.readable.insert(stream_id);
413 } else {
414 self.readable.remove(&stream_id);
415 }
416 }
417
418 /// Adds or removes the stream ID to/from the writable streams set.
419 ///
420 /// This should also be called anytime a new stream is created, in addition
421 /// to when an existing stream becomes writable (or stops being writable).
422 ///
423 /// If the stream was already in the list, this does nothing.
mark_writable(&mut self, stream_id: u64, writable: bool)424 pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
425 if writable {
426 self.writable.insert(stream_id);
427 } else {
428 self.writable.remove(&stream_id);
429 }
430 }
431
432 /// Adds or removes the stream ID to/from the almost full streams set.
433 ///
434 /// If the stream was already in the list, this does nothing.
mark_almost_full(&mut self, stream_id: u64, almost_full: bool)435 pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
436 if almost_full {
437 self.almost_full.insert(stream_id);
438 } else {
439 self.almost_full.remove(&stream_id);
440 }
441 }
442
443 /// Adds or removes the stream ID to/from the blocked streams set with the
444 /// given offset value.
445 ///
446 /// If the stream was already in the list, this does nothing.
mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64)447 pub fn mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64) {
448 if blocked {
449 self.blocked.insert(stream_id, off);
450 } else {
451 self.blocked.remove(&stream_id);
452 }
453 }
454
455 /// Adds or removes the stream ID to/from the reset streams set with the
456 /// given error code and final size values.
457 ///
458 /// If the stream was already in the list, this does nothing.
mark_reset( &mut self, stream_id: u64, reset: bool, error_code: u64, final_size: u64, )459 pub fn mark_reset(
460 &mut self, stream_id: u64, reset: bool, error_code: u64, final_size: u64,
461 ) {
462 if reset {
463 self.reset.insert(stream_id, (error_code, final_size));
464 } else {
465 self.reset.remove(&stream_id);
466 }
467 }
468
469 /// Adds or removes the stream ID to/from the stopped streams set with the
470 /// given error code.
471 ///
472 /// If the stream was already in the list, this does nothing.
mark_stopped( &mut self, stream_id: u64, stopped: bool, error_code: u64, )473 pub fn mark_stopped(
474 &mut self, stream_id: u64, stopped: bool, error_code: u64,
475 ) {
476 if stopped {
477 self.stopped.insert(stream_id, error_code);
478 } else {
479 self.stopped.remove(&stream_id);
480 }
481 }
482
483 /// Updates the peer's maximum bidirectional stream count limit.
update_peer_max_streams_bidi(&mut self, v: u64)484 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
485 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
486 }
487
488 /// Updates the peer's maximum unidirectional stream count limit.
update_peer_max_streams_uni(&mut self, v: u64)489 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
490 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
491 }
492
493 /// Commits the new max_streams_bidi limit.
update_max_streams_bidi(&mut self)494 pub fn update_max_streams_bidi(&mut self) {
495 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
496 }
497
498 /// Returns the current max_streams_bidi limit.
max_streams_bidi(&self) -> u64499 pub fn max_streams_bidi(&self) -> u64 {
500 self.local_max_streams_bidi
501 }
502
503 /// Returns the new max_streams_bidi limit.
max_streams_bidi_next(&mut self) -> u64504 pub fn max_streams_bidi_next(&mut self) -> u64 {
505 self.local_max_streams_bidi_next
506 }
507
508 /// Commits the new max_streams_uni limit.
update_max_streams_uni(&mut self)509 pub fn update_max_streams_uni(&mut self) {
510 self.local_max_streams_uni = self.local_max_streams_uni_next;
511 }
512
513 /// Returns the new max_streams_uni limit.
max_streams_uni_next(&mut self) -> u64514 pub fn max_streams_uni_next(&mut self) -> u64 {
515 self.local_max_streams_uni_next
516 }
517
518 /// Returns the number of bidirectional streams that can be created
519 /// before the peer's stream count limit is reached.
peer_streams_left_bidi(&self) -> u64520 pub fn peer_streams_left_bidi(&self) -> u64 {
521 self.peer_max_streams_bidi - self.local_opened_streams_bidi
522 }
523
524 /// Returns the number of unidirectional streams that can be created
525 /// before the peer's stream count limit is reached.
peer_streams_left_uni(&self) -> u64526 pub fn peer_streams_left_uni(&self) -> u64 {
527 self.peer_max_streams_uni - self.local_opened_streams_uni
528 }
529
530 /// Drops completed stream.
531 ///
532 /// This should only be called when Stream::is_complete() returns true for
533 /// the given stream.
collect(&mut self, stream_id: u64, local: bool)534 pub fn collect(&mut self, stream_id: u64, local: bool) {
535 if !local {
536 // If the stream was created by the peer, give back a max streams
537 // credit.
538 if is_bidi(stream_id) {
539 self.local_max_streams_bidi_next =
540 self.local_max_streams_bidi_next.saturating_add(1);
541 } else {
542 self.local_max_streams_uni_next =
543 self.local_max_streams_uni_next.saturating_add(1);
544 }
545 }
546
547 self.mark_readable(stream_id, false);
548 self.mark_writable(stream_id, false);
549
550 self.streams.remove(&stream_id);
551 self.collected.insert(stream_id);
552 }
553
554 /// Creates an iterator over streams that have outstanding data to read.
readable(&self) -> StreamIter555 pub fn readable(&self) -> StreamIter {
556 StreamIter::from(&self.readable)
557 }
558
559 /// Creates an iterator over streams that can be written to.
writable(&self) -> StreamIter560 pub fn writable(&self) -> StreamIter {
561 StreamIter::from(&self.writable)
562 }
563
564 /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
almost_full(&self) -> StreamIter565 pub fn almost_full(&self) -> StreamIter {
566 StreamIter::from(&self.almost_full)
567 }
568
569 /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
blocked(&self) -> hash_map::Iter<u64, u64>570 pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
571 self.blocked.iter()
572 }
573
574 /// Creates an iterator over streams that need to send RESET_STREAM.
reset(&self) -> hash_map::Iter<u64, (u64, u64)>575 pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
576 self.reset.iter()
577 }
578
579 /// Creates an iterator over streams that need to send STOP_SENDING.
stopped(&self) -> hash_map::Iter<u64, u64>580 pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
581 self.stopped.iter()
582 }
583
584 /// Returns true if the stream has been collected.
is_collected(&self, stream_id: u64) -> bool585 pub fn is_collected(&self, stream_id: u64) -> bool {
586 self.collected.contains(&stream_id)
587 }
588
589 /// Returns true if there are any streams that have data to write.
has_flushable(&self) -> bool590 pub fn has_flushable(&self) -> bool {
591 !self.flushable.is_empty()
592 }
593
594 /// Returns true if there are any streams that have data to read.
has_readable(&self) -> bool595 pub fn has_readable(&self) -> bool {
596 !self.readable.is_empty()
597 }
598
599 /// Returns true if there are any streams that need to update the local
600 /// flow control limit.
has_almost_full(&self) -> bool601 pub fn has_almost_full(&self) -> bool {
602 !self.almost_full.is_empty()
603 }
604
605 /// Returns true if there are any streams that are blocked.
has_blocked(&self) -> bool606 pub fn has_blocked(&self) -> bool {
607 !self.blocked.is_empty()
608 }
609
610 /// Returns true if there are any streams that are reset.
has_reset(&self) -> bool611 pub fn has_reset(&self) -> bool {
612 !self.reset.is_empty()
613 }
614
615 /// Returns true if there are any streams that need to send STOP_SENDING.
has_stopped(&self) -> bool616 pub fn has_stopped(&self) -> bool {
617 !self.stopped.is_empty()
618 }
619
620 /// Returns true if the max bidirectional streams count needs to be updated
621 /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_bidi(&self) -> bool622 pub fn should_update_max_streams_bidi(&self) -> bool {
623 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
624 self.local_max_streams_bidi_next / 2 >
625 self.local_max_streams_bidi - self.peer_opened_streams_bidi
626 }
627
628 /// Returns true if the max unidirectional streams count needs to be updated
629 /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_uni(&self) -> bool630 pub fn should_update_max_streams_uni(&self) -> bool {
631 self.local_max_streams_uni_next != self.local_max_streams_uni &&
632 self.local_max_streams_uni_next / 2 >
633 self.local_max_streams_uni - self.peer_opened_streams_uni
634 }
635
/// Returns how many streams are currently held in the map (collected
/// streams are not counted). Only available in test builds.
#[cfg(test)]
pub fn len(&self) -> usize {
    let streams = &self.streams;
    streams.len()
}
641 }
642
/// A QUIC stream.
#[derive(Default)]
pub struct Stream {
    /// Receive-side stream buffer.
    pub recv: RecvBuf,

    /// Send-side stream buffer.
    pub send: SendBuf,

    /// Low watermark used by `is_writable()`: the stream only counts as
    /// writable while the send offset plus this value is below the send-side
    /// flow control limit. `Stream::new()` sets it to 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// Application data.
    pub data: Option<Box<dyn std::any::Any + Send + Sync>>,

    /// The stream's urgency (lower is better). `Stream::new()` sets it to
    /// `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. `Stream::new()` sets
    /// it to `true`.
    pub incremental: bool,
}
669
670 impl Stream {
671 /// Creates a new stream with the given flow control limits.
new( max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool, max_window: u64, ) -> Stream672 pub fn new(
673 max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
674 max_window: u64,
675 ) -> Stream {
676 Stream {
677 recv: RecvBuf::new(max_rx_data, max_window),
678 send: SendBuf::new(max_tx_data),
679 send_lowat: 1,
680 bidi,
681 local,
682 data: None,
683 urgency: DEFAULT_URGENCY,
684 incremental: true,
685 }
686 }
687
688 /// Returns true if the stream has data to read.
is_readable(&self) -> bool689 pub fn is_readable(&self) -> bool {
690 self.recv.ready()
691 }
692
693 /// Returns true if the stream has enough flow control capacity to be
694 /// written to, and is not finished.
is_writable(&self) -> bool695 pub fn is_writable(&self) -> bool {
696 !self.send.shutdown &&
697 !self.send.is_fin() &&
698 (self.send.off + self.send_lowat as u64) < self.send.max_data
699 }
700
701 /// Returns true if the stream has data to send and is allowed to send at
702 /// least some of it.
is_flushable(&self) -> bool703 pub fn is_flushable(&self) -> bool {
704 self.send.ready() && self.send.off_front() < self.send.max_data
705 }
706
707 /// Returns true if the stream is complete.
708 ///
709 /// For bidirectional streams this happens when both the receive and send
710 /// sides are complete. That is when all incoming data has been read by the
711 /// application, and when all outgoing data has been acked by the peer.
712 ///
713 /// For unidirectional streams this happens when either the receive or send
714 /// side is complete, depending on whether the stream was created locally
715 /// or not.
is_complete(&self) -> bool716 pub fn is_complete(&self) -> bool {
717 match (self.bidi, self.local) {
718 // For bidirectional streams we need to check both receive and send
719 // sides for completion.
720 (true, _) => self.recv.is_fin() && self.send.is_complete(),
721
722 // For unidirectional streams generated locally, we only need to
723 // check the send side for completion.
724 (false, true) => self.send.is_complete(),
725
726 // For unidirectional streams generated by the peer, we only need
727 // to check the receive side for completion.
728 (false, false) => self.recv.is_fin(),
729 }
730 }
731
732 /// Returns true if the stream is not storing incoming data.
is_draining(&self) -> bool733 pub fn is_draining(&self) -> bool {
734 self.recv.drain
735 }
736 }
737
/// Returns true if the stream was created locally.
///
/// The least significant bit of a stream ID identifies the initiator: it is
/// set for server-initiated streams and clear for client-initiated ones. The
/// stream is local when that matches our own role (`is_server`).
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = (stream_id & 0x1) == 1;

    server_initiated == is_server
}
742
/// Returns true if the stream is bidirectional.
///
/// The second least significant bit of a stream ID is clear for
/// bidirectional streams and set for unidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    let unidirectional_bit = (stream_id >> 1) & 0x1;

    unidirectional_bit == 0
}
747
748 /// An iterator over QUIC streams.
749 #[derive(Default)]
750 pub struct StreamIter {
751 streams: SmallVec<[u64; 8]>,
752 }
753
754 impl StreamIter {
755 #[inline]
from(streams: &StreamIdHashSet) -> Self756 fn from(streams: &StreamIdHashSet) -> Self {
757 StreamIter {
758 streams: streams.iter().copied().collect(),
759 }
760 }
761 }
762
763 impl Iterator for StreamIter {
764 type Item = u64;
765
766 #[inline]
next(&mut self) -> Option<Self::Item>767 fn next(&mut self) -> Option<Self::Item> {
768 self.streams.pop()
769 }
770 }
771
772 impl ExactSizeIterator for StreamIter {
773 #[inline]
len(&self) -> usize774 fn len(&self) -> usize {
775 self.streams.len()
776 }
777 }
778
/// Receive-side stream buffer.
///
/// Stream data received from the peer is buffered in a list of data chunks
/// ordered by offset in ascending order. Contiguous data can then be read
/// into a slice.
#[derive(Debug, Default)]
pub struct RecvBuf {
    /// Chunks of data received from the peer that have not yet been read by
    /// the application, ordered by offset. `write()` keys each chunk by its
    /// maximum offset.
    data: BTreeMap<u64, RangeBuf>,

    /// The lowest data offset that has yet to be read by the application.
    off: u64,

    /// The total length of data received on this stream, i.e. the highest
    /// offset seen so far.
    len: u64,

    /// Receiver flow controller.
    flow_control: flowcontrol::FlowControl,

    /// The final stream offset received from the peer, if any.
    fin_off: Option<u64>,

    /// The error code received via RESET_STREAM.
    error: Option<u64>,

    /// Whether incoming data is validated but not buffered.
    drain: bool,
}
808
809 impl RecvBuf {
810 /// Creates a new receive buffer.
new(max_data: u64, max_window: u64) -> RecvBuf811 fn new(max_data: u64, max_window: u64) -> RecvBuf {
812 RecvBuf {
813 flow_control: flowcontrol::FlowControl::new(
814 max_data,
815 cmp::min(max_data, DEFAULT_STREAM_WINDOW),
816 max_window,
817 ),
818 ..RecvBuf::default()
819 }
820 }
821
/// Inserts the given chunk of data in the buffer.
///
/// This also takes care of enforcing stream flow control limits, as well
/// as handling incoming data that overlaps data that is already in the
/// buffer.
///
/// While the buffer is draining (`self.drain` is set), chunks are still
/// validated and counted towards `self.len`, but their data is not stored.
///
/// # Errors
///
/// * `Error::FlowControl` if the chunk extends past `self.max_data()`.
/// * `Error::FinalSize` if the chunk extends past an already known final
///   offset, carries the fin flag at an offset different from the known
///   final offset, or carries the fin flag at an offset lower than the
///   amount of data already received.
pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
    if buf.max_off() > self.max_data() {
        return Err(Error::FlowControl);
    }

    if let Some(fin_off) = self.fin_off {
        // Stream's size is known, forbid data beyond that point.
        if buf.max_off() > fin_off {
            return Err(Error::FinalSize);
        }

        // Stream's size is already known, forbid changing it.
        if buf.fin() && fin_off != buf.max_off() {
            return Err(Error::FinalSize);
        }
    }

    // Stream's known size is lower than data already received.
    if buf.fin() && buf.max_off() < self.len {
        return Err(Error::FinalSize);
    }

    // We already saved the final offset, so there's nothing else we
    // need to keep from the RangeBuf if it's empty.
    if self.fin_off.is_some() && buf.is_empty() {
        return Ok(());
    }

    if buf.fin() {
        self.fin_off = Some(buf.max_off());
    }

    // No need to store empty buffer that doesn't carry the fin flag.
    if !buf.fin() && buf.is_empty() {
        return Ok(());
    }

    // Check if data is fully duplicate, that is the buffer's max offset is
    // lower or equal to the offset already stored in the recv buffer.
    if self.off >= buf.max_off() {
        // An exception is applied to empty range buffers, because an empty
        // buffer's max offset matches the max offset of the recv buffer.
        //
        // By this point all spurious empty buffers should have already been
        // discarded, so allowing empty buffers here should be safe.
        if !buf.is_empty() {
            return Ok(());
        }
    }

    // Work queue of chunks that still need to be merged. Splitting a chunk
    // around data that is already buffered can add further pieces to it.
    let mut tmp_bufs = VecDeque::with_capacity(2);
    tmp_bufs.push_back(buf);

    'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
        // Discard incoming data below current stream offset. Bytes up to
        // `self.off` have already been received so we should not buffer
        // them again. This is also important to make sure `ready()` doesn't
        // get stuck when a buffer with lower offset than the stream's is
        // buffered.
        if self.off_front() > buf.off() {
            buf = buf.split_off((self.off_front() - buf.off()) as usize);
        }

        // Handle overlapping data. If the incoming data's starting offset
        // is above the previous maximum received offset, there is clearly
        // no overlap so this logic can be skipped. However do still try to
        // merge an empty final buffer (i.e. an empty buffer with the fin
        // flag set, which is the only kind of empty buffer that should
        // reach this point).
        if buf.off() < self.max_off() || buf.is_empty() {
            // Stored chunks are keyed by their max offset, so this range
            // only visits chunks whose key is at or after the new chunk's
            // start.
            for (_, b) in self.data.range(buf.off()..) {
                let off = buf.off();

                // We are past the current buffer.
                if b.off() > buf.max_off() {
                    break;
                }

                // New buffer is fully contained in existing buffer.
                if off >= b.off() && buf.max_off() <= b.max_off() {
                    continue 'tmp;
                }

                // New buffer's start overlaps existing buffer.
                if off >= b.off() && off < b.max_off() {
                    buf = buf.split_off((b.max_off() - off) as usize);
                }

                // New buffer's end overlaps existing buffer. The part split
                // off here is queued and goes through the loop on its own.
                if off < b.off() && buf.max_off() > b.off() {
                    tmp_bufs
                        .push_back(buf.split_off((b.off() - off) as usize));
                }
            }
        }

        self.len = cmp::max(self.len, buf.max_off());

        // When draining, the data is accounted for above but not stored.
        if !self.drain {
            self.data.insert(buf.max_off(), buf);
        }
    }

    Ok(())
}
932
933 /// Writes data from the receive buffer into the given output buffer.
934 ///
935 /// Only contiguous data is written to the output buffer, starting from
936 /// offset 0. The offset is incremented as data is read out of the receive
937 /// buffer into the application buffer. If there is no data at the expected
938 /// read offset, the `Done` error is returned.
939 ///
940 /// On success the amount of data read, and a flag indicating if there is
941 /// no more data in the buffer, are returned as a tuple.
emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)>942 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
943 let mut len = 0;
944 let mut cap = out.len();
945
946 if !self.ready() {
947 return Err(Error::Done);
948 }
949
950 // The stream was reset, so return the error code instead.
951 if let Some(e) = self.error {
952 return Err(Error::StreamReset(e));
953 }
954
955 while cap > 0 && self.ready() {
956 let mut entry = match self.data.first_entry() {
957 Some(entry) => entry,
958 None => break,
959 };
960
961 let buf = entry.get_mut();
962
963 let buf_len = cmp::min(buf.len(), cap);
964
965 out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
966
967 self.off += buf_len as u64;
968
969 len += buf_len;
970 cap -= buf_len;
971
972 if buf_len < buf.len() {
973 buf.consume(buf_len);
974
975 // We reached the maximum capacity, so end here.
976 break;
977 }
978
979 entry.remove();
980 }
981
982 // Update consumed bytes for flow control.
983 self.flow_control.add_consumed(len as u64);
984
985 Ok((len, self.is_fin()))
986 }
987
988 /// Resets the stream at the given offset.
reset(&mut self, error_code: u64, final_size: u64) -> Result<usize>989 pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
990 // Stream's size is already known, forbid changing it.
991 if let Some(fin_off) = self.fin_off {
992 if fin_off != final_size {
993 return Err(Error::FinalSize);
994 }
995 }
996
997 // Stream's known size is lower than data already received.
998 if final_size < self.len {
999 return Err(Error::FinalSize);
1000 }
1001
1002 // Calculate how many bytes need to be removed from the connection flow
1003 // control.
1004 let max_data_delta = final_size - self.len;
1005
1006 if self.error.is_some() {
1007 return Ok(max_data_delta as usize);
1008 }
1009
1010 self.error = Some(error_code);
1011
1012 // Clear all data already buffered.
1013 self.off = final_size;
1014
1015 self.data.clear();
1016
1017 // In order to ensure the application is notified when the stream is
1018 // reset, enqueue a zero-length buffer at the final size offset.
1019 let buf = RangeBuf::from(b"", final_size, true);
1020 self.write(buf)?;
1021
1022 Ok(max_data_delta as usize)
1023 }
1024
/// Commits the new max_data limit.
///
/// Thin wrapper that forwards `now` to the stream's flow-control state.
// NOTE(review): the exact effect is defined in the `flowcontrol` module,
// which is not visible here; presumably the pending limit becomes current.
pub fn update_max_data(&mut self, now: time::Instant) {
    self.flow_control.update_max_data(now);
}
1029
/// Returns the new max_data limit.
///
/// The value is obtained from the stream's flow-control state, which is
/// borrowed mutably for the call.
pub fn max_data_next(&mut self) -> u64 {
    self.flow_control.max_data_next()
}
1034
/// Returns the current flow control limit, as reported by the stream's
/// flow-control state.
fn max_data(&self) -> u64 {
    self.flow_control.max_data()
}
1039
/// Returns the current flow control window, as reported by the stream's
/// flow-control state.
pub fn window(&self) -> u64 {
    self.flow_control.window()
}
1044
/// Autotunes the flow control window size.
///
/// Forwards the current time `now` and the round-trip time estimate `rtt`
/// to the stream's flow-control state, which decides whether and how the
/// window changes.
pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) {
    self.flow_control.autotune_window(now, rtt);
}
1049
1050 /// Shuts down receiving data.
shutdown(&mut self) -> Result<()>1051 pub fn shutdown(&mut self) -> Result<()> {
1052 if self.drain {
1053 return Err(Error::Done);
1054 }
1055
1056 self.drain = true;
1057
1058 self.data.clear();
1059
1060 self.off = self.max_off();
1061
1062 Ok(())
1063 }
1064
/// Returns the lowest offset of data buffered.
///
/// This is the current read offset: it is advanced as data is read out by
/// `emit()`, and moved forward when the stream is reset or shut down.
pub fn off_front(&self) -> u64 {
    self.off
}
1069
/// Returns true if we need to update the local flow control limit.
///
/// Always false once the stream's final size is known; otherwise the
/// decision is delegated to the stream's flow-control state.
pub fn almost_full(&self) -> bool {
    self.fin_off.is_none() && self.flow_control.should_update_max_data()
}
1074
/// Returns the largest offset ever received.
///
/// Despite its name, the backing `len` field holds a stream offset (the
/// highest end offset seen by `write()`), not a count of buffered bytes.
pub fn max_off(&self) -> u64 {
    self.len
}
1079
1080 /// Returns true if the receive-side of the stream is complete.
1081 ///
1082 /// This happens when the stream's receive final size is known, and the
1083 /// application has read all data from the stream.
is_fin(&self) -> bool1084 pub fn is_fin(&self) -> bool {
1085 if self.fin_off == Some(self.off) {
1086 return true;
1087 }
1088
1089 false
1090 }
1091
1092 /// Returns true if the stream has data to be read.
ready(&self) -> bool1093 fn ready(&self) -> bool {
1094 let (_, buf) = match self.data.first_key_value() {
1095 Some(v) => v,
1096 None => return false,
1097 };
1098
1099 buf.off() == self.off
1100 }
1101 }
1102
/// Send-side stream buffer.
///
/// Stream data scheduled to be sent to the peer is buffered in a list of data
/// chunks ordered by offset in ascending order. Contiguous data can then be
/// read into a slice.
///
/// New data is always appended at the end of the stream. Data that needs to
/// be retransmitted is not inserted again: `retransmit()` rewinds the read
/// position of the chunks that still hold it, so that `emit()` hands the same
/// bytes out a second time.
#[derive(Debug, Default)]
pub struct SendBuf {
    /// Chunks of data to be sent, ordered by offset.
    ///
    /// Chunks stay in the queue after being emitted (they are merely marked
    /// as consumed) until `ack_and_drop()` or `reset()` removes them.
    data: VecDeque<RangeBuf>,

    /// The index into `data` of the buffer that needs to be sent next.
    pos: usize,

    /// The maximum offset of data buffered in the stream, i.e. the offset at
    /// which the next written chunk will be placed.
    off: u64,

    /// The maximum offset of data sent to the peer, regardless of
    /// retransmissions.
    emit_off: u64,

    /// The amount of data currently buffered and still waiting to be
    /// (re)transmitted. Decreases as data is emitted and increases again when
    /// data is marked for retransmission.
    len: u64,

    /// The maximum offset we are allowed to send to the peer. This limit is
    /// only ever raised, never lowered.
    max_data: u64,

    /// The last offset the stream was blocked at, if any.
    blocked_at: Option<u64>,

    /// The final stream offset written to the stream, if any. Set by a write
    /// carrying the fin flag, or when the stream is reset.
    fin_off: Option<u64>,

    /// Whether the stream's send-side has been shut down.
    shutdown: bool,

    /// Ranges of data offsets that have been acked.
    acked: ranges::RangeSet,

    /// The error code received via STOP_SENDING. Once set, `cap()` (and thus
    /// `write()`) fails with `Error::StreamStopped`.
    error: Option<u64>,
}
1148
1149 impl SendBuf {
1150 /// Creates a new send buffer.
new(max_data: u64) -> SendBuf1151 fn new(max_data: u64) -> SendBuf {
1152 SendBuf {
1153 max_data,
1154 ..SendBuf::default()
1155 }
1156 }
1157
1158 /// Inserts the given slice of data at the end of the buffer.
1159 ///
1160 /// The number of bytes that were actually stored in the buffer is returned
1161 /// (this may be lower than the size of the input buffer, in case of partial
1162 /// writes).
write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize>1163 pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
1164 let max_off = self.off + data.len() as u64;
1165
1166 // Get the stream send capacity. This will return an error if the stream
1167 // was stopped.
1168 let capacity = self.cap()?;
1169
1170 if data.len() > capacity {
1171 // Truncate the input buffer according to the stream's capacity.
1172 let len = capacity;
1173 data = &data[..len];
1174
1175 // We are not buffering the full input, so clear the fin flag.
1176 fin = false;
1177 }
1178
1179 if let Some(fin_off) = self.fin_off {
1180 // Can't write past final offset.
1181 if max_off > fin_off {
1182 return Err(Error::FinalSize);
1183 }
1184
1185 // Can't "undo" final offset.
1186 if max_off == fin_off && !fin {
1187 return Err(Error::FinalSize);
1188 }
1189 }
1190
1191 if fin {
1192 self.fin_off = Some(max_off);
1193 }
1194
1195 // Don't queue data that was already fully acked.
1196 if self.ack_off() >= max_off {
1197 return Ok(data.len());
1198 }
1199
1200 // We already recorded the final offset, so we can just discard the
1201 // empty buffer now.
1202 if data.is_empty() {
1203 return Ok(data.len());
1204 }
1205
1206 let mut len = 0;
1207
1208 // Split the remaining input data into consistently-sized buffers to
1209 // avoid fragmentation.
1210 for chunk in data.chunks(SEND_BUFFER_SIZE) {
1211 len += chunk.len();
1212
1213 let fin = len == data.len() && fin;
1214
1215 let buf = RangeBuf::from(chunk, self.off, fin);
1216
1217 // The new data can simply be appended at the end of the send buffer.
1218 self.data.push_back(buf);
1219
1220 self.off += chunk.len() as u64;
1221 self.len += chunk.len() as u64;
1222 }
1223
1224 Ok(len)
1225 }
1226
1227 /// Writes data from the send buffer into the given output buffer.
emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)>1228 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
1229 let mut out_len = out.len();
1230 let out_off = self.off_front();
1231
1232 let mut next_off = out_off;
1233
1234 while out_len > 0 &&
1235 self.ready() &&
1236 self.off_front() == next_off &&
1237 self.off_front() < self.max_data
1238 {
1239 let buf = match self.data.get_mut(self.pos) {
1240 Some(v) => v,
1241
1242 None => break,
1243 };
1244
1245 if buf.is_empty() {
1246 self.pos += 1;
1247 continue;
1248 }
1249
1250 let buf_len = cmp::min(buf.len(), out_len);
1251 let partial = buf_len < buf.len();
1252
1253 // Copy data to the output buffer.
1254 let out_pos = (next_off - out_off) as usize;
1255 out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);
1256
1257 self.len -= buf_len as u64;
1258
1259 out_len -= buf_len;
1260
1261 next_off = buf.off() + buf_len as u64;
1262
1263 buf.consume(buf_len);
1264
1265 if partial {
1266 // We reached the maximum capacity, so end here.
1267 break;
1268 }
1269
1270 self.pos += 1;
1271 }
1272
1273 // Override the `fin` flag set for the output buffer by matching the
1274 // buffer's maximum offset against the stream's final offset (if known).
1275 //
1276 // This is more efficient than tracking `fin` using the range buffers
1277 // themselves, and lets us avoid queueing empty buffers just so we can
1278 // propagate the final size.
1279 let fin = self.fin_off == Some(next_off);
1280
1281 // Record the largest offset that has been sent so we can accurately
1282 // report final_size
1283 self.emit_off = cmp::max(self.emit_off, next_off);
1284
1285 Ok((out.len() - out_len, fin))
1286 }
1287
1288 /// Updates the max_data limit to the given value.
update_max_data(&mut self, max_data: u64)1289 pub fn update_max_data(&mut self, max_data: u64) {
1290 self.max_data = cmp::max(self.max_data, max_data);
1291 }
1292
1293 /// Updates the last offset the stream was blocked at, if any.
update_blocked_at(&mut self, blocked_at: Option<u64>)1294 pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
1295 self.blocked_at = blocked_at;
1296 }
1297
1298 /// The last offset the stream was blocked at, if any.
blocked_at(&self) -> Option<u64>1299 pub fn blocked_at(&self) -> Option<u64> {
1300 self.blocked_at
1301 }
1302
1303 /// Increments the acked data offset.
ack(&mut self, off: u64, len: usize)1304 pub fn ack(&mut self, off: u64, len: usize) {
1305 self.acked.insert(off..off + len as u64);
1306 }
1307
ack_and_drop(&mut self, off: u64, len: usize)1308 pub fn ack_and_drop(&mut self, off: u64, len: usize) {
1309 self.ack(off, len);
1310
1311 let ack_off = self.ack_off();
1312
1313 if self.data.is_empty() {
1314 return;
1315 }
1316
1317 if off > ack_off {
1318 return;
1319 }
1320
1321 let mut drop_until = None;
1322
1323 // Drop contiguously acked data from the front of the buffer.
1324 for (i, buf) in self.data.iter_mut().enumerate() {
1325 // Newly acked range is past highest contiguous acked range, so we
1326 // can't drop it.
1327 if buf.off >= ack_off {
1328 break;
1329 }
1330
1331 // Highest contiguous acked range falls within newly acked range,
1332 // so we can't drop it.
1333 if buf.off < ack_off && ack_off < buf.max_off() {
1334 break;
1335 }
1336
1337 // Newly acked range can be dropped.
1338 drop_until = Some(i);
1339 }
1340
1341 if let Some(drop) = drop_until {
1342 self.data.drain(..=drop);
1343
1344 // When a buffer is marked for retransmission, but then acked before
1345 // it could be retransmitted, we might end up decreasing the SendBuf
1346 // position too much, so make sure that doesn't happen.
1347 self.pos = self.pos.saturating_sub(drop + 1);
1348 }
1349 }
1350
retransmit(&mut self, off: u64, len: usize)1351 pub fn retransmit(&mut self, off: u64, len: usize) {
1352 let max_off = off + len as u64;
1353 let ack_off = self.ack_off();
1354
1355 if self.data.is_empty() {
1356 return;
1357 }
1358
1359 if max_off <= ack_off {
1360 return;
1361 }
1362
1363 for i in 0..self.data.len() {
1364 let buf = &mut self.data[i];
1365
1366 if buf.off >= max_off {
1367 break;
1368 }
1369
1370 if off > buf.max_off() {
1371 continue;
1372 }
1373
1374 // Split the buffer into 2 if the retransmit range ends before the
1375 // buffer's final offset.
1376 let new_buf = if buf.off < max_off && max_off < buf.max_off() {
1377 Some(buf.split_off((max_off - buf.off) as usize))
1378 } else {
1379 None
1380 };
1381
1382 let prev_pos = buf.pos;
1383
1384 // Reduce the buffer's position (expand the buffer) if the retransmit
1385 // range is past the buffer's starting offset.
1386 buf.pos = if off > buf.off && off <= buf.max_off() {
1387 cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
1388 } else {
1389 buf.start
1390 };
1391
1392 self.pos = cmp::min(self.pos, i);
1393
1394 self.len += (prev_pos - buf.pos) as u64;
1395
1396 if let Some(b) = new_buf {
1397 self.data.insert(i + 1, b);
1398 }
1399 }
1400 }
1401
1402 /// Resets the stream at the current offset and clears all buffered data.
reset(&mut self) -> (u64, u64)1403 pub fn reset(&mut self) -> (u64, u64) {
1404 let unsent_off = cmp::max(self.off_front(), self.emit_off);
1405 let unsent_len = self.off_back().saturating_sub(unsent_off);
1406
1407 self.fin_off = Some(unsent_off);
1408
1409 // Drop all buffered data.
1410 self.data.clear();
1411
1412 // Mark all data as acked.
1413 self.ack(0, self.off as usize);
1414
1415 self.pos = 0;
1416 self.len = 0;
1417 self.off = unsent_off;
1418
1419 (self.emit_off, unsent_len)
1420 }
1421
1422 /// Resets the streams and records the received error code.
1423 ///
1424 /// Calling this again after the first time has no effect.
stop(&mut self, error_code: u64) -> Result<(u64, u64)>1425 pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
1426 if self.error.is_some() {
1427 return Err(Error::Done);
1428 }
1429
1430 let (max_off, unsent) = self.reset();
1431
1432 self.error = Some(error_code);
1433
1434 Ok((max_off, unsent))
1435 }
1436
1437 /// Shuts down sending data.
shutdown(&mut self) -> Result<(u64, u64)>1438 pub fn shutdown(&mut self) -> Result<(u64, u64)> {
1439 if self.shutdown {
1440 return Err(Error::Done);
1441 }
1442
1443 self.shutdown = true;
1444
1445 Ok(self.reset())
1446 }
1447
1448 /// Returns the largest offset of data buffered.
off_back(&self) -> u641449 pub fn off_back(&self) -> u64 {
1450 self.off
1451 }
1452
1453 /// Returns the lowest offset of data buffered.
off_front(&self) -> u641454 pub fn off_front(&self) -> u64 {
1455 let mut pos = self.pos;
1456
1457 // Skip empty buffers from the start of the queue.
1458 while let Some(b) = self.data.get(pos) {
1459 if !b.is_empty() {
1460 return b.off();
1461 }
1462
1463 pos += 1;
1464 }
1465
1466 self.off
1467 }
1468
1469 /// The maximum offset we are allowed to send to the peer.
max_off(&self) -> u641470 pub fn max_off(&self) -> u64 {
1471 self.max_data
1472 }
1473
1474 /// Returns true if all data in the stream has been sent.
1475 ///
1476 /// This happens when the stream's send final size is known, and the
1477 /// application has already written data up to that point.
is_fin(&self) -> bool1478 pub fn is_fin(&self) -> bool {
1479 if self.fin_off == Some(self.off) {
1480 return true;
1481 }
1482
1483 false
1484 }
1485
1486 /// Returns true if the send-side of the stream is complete.
1487 ///
1488 /// This happens when the stream's send final size is known, and the peer
1489 /// has already acked all stream data up to that point.
is_complete(&self) -> bool1490 pub fn is_complete(&self) -> bool {
1491 if let Some(fin_off) = self.fin_off {
1492 if self.acked == (0..fin_off) {
1493 return true;
1494 }
1495 }
1496
1497 false
1498 }
1499
1500 /// Returns true if the stream was stopped before completion.
is_stopped(&self) -> bool1501 pub fn is_stopped(&self) -> bool {
1502 self.error.is_some()
1503 }
1504
1505 /// Returns true if there is data to be written.
ready(&self) -> bool1506 fn ready(&self) -> bool {
1507 !self.data.is_empty() && self.off_front() < self.off
1508 }
1509
1510 /// Returns the highest contiguously acked offset.
ack_off(&self) -> u641511 fn ack_off(&self) -> u64 {
1512 match self.acked.iter().next() {
1513 // Only consider the initial range if it contiguously covers the
1514 // start of the stream (i.e. from offset 0).
1515 Some(std::ops::Range { start: 0, end }) => end,
1516
1517 Some(_) | None => 0,
1518 }
1519 }
1520
1521 /// Returns the outgoing flow control capacity.
cap(&self) -> Result<usize>1522 pub fn cap(&self) -> Result<usize> {
1523 // The stream was stopped, so return the error code instead.
1524 if let Some(e) = self.error {
1525 return Err(Error::StreamStopped(e));
1526 }
1527
1528 Ok((self.max_data - self.off) as usize)
1529 }
1530 }
1531
/// A chunk of stream data tagged with its position in the stream.
///
/// The bytes live in a reference-counted `Vec<u8>` that can be shared by
/// several `RangeBuf` objects. Each object owns a window into that `Vec`:
/// `start` is where the window begins and `len` is how many bytes it spans.
///
/// `pos` is the read cursor. Like `start`, it is an index counted from the
/// very beginning of the shared `Vec`; bytes between `start` and `pos` have
/// already been consumed.
///
/// `off` is the stream offset of the byte at `start`.
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
    /// The shared backing storage.
    ///
    /// Splitting a `RangeBuf` does not copy any data: both halves reference
    /// the same allocation and differ only in their `start` and `len`.
    data: Arc<Vec<u8>>,

    /// Index within `data` at which this buffer's window begins.
    start: usize,

    /// Index within `data` of the next unconsumed byte.
    pos: usize,

    /// Size of the window in bytes, counted from `start`.
    len: usize,

    /// Stream offset corresponding to `start`.
    off: u64,

    /// Whether this buffer ends with the final byte of the stream.
    fin: bool,
}

impl RangeBuf {
    /// Creates a new `RangeBuf` holding a copy of `buf`, placed at stream
    /// offset `off`.
    pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
        let len = buf.len();

        RangeBuf {
            data: Arc::new(buf.to_vec()),
            start: 0,
            pos: 0,
            len,
            off,
            fin,
        }
    }

    /// Returns whether `self` holds the final offset in the stream.
    pub fn fin(&self) -> bool {
        self.fin
    }

    /// Returns the stream offset of the first unconsumed byte of `self`.
    pub fn off(&self) -> u64 {
        // Stream offset that index 0 of the shared storage maps to.
        let base = self.off - self.start as u64;

        base + self.pos as u64
    }

    /// Returns the stream offset just past the last byte of `self`.
    pub fn max_off(&self) -> u64 {
        self.off() + self.len() as u64
    }

    /// Returns the number of unconsumed bytes in `self`.
    pub fn len(&self) -> usize {
        let consumed = self.pos - self.start;

        self.len - consumed
    }

    /// Returns true if `self` has no unconsumed bytes left.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Consumes the starting `count` bytes of `self`.
    ///
    /// No bounds check is performed; callers must not consume more than
    /// `len()` bytes.
    pub fn consume(&mut self, count: usize) {
        self.pos += count;
    }

    /// Splits the buffer into two at the given index.
    ///
    /// `at` is counted from the beginning of the buffer's window, including
    /// bytes that were already consumed. `self` keeps the first `at` bytes
    /// and loses its fin flag; the returned buffer holds the rest and
    /// inherits the fin flag. No data is copied.
    ///
    /// # Panics
    ///
    /// Panics if `at` is larger than the size of the window.
    pub fn split_off(&mut self, at: usize) -> RangeBuf {
        assert!(
            at <= self.len,
            "`at` split index (is {}) should be <= len (is {})",
            at,
            self.len
        );

        // Index within the shared storage where the second half begins.
        let boundary = self.start + at;

        let tail = RangeBuf {
            data: Arc::clone(&self.data),
            start: boundary,
            pos: cmp::max(self.pos, boundary),
            len: self.len - at,
            off: self.off + at as u64,
            fin: self.fin,
        };

        self.pos = cmp::min(self.pos, boundary);
        self.len = at;
        self.fin = false;

        tail
    }
}

impl std::ops::Deref for RangeBuf {
    type Target = [u8];

    /// Exposes the unconsumed bytes of the buffer as a slice.
    fn deref(&self) -> &[u8] {
        let end = self.start + self.len;

        &self.data[self.pos..end]
    }
}

impl Ord for RangeBuf {
    /// Orders buffers by their stream offset, in reverse, so that a max-heap
    /// of `RangeBuf` behaves like a min-heap on the offset.
    fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
        other.off.cmp(&self.off)
    }
}

impl PartialOrd for RangeBuf {
    fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for RangeBuf {
    /// Two buffers compare equal when they begin at the same stream offset,
    /// regardless of their contents.
    fn eq(&self, other: &RangeBuf) -> bool {
        self.off == other.off
    }
}
1666
1667 #[cfg(test)]
1668 mod tests {
1669 use super::*;
1670
/// Reading from a receive buffer that never received anything yields `Done`.
#[test]
fn empty_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let res = rx.emit(&mut out);
    assert_eq!(res, Err(Error::Done));
}
1680
/// Exercises how zero-length buffers are handled, with and without the fin
/// flag, including flow control and final size validation.
#[test]
fn empty_stream_frame() {
    let mut rx = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"hello", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 0, 1));

    let mut out = [0; 32];
    assert_eq!(rx.emit(&mut out), Ok((5, false)));

    // An empty buffer without fin is accepted, but not stored.
    assert_eq!(rx.write(RangeBuf::from(b"", 10, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 0));

    // Flow control applies to empty buffers too.
    assert_eq!(
        rx.write(RangeBuf::from(b"", 16, false)),
        Err(Error::FlowControl)
    );

    // An empty buffer with fin is stored.
    assert_eq!(rx.write(RangeBuf::from(b"", 5, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 1));

    // Further empty fin buffers are not stored again.
    assert_eq!(rx.write(RangeBuf::from(b"", 5, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 1));

    // Neither are non-empty fin buffers covering data already read.
    assert_eq!(rx.write(RangeBuf::from(b"aa", 3, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 1));

    // The final size is validated for empty fin buffers as well.
    assert_eq!(
        rx.write(RangeBuf::from(b"", 6, true)),
        Err(Error::FinalSize)
    );
    assert_eq!(
        rx.write(RangeBuf::from(b"", 4, true)),
        Err(Error::FinalSize)
    );

    let mut out = [0; 32];
    assert_eq!(rx.emit(&mut out), Ok((0, true)));
}
1736
/// Buffers written out of order only become readable once the data at the
/// read offset arrives, and are then read back in stream order.
#[test]
fn ordered_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    // The middle of the stream arrives first: nothing can be read yet.
    assert_eq!(rx.write(RangeBuf::from(b"world", 5, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (10, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));

    // Then the end of the stream: still nothing to read.
    assert_eq!(rx.write(RangeBuf::from(b"something", 10, true)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));

    // Finally the start of the stream, which unblocks everything.
    assert_eq!(rx.write(RangeBuf::from(b"hello", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 19);
    assert!(fin);
    assert_eq!(&out[..n], b"helloworldsomething");
    assert_eq!((rx.len, rx.off), (19, 19));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
1773
/// Reading with output buffers smaller than the buffered data returns the
/// stream piecewise, and fin is only reported with the last piece.
#[test]
fn split_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (9, 0));

    assert_eq!(rx.write(RangeBuf::from(b"helloworld", 9, true)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    // First read spans the first buffer and one byte of the second.
    let (n, fin) = rx.emit(&mut out[..10]).unwrap();
    assert_eq!(n, 10);
    assert!(!fin);
    assert_eq!(&out[..n], b"somethingh");
    assert_eq!((rx.len, rx.off), (19, 10));

    // Second read stays within the second buffer.
    let (n, fin) = rx.emit(&mut out[..5]).unwrap();
    assert_eq!(n, 5);
    assert!(!fin);
    assert_eq!(&out[..n], b"ellow");
    assert_eq!((rx.len, rx.off), (19, 15));

    // Last read returns what is left and reports fin.
    let (n, fin) = rx.emit(&mut out[..10]).unwrap();
    assert_eq!(n, 4);
    assert!(fin);
    assert_eq!(&out[..n], b"orld");
    assert_eq!((rx.len, rx.off), (19, 19));
}
1813
/// Data past a gap cannot be read until the gap is filled.
#[test]
fn incomplete_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    // Only the tail of the stream is available so far.
    assert_eq!(rx.write(RangeBuf::from(b"helloworld", 9, true)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));

    // Filling the gap makes the whole stream readable.
    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 19);
    assert!(fin);
    assert_eq!(&out[..n], b"somethinghelloworld");
    assert_eq!((rx.len, rx.off), (19, 19));
}
1841
/// A zero-length fin buffer right after buffered data marks the end of the
/// stream without being stored as a separate chunk.
#[test]
fn zero_len_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    // The empty fin buffer does not add a chunk.
    assert_eq!(rx.write(RangeBuf::from(b"", 9, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 9);
    assert!(fin);
    assert_eq!(&out[..n], b"something");
    assert_eq!((rx.len, rx.off), (9, 9));
}
1869
/// Data lying entirely below the read offset is discarded, while the final
/// size carried by such buffers is still validated.
#[test]
fn past_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 9);
    assert!(!fin);
    assert_eq!(&out[..n], b"something");
    assert_eq!((rx.len, rx.off), (9, 9));

    // Already read, so nothing gets buffered.
    assert_eq!(rx.write(RangeBuf::from(b"hello", 3, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    // A final size below the data already received is rejected.
    assert_eq!(
        rx.write(RangeBuf::from(b"ello", 4, true)),
        Err(Error::FinalSize)
    );

    // A final size matching the data already received is accepted, but
    // nothing gets buffered.
    assert_eq!(rx.write(RangeBuf::from(b"ello", 5, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
1908
/// A buffer fully contained in an already buffered one is dropped, and the
/// data received first is what gets read.
#[test]
fn fully_overlapping_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    // Entirely covered by the first buffer, so not stored.
    assert_eq!(rx.write(RangeBuf::from(b"hello", 4, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 9);
    assert!(!fin);
    assert_eq!(&out[..n], b"something");
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
1939
/// A larger buffer arriving after a smaller one it fully covers only
/// contributes the bytes that were still missing.
#[test]
fn fully_overlapping_read2() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    // The tail of the stream arrives first.
    assert_eq!(rx.write(RangeBuf::from(b"hello", 4, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    // Only the part in front of the buffered tail gets stored.
    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 2));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 9);
    assert!(!fin);
    assert_eq!(&out[..n], b"somehello");
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
1970
/// A larger buffer arriving after a smaller one in its middle is split
/// around it, so the data received first is preserved.
#[test]
fn fully_overlapping_read3() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    // The middle of the stream arrives first.
    assert_eq!(rx.write(RangeBuf::from(b"hello", 3, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (8, 0, 1));

    // The covering buffer adds one chunk before and one after the middle.
    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 3));

    let (n, fin) = rx.emit(&mut out).unwrap();
    assert_eq!(n, 9);
    assert!(!fin);
    assert_eq!(&out[..n], b"somhellog");
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2001
// Two disjoint inner chunks are buffered before one long chunk that covers
// both of them. The long chunk fills the gaps around the existing data,
// giving five buffered pieces and the output "somhellogsomhellog".
#[test]
fn fully_overlapping_read_multi() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let outer = RangeBuf::from(b"somethingsomething", 0, false);
    let inner_lo = RangeBuf::from(b"hello", 3, false);
    let inner_hi = RangeBuf::from(b"hello", 12, false);

    assert!(rx.write(inner_lo).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (8, 0, 1));

    assert!(rx.write(inner_hi).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (17, 0, 2));

    assert!(rx.write(outer).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (18, 0, 5));

    assert_eq!(rx.emit(&mut out), Ok((18, false)));
    assert_eq!(&out[..18], b"somhellogsomhellog");
    assert_eq!((rx.len, rx.off, rx.data.len()), (18, 18, 0));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2038
// The second chunk (carrying fin) starts one byte before the end of the
// first one; only its non-overlapping suffix is appended, so the output is
// "somethingello".
#[test]
fn overlapping_start_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let leading = RangeBuf::from(b"something", 0, false);
    let trailing = RangeBuf::from(b"hello", 8, true);

    assert!(rx.write(leading).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert!(rx.write(trailing).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (13, 0, 2));

    assert_eq!(rx.emit(&mut out), Ok((13, true)));
    assert_eq!(&out[..13], b"somethingello");
    assert_eq!((rx.len, rx.off), (13, 13));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2068
// The later chunk (with fin) is buffered first; the chunk at offset 0 then
// overlaps its beginning and only contributes its first three bytes, so the
// output is "helsomething".
#[test]
fn overlapping_end_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let leading = RangeBuf::from(b"hello", 0, false);
    let trailing = RangeBuf::from(b"something", 3, true);

    assert!(rx.write(trailing).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (12, 0, 1));

    assert!(rx.write(leading).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (12, 0, 2));

    assert_eq!(rx.emit(&mut out), Ok((12, true)));
    assert_eq!(&out[..12], b"helsomething");
    assert_eq!((rx.len, rx.off), (12, 12));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2098
// Three small fragments are buffered out of order, then a full-length chunk
// with fin arrives and fills every remaining hole. The buffer ends up with
// six pieces that read back as "helloworld".
#[test]
fn overlapping_end_twice_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let frag_he = RangeBuf::from(b"he", 0, false);
    let frag_ow = RangeBuf::from(b"ow", 4, false);
    let frag_rl = RangeBuf::from(b"rl", 7, false);
    let whole = RangeBuf::from(b"helloworld", 0, true);

    assert!(rx.write(frag_rl).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert!(rx.write(frag_ow).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 2));

    assert!(rx.write(frag_he).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 3));

    assert!(rx.write(whole).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (10, 0, 6));

    assert_eq!(rx.emit(&mut out), Ok((10, true)));
    assert_eq!(&out[..10], b"helloworld");
    assert_eq!((rx.len, rx.off), (10, 10));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2140
// Fragments are buffered out of order (one of them carrying fin), then a
// long chunk arrives that overlaps all of them and also ends at the final
// offset. Only the missing bytes are added and the stream reads back as
// "helloworldbarfoo".
#[test]
fn overlapping_end_twice_and_contained_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let frag_head = RangeBuf::from(b"hellow", 0, false);
    let frag_tail = RangeBuf::from(b"barfoo", 10, true);
    let frag_mid = RangeBuf::from(b"rl", 7, false);
    let covering = RangeBuf::from(b"elloworldbarfoo", 1, true);

    assert!(rx.write(frag_mid).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert!(rx.write(frag_tail).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (16, 0, 2));

    assert!(rx.write(frag_head).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (16, 0, 3));

    assert!(rx.write(covering).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (16, 0, 5));

    assert_eq!(rx.emit(&mut out), Ok((16, true)));
    assert_eq!(&out[..16], b"helloworldbarfoo");
    assert_eq!((rx.len, rx.off), (16, 16));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2182
// Three partially overlapping chunks are written with the middle one first.
// Each later write only adds the bytes not yet buffered, so the output is
// "somethinhelloar".
#[test]
fn partially_multi_overlapping_reordered_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let middle = RangeBuf::from(b"hello", 8, false);
    let front = RangeBuf::from(b"something", 0, false);
    let back = RangeBuf::from(b"moar", 11, true);

    assert!(rx.write(middle).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (13, 0, 1));

    assert!(rx.write(front).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (13, 0, 2));

    assert!(rx.write(back).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (15, 0, 3));

    assert_eq!(rx.emit(&mut out), Ok((15, true)));
    assert_eq!(&out[..15], b"somethinhelloar");
    assert_eq!((rx.len, rx.off, rx.data.len()), (15, 15, 0));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2219
// Six three-byte chunks, several of them overlapping their neighbours, are
// written in a shuffled order. Data that was buffered earlier wins over
// later overlapping writes, producing "aabbbcdddeefff".
#[test]
fn partially_multi_overlapping_reordered_read2() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    let mut out = [0; 32];

    let chunk_a = RangeBuf::from(b"aaa", 0, false);
    let chunk_b = RangeBuf::from(b"bbb", 2, false);
    let chunk_c = RangeBuf::from(b"ccc", 4, false);
    let chunk_d = RangeBuf::from(b"ddd", 6, false);
    let chunk_e = RangeBuf::from(b"eee", 9, false);
    let chunk_f = RangeBuf::from(b"fff", 11, false);

    assert!(rx.write(chunk_b).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 0, 1));

    assert!(rx.write(chunk_d).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 2));

    assert!(rx.write(chunk_c).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 3));

    assert!(rx.write(chunk_a).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 4));

    assert!(rx.write(chunk_f).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (14, 0, 5));

    assert!(rx.write(chunk_e).is_ok());
    assert_eq!((rx.len, rx.off, rx.data.len()), (14, 0, 6));

    assert_eq!(rx.emit(&mut out), Ok((14, false)));
    assert_eq!(&out[..14], b"aabbbcdddeefff");
    assert_eq!((rx.len, rx.off, rx.data.len()), (14, 14, 0));

    // Nothing is left to read.
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
2274
// Emitting from a send buffer that never received data yields zero bytes
// and no fin.
#[test]
fn empty_write() {
    let mut out = [0; 5];

    let mut tx = SendBuf::new(u64::MAX);
    assert_eq!(tx.len, 0);

    assert_eq!(tx.emit(&mut out), Ok((0, false)));
}
2286
// Two consecutive writes (the second with fin) are emitted back-to-back in
// a single call when the output buffer is large enough.
#[test]
fn multi_write() {
    let mut out = [0; 128];

    let mut tx = SendBuf::new(u64::MAX);
    assert_eq!(tx.len, 0);

    let head = b"something";
    let tail = b"helloworld";

    assert!(tx.write(head, false).is_ok());
    assert_eq!(tx.len, 9);

    assert!(tx.write(tail, true).is_ok());
    assert_eq!(tx.len, 19);

    assert_eq!(tx.emit(&mut out[..128]), Ok((19, true)));
    assert_eq!(&out[..19], b"somethinghelloworld");
    assert_eq!(tx.len, 0);
}
2309
// The buffered data is drained through output slices that are smaller than
// the pending data, checking the remaining length and the front offset
// after every emit. fin is only reported by the emit that drains the tail.
#[test]
fn split_write() {
    let mut out = [0; 10];

    let mut tx = SendBuf::new(u64::MAX);
    assert_eq!(tx.len, 0);

    let head = b"something";
    let tail = b"helloworld";

    assert!(tx.write(head, false).is_ok());
    assert_eq!(tx.len, 9);

    assert!(tx.write(tail, true).is_ok());
    assert_eq!(tx.len, 19);

    assert_eq!(tx.off_front(), 0);

    // First emit crosses the boundary between the two writes.
    assert_eq!(tx.emit(&mut out[..10]), Ok((10, false)));
    assert_eq!(&out[..10], b"somethingh");
    assert_eq!(tx.len, 9);

    assert_eq!(tx.off_front(), 10);

    assert_eq!(tx.emit(&mut out[..5]), Ok((5, false)));
    assert_eq!(&out[..5], b"ellow");
    assert_eq!(tx.len, 4);

    assert_eq!(tx.off_front(), 15);

    // Output is larger than what is left; the remainder carries fin.
    assert_eq!(tx.emit(&mut out[..10]), Ok((4, true)));
    assert_eq!(&out[..4], b"orld");
    assert_eq!(tx.len, 0);

    assert_eq!(tx.off_front(), 19);
}
2352
// Part of the stream is emitted, then two ranges are queued for
// retransmission. The retransmitted bytes are emitted again before the
// still-unsent tail, and the length / front offset are tracked throughout.
#[test]
fn resend() {
    let mut out = [0; 15];

    let mut tx = SendBuf::new(u64::MAX);
    assert_eq!(tx.len, 0);
    assert_eq!(tx.off_front(), 0);

    let head = b"something";
    let tail = b"helloworld";

    assert!(tx.write(head, false).is_ok());
    assert_eq!(tx.off_front(), 0);

    assert!(tx.write(tail, true).is_ok());
    assert_eq!(tx.off_front(), 0);

    assert_eq!(tx.len, 19);

    assert_eq!(tx.emit(&mut out[..4]), Ok((4, false)));
    assert_eq!(&out[..4], b"some");
    assert_eq!(tx.len, 15);
    assert_eq!(tx.off_front(), 4);

    assert_eq!(tx.emit(&mut out[..5]), Ok((5, false)));
    assert_eq!(&out[..5], b"thing");
    assert_eq!(tx.len, 10);
    assert_eq!(tx.off_front(), 9);

    assert_eq!(tx.emit(&mut out[..5]), Ok((5, false)));
    assert_eq!(&out[..5], b"hello");
    assert_eq!(tx.len, 5);
    assert_eq!(tx.off_front(), 14);

    // Re-queue bytes 4..9, then bytes 0..4.
    tx.retransmit(4, 5);
    assert_eq!(tx.len, 10);
    assert_eq!(tx.off_front(), 4);

    tx.retransmit(0, 4);
    assert_eq!(tx.len, 14);
    assert_eq!(tx.off_front(), 0);

    // Only the re-queued bytes come out, even though the output has room
    // for more.
    assert_eq!(tx.emit(&mut out[..11]), Ok((9, false)));
    assert_eq!(&out[..9], b"something");
    assert_eq!(tx.len, 5);
    assert_eq!(tx.off_front(), 14);

    assert_eq!(tx.emit(&mut out[..11]), Ok((5, true)));
    assert_eq!(&out[..5], b"world");
    assert_eq!(tx.len, 0);
    assert_eq!(tx.off_front(), 19);
}
2415
// A default-constructed send buffer accepts no data until its limit is
// raised with update_max_data(). Each write then accepts only as many bytes
// as fit below the current limit, and the caller resubmits the remainder
// after the next increase.
#[test]
fn write_blocked_by_off() {
    let mut out = [0; 10];

    let mut tx = SendBuf::default();
    assert_eq!(tx.len, 0);

    let head = b"something";
    let tail = b"helloworld";

    // Nothing is accepted before the limit is raised.
    assert_eq!(tx.write(head, false), Ok(0));
    assert_eq!(tx.len, 0);

    assert_eq!(tx.write(tail, true), Ok(0));
    assert_eq!(tx.len, 0);

    tx.update_max_data(5);

    // Only the first five bytes fit.
    assert_eq!(tx.write(head, false), Ok(5));
    assert_eq!(tx.len, 5);

    assert_eq!(tx.write(tail, true), Ok(0));
    assert_eq!(tx.len, 5);

    assert_eq!(tx.off_front(), 0);

    assert_eq!(tx.emit(&mut out[..10]), Ok((5, false)));
    assert_eq!(&out[..5], b"somet");
    assert_eq!(tx.len, 0);

    assert_eq!(tx.off_front(), 5);

    assert_eq!(tx.emit(&mut out[..10]), Ok((0, false)));
    assert_eq!(&out[..0], b"");
    assert_eq!(tx.len, 0);

    tx.update_max_data(15);

    // Rest of the first message, then as much of the second as fits.
    assert_eq!(tx.write(&head[5..], false), Ok(4));
    assert_eq!(tx.len, 4);

    assert_eq!(tx.write(tail, true), Ok(6));
    assert_eq!(tx.len, 10);

    assert_eq!(tx.off_front(), 5);

    assert_eq!(tx.emit(&mut out[..10]), Ok((10, false)));
    assert_eq!(&out[..10], b"hinghellow");
    assert_eq!(tx.len, 0);

    tx.update_max_data(25);

    assert_eq!(tx.write(&tail[6..], true), Ok(4));
    assert_eq!(tx.len, 4);

    assert_eq!(tx.off_front(), 15);

    assert_eq!(tx.emit(&mut out[..10]), Ok((4, true)));
    assert_eq!(&out[..4], b"orld");
    assert_eq!(tx.len, 0);
}
2485
// An empty write with fin set after regular data does not change the
// buffered length, but the data is then emitted with fin reported.
#[test]
fn zero_len_write() {
    let mut out = [0; 10];

    let mut tx = SendBuf::new(u64::MAX);
    assert_eq!(tx.len, 0);

    let payload = b"something";

    assert!(tx.write(payload, false).is_ok());
    assert_eq!(tx.len, 9);

    assert!(tx.write(&[], true).is_ok());
    assert_eq!(tx.len, 9);

    assert_eq!(tx.off_front(), 0);

    assert_eq!(tx.emit(&mut out[..10]), Ok((9, true)));
    assert_eq!(&out[..9], b"something");
    assert_eq!(tx.len, 0);
}
2509
// A chunk that would end at offset 19 is rejected with FlowControl on a
// stream created with 15 as its first argument. After the buffered data is
// read and update_max_data() is called, max_data_next() reports 25 and the
// same chunk is accepted.
#[test]
fn recv_flow_control() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let mut out = [0; 32];

    let hello = RangeBuf::from(b"hello", 0, false);
    let world = RangeBuf::from(b"world", 5, false);
    let too_far = RangeBuf::from(b"something", 10, false);

    assert!(s.recv.write(world).is_ok());
    assert!(s.recv.write(hello).is_ok());
    assert!(!s.recv.almost_full());

    assert_eq!(s.recv.write(too_far).err(), Some(Error::FlowControl));

    assert_eq!(s.recv.emit(&mut out), Ok((10, false)));
    assert_eq!(&out[..10], b"helloworld");

    assert!(s.recv.almost_full());

    s.recv.update_max_data(time::Instant::now());
    assert_eq!(s.recv.max_data_next(), 25);
    assert!(!s.recv.almost_full());

    // The previously rejected range now fits.
    let retry = RangeBuf::from(b"something", 10, false);
    assert!(s.recv.write(retry).is_ok());
}
2540
// Data located beyond an already received fin is rejected with FinalSize.
#[test]
fn recv_past_fin() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let with_fin = RangeBuf::from(b"hello", 0, true);
    let beyond = RangeBuf::from(b"world", 5, false);

    assert!(s.recv.write(with_fin).is_ok());
    assert_eq!(s.recv.write(beyond).err(), Some(Error::FinalSize));
}
2552
// Receiving the exact same fin-carrying chunk twice is accepted, and the
// data is delivered once with fin set.
#[test]
fn recv_fin_dup() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let original = RangeBuf::from(b"hello", 0, true);
    let duplicate = RangeBuf::from(b"hello", 0, true);

    assert!(s.recv.write(original).is_ok());
    assert!(s.recv.write(duplicate).is_ok());

    let mut out = [0; 32];

    assert_eq!(s.recv.emit(&mut out), Ok((5, true)));
    assert_eq!(&out[..5], b"hello");
}
2570
// Once a fin has been received at offset 10, a second fin at a different
// offset (5) is rejected with FinalSize.
#[test]
fn recv_fin_change() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let fin_at_5 = RangeBuf::from(b"hello", 0, true);
    let fin_at_10 = RangeBuf::from(b"world", 5, true);

    assert!(s.recv.write(fin_at_10).is_ok());
    assert_eq!(s.recv.write(fin_at_5).err(), Some(Error::FinalSize));
}
2582
// A fin whose final offset (5) lies below data that was already received
// (up to offset 10) is rejected with FinalSize.
#[test]
fn recv_fin_lower_than_received() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let early_fin = RangeBuf::from(b"hello", 0, true);
    let later_data = RangeBuf::from(b"world", 5, false);

    assert!(s.recv.write(later_data).is_ok());
    assert_eq!(s.recv.write(early_fin).err(), Some(Error::FinalSize));
}
2594
// After the whole stream (including fin) has been read, the receive side
// does not report almost_full().
#[test]
fn recv_fin_flow_control() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let mut out = [0; 32];

    let hello = RangeBuf::from(b"hello", 0, false);
    let world_fin = RangeBuf::from(b"world", 5, true);

    assert!(s.recv.write(hello).is_ok());
    assert!(s.recv.write(world_fin).is_ok());

    assert_eq!(s.recv.emit(&mut out), Ok((10, true)));
    assert_eq!(&out[..10], b"helloworld");

    assert!(!s.recv.almost_full());
}
2614
// A reset whose final size (10) disagrees with the fin already received at
// offset 5 is rejected with FinalSize.
#[test]
fn recv_fin_reset_mismatch() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let with_fin = RangeBuf::from(b"hello", 0, true);

    assert!(s.recv.write(with_fin).is_ok());
    assert_eq!(s.recv.reset(0, 10).err(), Some(Error::FinalSize));
}
2625
// Resetting twice with the same final size succeeds both times.
#[test]
fn recv_reset_dup() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let data = RangeBuf::from(b"hello", 0, false);

    assert!(s.recv.write(data).is_ok());
    assert_eq!(s.recv.reset(0, 5).ok(), Some(0));
    assert_eq!(s.recv.reset(0, 5).ok(), Some(0));
}
2637
// A second reset carrying a different final size than the first one is
// rejected with FinalSize.
#[test]
fn recv_reset_change() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let data = RangeBuf::from(b"hello", 0, false);

    assert!(s.recv.write(data).is_ok());
    assert_eq!(s.recv.reset(0, 5).ok(), Some(0));
    assert_eq!(s.recv.reset(0, 10).err(), Some(Error::FinalSize));
}
2649
// A reset whose final size (4) is smaller than the amount of data already
// received (5 bytes) is rejected with FinalSize.
#[test]
fn recv_reset_lower_than_received() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!s.recv.almost_full());

    let data = RangeBuf::from(b"hello", 0, false);

    assert!(s.recv.write(data).is_ok());
    assert_eq!(s.recv.reset(0, 4).err(), Some(Error::FinalSize));
}
2660
// On a stream created with 15 as its second argument, only the first 15
// bytes of the three writes are ever emitted. Retransmitting that whole
// range makes the same 15 bytes come out again, in two pieces.
#[test]
fn send_flow_control() {
    let mut out = [0; 25];

    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    let hello = b"hello";
    let world = b"world";
    let something = b"something";

    assert!(s.send.write(hello, false).is_ok());
    assert!(s.send.write(world, false).is_ok());
    assert!(s.send.write(something, false).is_ok());

    assert_eq!(s.send.off_front(), 0);

    assert_eq!(s.send.emit(&mut out[..25]), Ok((15, false)));
    assert_eq!(&out[..15], b"helloworldsomet");

    assert_eq!(s.send.off_front(), 15);

    // Nothing more is available to send.
    assert_eq!(s.send.emit(&mut out[..25]), Ok((0, false)));
    assert_eq!(&out[..0], b"");

    s.send.retransmit(0, 15);

    assert_eq!(s.send.off_front(), 0);

    assert_eq!(s.send.emit(&mut out[..10]), Ok((10, false)));
    assert_eq!(&out[..10], b"helloworld");

    assert_eq!(s.send.off_front(), 10);

    assert_eq!(s.send.emit(&mut out[..10]), Ok((5, false)));
    assert_eq!(&out[..5], b"somet");
}
2705
// Writing more data after a write that set fin fails with FinalSize.
#[test]
fn send_past_fin() {
    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    let hello = b"hello";
    let world = b"world";
    let extra = b"third";

    assert_eq!(s.send.write(hello, false).ok(), Some(5));

    assert_eq!(s.send.write(world, true).ok(), Some(5));
    assert!(s.send.is_fin());

    assert_eq!(s.send.write(extra, false).err(), Some(Error::FinalSize));
}
2721
// Repeating fin with an empty write after the stream was already finished
// is accepted and leaves the stream finished.
#[test]
fn send_fin_dup() {
    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", true).ok(), Some(5));
    assert!(s.send.is_fin());

    assert_eq!(s.send.write(b"", true).ok(), Some(0));
    assert!(s.send.is_fin());
}
2732
// A further non-empty write with fin after the stream was already finished
// fails with FinalSize.
#[test]
fn send_undo_fin() {
    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", true).ok(), Some(5));
    assert!(s.send.is_fin());

    let second_fin = s.send.write(b"helloworld", true);
    assert_eq!(second_fin.err(), Some(Error::FinalSize));
}
2745
// A 15-byte write with fin on a stream created with 15 as its second
// argument is emitted in full, with fin reported.
#[test]
fn send_fin_max_data_match() {
    let mut out = [0; 15];

    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    let payload = b"hellohellohello";

    assert!(s.send.write(payload, true).is_ok());

    assert_eq!(s.send.emit(&mut out[..15]), Ok((15, true)));
    assert_eq!(&out[..15], payload);
}
2761
// An empty write carrying fin, issued before any data was emitted, makes
// the following emit return the pending data together with fin.
#[test]
fn send_fin_zero_length() {
    let mut out = [0; 5];

    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false).ok(), Some(5));
    assert_eq!(s.send.write(b"", true).ok(), Some(0));
    assert!(s.send.is_fin());

    assert_eq!(s.send.emit(&mut out[..5]), Ok((5, true)));
    assert_eq!(&out[..5], b"hello");
}
2777
// Once a range has been acked and dropped, asking to retransmit it does
// not move the front offset back: the next emit continues with the unsent
// data.
#[test]
fn send_ack() {
    let mut out = [0; 5];

    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false).ok(), Some(5));
    assert_eq!(s.send.write(b"world", false).ok(), Some(5));
    assert_eq!(s.send.write(b"", true).ok(), Some(0));
    assert!(s.send.is_fin());

    assert_eq!(s.send.off_front(), 0);

    assert_eq!(s.send.emit(&mut out[..5]), Ok((5, false)));
    assert_eq!(&out[..5], b"hello");

    s.send.ack_and_drop(0, 5);

    // The acked range is requested again.
    s.send.retransmit(0, 5);

    assert_eq!(s.send.off_front(), 5);

    assert_eq!(s.send.emit(&mut out[..5]), Ok((5, true)));
    assert_eq!(&out[..5], b"world");
}
2807
// Same as send_ack, but the two emitted ranges are acked in reverse order
// before retransmission is requested for both. The front offset stays at 6
// and only the unsent remainder is emitted.
#[test]
fn send_ack_reordering() {
    let mut out = [0; 5];

    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false).ok(), Some(5));
    assert_eq!(s.send.write(b"world", false).ok(), Some(5));
    assert_eq!(s.send.write(b"", true).ok(), Some(0));
    assert!(s.send.is_fin());

    assert_eq!(s.send.off_front(), 0);

    assert_eq!(s.send.emit(&mut out[..5]), Ok((5, false)));
    assert_eq!(&out[..5], b"hello");

    assert_eq!(s.send.off_front(), 5);

    assert_eq!(s.send.emit(&mut out[..1]), Ok((1, false)));
    assert_eq!(&out[..1], b"w");

    // Acks arrive out of order.
    s.send.ack_and_drop(5, 1);
    s.send.ack_and_drop(0, 5);

    // Both acked ranges are requested again.
    s.send.retransmit(0, 5);
    s.send.retransmit(5, 1);

    assert_eq!(s.send.off_front(), 6);

    assert_eq!(s.send.emit(&mut out[..5]), Ok((4, true)));
    assert_eq!(&out[..4], b"orld");
}
2846
// After "hello" has been read, a chunk that starts below the read offset
// and extends past it is accepted; only the part not yet read ("world") is
// delivered, together with fin.
#[test]
fn recv_data_below_off() {
    let mut s = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);

    let initial = RangeBuf::from(b"hello", 0, false);

    assert!(s.recv.write(initial).is_ok());

    let mut out = [0; 10];

    assert_eq!(s.recv.emit(&mut out), Ok((5, false)));
    assert_eq!(&out[..5], b"hello");

    let overlapping = RangeBuf::from(b"elloworld", 1, true);
    assert!(s.recv.write(overlapping).is_ok());

    assert_eq!(s.recv.emit(&mut out), Ok((5, true)));
    assert_eq!(&out[..5], b"world");
}
2868
// Follows both directions of a stream until it reports completion: the
// send side only becomes complete after the last of its acks arrives, the
// receive side only reports fin after all data has been read, and the
// stream as a whole is complete only when both hold.
#[test]
fn stream_complete() {
    let mut s = Stream::new(30, 30, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false).ok(), Some(5));
    assert_eq!(s.send.write(b"world", false).ok(), Some(5));

    assert!(!s.send.is_complete());
    assert!(!s.send.is_fin());

    assert_eq!(s.send.write(b"", true).ok(), Some(0));

    assert!(!s.send.is_complete());
    assert!(s.send.is_fin());

    let incoming = RangeBuf::from(b"hello", 0, true);
    assert!(s.recv.write(incoming).is_ok());
    assert!(!s.recv.is_fin());

    s.send.ack(6, 4);
    assert!(!s.send.is_complete());

    // Read the first two bytes only.
    let mut two = [0; 2];
    let (n, fin) = s.recv.emit(&mut two).unwrap();
    assert_eq!(n, 2);
    assert!(!fin);
    assert!(!s.recv.is_fin());

    s.send.ack(1, 5);
    assert!(!s.send.is_complete());

    s.send.ack(0, 1);
    assert!(s.send.is_complete());

    // Receive side still has unread data.
    assert!(!s.is_complete());

    let mut three = [0; 3];
    let (n, fin) = s.recv.emit(&mut three).unwrap();
    assert_eq!(n, 3);
    assert!(fin);
    assert!(s.recv.is_fin());

    assert!(s.is_complete());
}
2909
// When fin is set by an empty write after all data was already emitted,
// the next emit returns zero bytes with fin reported.
#[test]
fn send_fin_zero_length_output() {
    let mut out = [0; 5];

    let mut s = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false).ok(), Some(5));
    assert_eq!(s.send.off_front(), 0);
    assert!(!s.send.is_fin());

    assert_eq!(s.send.emit(&mut out), Ok((5, false)));
    assert_eq!(&out[..5], b"hello");

    assert_eq!(s.send.write(b"", true).ok(), Some(0));
    assert!(s.send.is_fin());
    assert_eq!(s.send.off_front(), 5);

    assert_eq!(s.send.emit(&mut out), Ok((0, true)));
    assert_eq!(&out[..0], b"");
}
2934
// Four five-byte writes are drained through output slices of varying
// sizes. After every emit the returned length, fin flag, front offset and
// emitted bytes are checked; once everything is out the stream is no
// longer flushable or ready.
#[test]
fn send_emit() {
    let mut out = [0; 5];

    let mut s = Stream::new(0, 20, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false), Ok(5));
    assert_eq!(s.send.write(b"world", false), Ok(5));
    assert_eq!(s.send.write(b"olleh", false), Ok(5));
    assert_eq!(s.send.write(b"dlrow", true), Ok(5));
    assert_eq!(s.send.off_front(), 0);
    assert_eq!(s.send.data.len(), 4);

    assert!(s.is_flushable());

    // (output capacity, expected bytes, expected fin, front offset after)
    let steps = [
        (4, &b"hell"[..], false, 4),
        (4, &b"owor"[..], false, 8),
        (2, &b"ld"[..], false, 10),
        (1, &b"o"[..], false, 11),
        (5, &b"llehd"[..], false, 16),
        (5, &b"lrow"[..], true, 20),
    ];

    for &(cap, expected, expected_fin, front) in steps.iter() {
        assert!(s.send.ready());

        let (written, fin) = s.send.emit(&mut out[..cap]).unwrap();
        assert_eq!(written, expected.len());
        assert_eq!(fin, expected_fin);

        assert_eq!(s.send.off_front(), front);
        assert_eq!(&out[..expected.len()], expected);
    }

    assert!(!s.is_flushable());

    // Nothing left: an empty emit that still reports fin.
    assert!(!s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..5]).unwrap();
    assert_eq!(written, 0);
    assert!(fin);
    assert_eq!(s.send.off_front(), 20);
}
2986
// Same emit sequence as send_emit, interleaved with ack_and_drop() calls.
// The number of buffers held by the send side is checked after each ack,
// including acks for ranges at or beyond the end of the written data.
#[test]
fn send_emit_ack() {
    let mut out = [0; 5];

    let mut s = Stream::new(0, 20, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(s.send.write(b"hello", false).ok(), Some(5));
    assert_eq!(s.send.write(b"world", false).ok(), Some(5));
    assert_eq!(s.send.write(b"olleh", false).ok(), Some(5));
    assert_eq!(s.send.write(b"dlrow", true).ok(), Some(5));
    assert_eq!(s.send.off_front(), 0);
    assert_eq!(s.send.data.len(), 4);

    assert!(s.is_flushable());

    assert!(s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..4]).unwrap();
    assert_eq!(written, 4);
    assert!(!fin);
    assert_eq!(s.send.off_front(), 4);
    assert_eq!(&out[..4], b"hell");

    assert!(s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..4]).unwrap();
    assert_eq!(written, 4);
    assert!(!fin);
    assert_eq!(s.send.off_front(), 8);
    assert_eq!(&out[..4], b"owor");

    // Acking the whole first write releases one buffer.
    s.send.ack_and_drop(0, 5);
    assert_eq!(s.send.data.len(), 3);

    assert!(s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..2]).unwrap();
    assert_eq!(written, 2);
    assert!(!fin);
    assert_eq!(s.send.off_front(), 10);
    assert_eq!(&out[..2], b"ld");

    // This ack leaves the buffer count unchanged.
    s.send.ack_and_drop(7, 5);
    assert_eq!(s.send.data.len(), 3);

    assert!(s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..1]).unwrap();
    assert_eq!(written, 1);
    assert!(!fin);
    assert_eq!(s.send.off_front(), 11);
    assert_eq!(&out[..1], b"o");

    assert!(s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..5]).unwrap();
    assert_eq!(written, 5);
    assert!(!fin);
    assert_eq!(s.send.off_front(), 16);
    assert_eq!(&out[..5], b"llehd");

    s.send.ack_and_drop(5, 7);
    assert_eq!(s.send.data.len(), 2);

    assert!(s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..5]).unwrap();
    assert_eq!(written, 4);
    assert!(fin);
    assert_eq!(s.send.off_front(), 20);
    assert_eq!(&out[..4], b"lrow");

    assert!(!s.is_flushable());

    assert!(!s.send.ready());
    let (written, fin) = s.send.emit(&mut out[..5]).unwrap();
    assert_eq!(written, 0);
    assert!(fin);
    assert_eq!(s.send.off_front(), 20);

    // Acks at or past the end of the written data change nothing.
    s.send.ack_and_drop(22, 4);
    assert_eq!(s.send.data.len(), 2);

    s.send.ack_and_drop(20, 1);
    assert_eq!(s.send.data.len(), 2);
}
3053
/// Interleaves emission with `retransmit()` and `ack_and_drop()` calls and
/// checks which bytes are emitted again and how the front offset moves.
#[test]
fn send_emit_retransmit() {
    /// Emits at most `cap` bytes and checks the emitted bytes, the returned
    /// fin flag and the front offset reported afterwards.
    #[track_caller]
    fn expect_emit(
        stream: &mut Stream, out: &mut [u8], cap: usize, want: &[u8],
        want_fin: bool, want_front: u64,
    ) {
        assert!(stream.send.ready());
        assert_eq!(
            stream.send.emit(&mut out[..cap]),
            Ok((want.len(), want_fin))
        );
        assert_eq!(stream.send.off_front(), want_front);
        assert_eq!(&out[..want.len()], want);
    }

    let mut buf = [0u8; 5];

    let mut stream = Stream::new(0, 20, true, true, DEFAULT_STREAM_WINDOW);

    // Queue four 5-byte chunks; only the last write sets fin.
    let chunks = [
        (b"hello", false),
        (b"world", false),
        (b"olleh", false),
        (b"dlrow", true),
    ];
    for (chunk, fin) in chunks {
        assert_eq!(stream.send.write(chunk, fin), Ok(5));
    }
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.data.len(), 4);

    assert!(stream.is_flushable());

    expect_emit(&mut stream, &mut buf, 4, b"hell", false, 4);
    expect_emit(&mut stream, &mut buf, 4, b"owor", false, 8);

    // Re-queue [3, 6): the front offset moves back to the start of the range.
    stream.send.retransmit(3, 3);
    assert_eq!(stream.send.off_front(), 3);

    // Once the re-queued bytes are out, the front offset is back at 8.
    expect_emit(&mut stream, &mut buf, 3, b"low", false, 8);
    expect_emit(&mut stream, &mut buf, 2, b"ld", false, 10);

    stream.send.ack_and_drop(7, 2);

    stream.send.retransmit(8, 2);

    expect_emit(&mut stream, &mut buf, 2, b"ld", false, 10);
    expect_emit(&mut stream, &mut buf, 1, b"o", false, 11);
    expect_emit(&mut stream, &mut buf, 5, b"llehd", false, 16);

    stream.send.retransmit(12, 2);

    expect_emit(&mut stream, &mut buf, 2, b"le", false, 16);

    // Only four bytes are left, and they are emitted together with fin.
    expect_emit(&mut stream, &mut buf, 5, b"lrow", true, 20);

    assert!(!stream.is_flushable());

    // Nothing is left to emit.
    assert!(!stream.send.ready());
    assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    // Re-queue [7, 19), a range spanning several chunks.
    stream.send.retransmit(7, 12);

    expect_emit(&mut stream, &mut buf, 5, b"rldol", false, 12);
    expect_emit(&mut stream, &mut buf, 5, b"lehdl", false, 17);
    expect_emit(&mut stream, &mut buf, 5, b"ro", false, 20);

    stream.send.ack_and_drop(12, 7);

    // The same range is emitted in full again after the ack above.
    stream.send.retransmit(7, 12);

    expect_emit(&mut stream, &mut buf, 5, b"rldol", false, 12);
    expect_emit(&mut stream, &mut buf, 5, b"lehdl", false, 17);
    expect_emit(&mut stream, &mut buf, 5, b"ro", false, 20);
}
3165
/// Exercises `RangeBuf::split_off()` both before and after the current read
/// position, interleaved with `consume()`. After every step the raw fields
/// (`start`, `pos`, `len`, `off`, `fin`), the accessor views (`len()`,
/// `off()`, `fin()`) and the visible bytes are checked.
#[test]
fn rangebuf_split_off() {
    let mut buf = RangeBuf::from(b"helloworld", 5, true);
    assert_eq!(buf.start, 0);
    assert_eq!(buf.pos, 0);
    assert_eq!(buf.len, 10);
    assert_eq!(buf.off, 5);
    assert!(buf.fin);

    assert_eq!(buf.len(), 10);
    assert_eq!(buf.off(), 5);
    assert!(buf.fin());

    assert_eq!(&buf[..], b"helloworld");

    // Advance buffer: only `pos` moves; the accessors report the remaining
    // bytes and the offset of the first unread byte.
    buf.consume(5);

    assert_eq!(buf.start, 0);
    assert_eq!(buf.pos, 5);
    assert_eq!(buf.len, 10);
    assert_eq!(buf.off, 5);
    assert!(buf.fin);

    assert_eq!(buf.len(), 5);
    assert_eq!(buf.off(), 10);
    assert!(buf.fin());

    assert_eq!(&buf[..], b"world");

    // Split buffer before position: the head keeps no unread bytes and the
    // fin flag moves to the split-off tail.
    let mut new_buf = buf.split_off(3);

    assert_eq!(buf.start, 0);
    assert_eq!(buf.pos, 3);
    assert_eq!(buf.len, 3);
    assert_eq!(buf.off, 5);
    assert!(!buf.fin);

    assert_eq!(buf.len(), 0);
    assert_eq!(buf.off(), 8);
    assert!(!buf.fin());

    assert_eq!(&buf[..], b"");

    assert_eq!(new_buf.start, 3);
    assert_eq!(new_buf.pos, 5);
    assert_eq!(new_buf.len, 7);
    assert_eq!(new_buf.off, 8);
    assert!(new_buf.fin);

    assert_eq!(new_buf.len(), 5);
    assert_eq!(new_buf.off(), 10);
    assert!(new_buf.fin());

    assert_eq!(&new_buf[..], b"world");

    // Advance buffer.
    new_buf.consume(2);

    assert_eq!(new_buf.start, 3);
    assert_eq!(new_buf.pos, 7);
    assert_eq!(new_buf.len, 7);
    assert_eq!(new_buf.off, 8);
    assert!(new_buf.fin);

    assert_eq!(new_buf.len(), 3);
    assert_eq!(new_buf.off(), 12);
    assert!(new_buf.fin());

    assert_eq!(&new_buf[..], b"rld");

    // Split buffer after position: the head keeps its one unread byte and
    // the tail starts out with nothing consumed.
    let mut new_new_buf = new_buf.split_off(5);

    assert_eq!(new_buf.start, 3);
    assert_eq!(new_buf.pos, 7);
    assert_eq!(new_buf.len, 5);
    assert_eq!(new_buf.off, 8);
    assert!(!new_buf.fin);

    assert_eq!(new_buf.len(), 1);
    assert_eq!(new_buf.off(), 12);
    assert!(!new_buf.fin());

    assert_eq!(&new_buf[..], b"r");

    assert_eq!(new_new_buf.start, 8);
    assert_eq!(new_new_buf.pos, 8);
    assert_eq!(new_new_buf.len, 2);
    assert_eq!(new_new_buf.off, 13);
    assert!(new_new_buf.fin);

    assert_eq!(new_new_buf.len(), 2);
    assert_eq!(new_new_buf.off(), 13);
    assert!(new_new_buf.fin());

    assert_eq!(&new_new_buf[..], b"ld");

    // Advance buffer.
    new_new_buf.consume(2);

    assert_eq!(new_new_buf.start, 8);
    assert_eq!(new_new_buf.pos, 10);
    assert_eq!(new_new_buf.len, 2);
    assert_eq!(new_new_buf.off, 13);
    assert!(new_new_buf.fin);

    assert_eq!(new_new_buf.len(), 0);
    assert_eq!(new_new_buf.off(), 15);
    assert!(new_new_buf.fin());

    assert_eq!(&new_new_buf[..], b"");
}
3280
/// Using a stream ID out of order also opens every lower-numbered stream of
/// the same type (RFC9000 2.1), so a peer-initiated ID far above what the
/// map was created with must be rejected with `Error::StreamLimit`.
#[test]
fn stream_limit_auto_open() {
    let mut streams = StreamMap::new(5, 5, 5);

    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let stream_id = 500;
    assert!(!is_local(stream_id, true), "stream id is peer initiated");
    assert!(is_bidi(stream_id), "stream id is bidirectional");

    let rejected = streams
        .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
        .err();
    assert_eq!(
        rejected,
        Some(Error::StreamLimit),
        "stream limit should be exceeded"
    );
}
3301
/// Creating streams in a non-monotonic ID order must succeed for every ID
/// in the list.
#[test]
fn stream_create_out_of_order() {
    let mut streams = StreamMap::new(5, 5, 5);

    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    // Deliberately not sorted.
    let out_of_order_ids = [8, 12, 4];

    for id in out_of_order_ids {
        assert!(is_local(id, false), "stream id is client initiated");
        assert!(is_bidi(id), "stream id is bidirectional");

        let created = streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .is_ok();
        assert!(created);
    }
}
3319
/// Boundary check for the stream limit: stream ID 8 is accepted, while
/// stream ID 12 is rejected with `Error::StreamLimit`.
#[test]
fn stream_limit_edge() {
    let mut streams = StreamMap::new(3, 3, 3);

    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    // Highest permitted
    let highest_permitted = 8;
    let accepted = streams
        .get_or_create(highest_permitted, &local_tp, &peer_tp, false, true)
        .is_ok();
    assert!(accepted);

    // One more than highest permitted
    let first_rejected = 12;
    let rejected = streams
        .get_or_create(first_rejected, &local_tp, &peer_tp, false, true)
        .err();
    assert_eq!(rejected, Some(Error::StreamLimit));
}
3343
/// Check SendBuf::len calculation on a retransmit case.
///
/// Writes nine bytes, emits the first four, then retransmits a range that
/// only partly overlaps the emitted bytes, and checks `len` and
/// `off_front()` after each step.
#[test]
fn send_buf_len_on_retransmit() {
    let mut buf = [0; 15];

    let mut send = SendBuf::new(u64::MAX);
    assert_eq!(send.len, 0);
    assert_eq!(send.off_front(), 0);

    let first = b"something";

    assert!(send.write(first, false).is_ok());
    assert_eq!(send.off_front(), 0);

    assert_eq!(send.len, 9);

    // Emit the first four bytes; five remain pending.
    let (written, fin) = send.emit(&mut buf[..4]).unwrap();
    assert_eq!(written, 4);
    assert!(!fin);
    assert_eq!(&buf[..written], b"some");
    assert_eq!(send.len, 5);
    assert_eq!(send.off_front(), 4);

    // Of the range [3, 8) only offset 3 had been emitted, so `len` is
    // expected to grow by exactly one byte and the front offset to move
    // back to 3.
    send.retransmit(3, 5);
    assert_eq!(send.len, 6);
    assert_eq!(send.off_front(), 3);
}
3371
/// The final size reported by `stop()` must cover everything that was ever
/// emitted, even when part of it has been queued for retransmission.
#[test]
fn send_buf_final_size_retransmit() {
    let mut payload = [0u8; 50];
    let total = payload.len();

    let mut send = SendBuf::new(u64::MAX);

    send.write(&payload, false).unwrap();
    assert_eq!(send.off_front(), 0);

    // Drain everything that was written.
    let (emitted, _) = send.emit(&mut payload).unwrap();
    assert_eq!(emitted, total);
    assert_eq!(send.off_front(), total as u64);

    // The server queues the last 10 bytes for retransmission, although the
    // client may in fact have received them.
    send.retransmit(40, 10);

    // The client then sends STOP_SENDING. The final size carried in the
    // resulting RESET_STREAM has to be 50; anything smaller would be a
    // FINAL_SIZE_ERROR.
    let (final_size, unsent) = send.stop(0).unwrap();
    assert_eq!(final_size, 50);
    assert_eq!(unsent, 0);
}
3396 }
3397