• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use std::cmp;
28 
29 use std::sync::Arc;
30 
31 use std::collections::hash_map;
32 use std::collections::BTreeMap;
33 use std::collections::BinaryHeap;
34 use std::collections::HashMap;
35 use std::collections::HashSet;
36 use std::collections::VecDeque;
37 
38 use std::time;
39 
40 use smallvec::SmallVec;
41 
42 use crate::Error;
43 use crate::Result;
44 
45 use crate::flowcontrol;
46 use crate::ranges;
47 
/// Urgency assigned to newly created streams. Lower values are scheduled
/// first.
const DEFAULT_URGENCY: u8 = 127;

// NOTE(review): presumably the size of a single send-buffer chunk, kept tiny
// in test builds -- confirm against the `SendBuf` implementation.
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;

#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;

/// The default size of the receiver stream flow control window.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// Upper bound for the receiver stream flow control window.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
61 
/// A simple pass-through hasher for stream IDs.
///
/// The QUIC protocol and quiche library guarantee stream ID uniqueness, so
/// the ID itself is used as the hash value instead of running a more
/// complicated algorithm.
#[derive(Default)]
pub struct StreamIdHasher {
    id: u64,
}

impl std::hash::Hasher for StreamIdHasher {
    /// Returns the value accumulated so far. For a `u64` key this is the key
    /// itself.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    /// Stores `id` verbatim as the hash value. This is the path taken when
    /// hashing `u64` keys.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Byte-oriented fallback required by the `Hasher` trait.
    ///
    /// Stream IDs are always hashed through `write_u64()`, so this is not
    /// used by the stream maps. Instead of panicking when some other key
    /// type is hashed, the input is folded in 8-byte native-endian words
    /// (the last word is zero-padded). Writing a single `u64`'s bytes to a
    /// fresh hasher therefore yields the same value as `write_u64()`.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        for chunk in bytes.chunks(8) {
            let mut word = [0u8; 8];
            word[..chunk.len()].copy_from_slice(chunk);

            self.id = self.id.rotate_left(5) ^ u64::from_ne_bytes(word);
        }
    }
}

type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that hashes with [`StreamIdHasher`].
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;

/// A `HashSet` of stream IDs that hashes with [`StreamIdHasher`].
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
94 
95 /// Keeps track of QUIC streams and enforces stream limits.
96 #[derive(Default)]
97 pub struct StreamMap {
98     /// Map of streams indexed by stream ID.
99     streams: StreamIdHashMap<Stream>,
100 
101     /// Set of streams that were completed and garbage collected.
102     ///
103     /// Instead of keeping the full stream state forever, we collect completed
104     /// streams to save memory, but we still need to keep track of previously
105     /// created streams, to prevent peers from re-creating them.
106     collected: StreamIdHashSet,
107 
108     /// Peer's maximum bidirectional stream count limit.
109     peer_max_streams_bidi: u64,
110 
111     /// Peer's maximum unidirectional stream count limit.
112     peer_max_streams_uni: u64,
113 
114     /// The total number of bidirectional streams opened by the peer.
115     peer_opened_streams_bidi: u64,
116 
117     /// The total number of unidirectional streams opened by the peer.
118     peer_opened_streams_uni: u64,
119 
120     /// Local maximum bidirectional stream count limit.
121     local_max_streams_bidi: u64,
122     local_max_streams_bidi_next: u64,
123 
124     /// Local maximum unidirectional stream count limit.
125     local_max_streams_uni: u64,
126     local_max_streams_uni_next: u64,
127 
128     /// The total number of bidirectional streams opened by the local endpoint.
129     local_opened_streams_bidi: u64,
130 
131     /// The total number of unidirectional streams opened by the local endpoint.
132     local_opened_streams_uni: u64,
133 
134     /// Queue of stream IDs corresponding to streams that have buffered data
135     /// ready to be sent to the peer. This also implies that the stream has
136     /// enough flow control credits to send at least some of that data.
137     ///
138     /// Streams are grouped by their priority, where each urgency level has two
139     /// queues, one for non-incremental streams and one for incremental ones.
140     ///
141     /// Streams with lower urgency level are scheduled first, and within the
142     /// same urgency level Non-incremental streams are scheduled first, in the
143     /// order of their stream IDs, and incremental streams are scheduled in a
144     /// round-robin fashion after all non-incremental streams have been flushed.
145     flushable: BTreeMap<u8, (BinaryHeap<std::cmp::Reverse<u64>>, VecDeque<u64>)>,
146 
147     /// Set of stream IDs corresponding to streams that have outstanding data
148     /// to read. This is used to generate a `StreamIter` of streams without
149     /// having to iterate over the full list of streams.
150     pub readable: StreamIdHashSet,
151 
152     /// Set of stream IDs corresponding to streams that have enough flow control
153     /// capacity to be written to, and is not finished. This is used to generate
154     /// a `StreamIter` of streams without having to iterate over the full list
155     /// of streams.
156     pub writable: StreamIdHashSet,
157 
158     /// Set of stream IDs corresponding to streams that are almost out of flow
159     /// control credit and need to send MAX_STREAM_DATA. This is used to
160     /// generate a `StreamIter` of streams without having to iterate over the
161     /// full list of streams.
162     almost_full: StreamIdHashSet,
163 
164     /// Set of stream IDs corresponding to streams that are blocked. The value
165     /// of the map elements represents the offset of the stream at which the
166     /// blocking occurred.
167     blocked: StreamIdHashMap<u64>,
168 
169     /// Set of stream IDs corresponding to streams that are reset. The value
170     /// of the map elements is a tuple of the error code and final size values
171     /// to include in the RESET_STREAM frame.
172     reset: StreamIdHashMap<(u64, u64)>,
173 
174     /// Set of stream IDs corresponding to streams that are shutdown on the
175     /// receive side, and need to send a STOP_SENDING frame. The value of the
176     /// map elements is the error code to include in the STOP_SENDING frame.
177     stopped: StreamIdHashMap<u64>,
178 
179     /// The maximum size of a stream window.
180     max_stream_window: u64,
181 }
182 
183 impl StreamMap {
new( max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64, ) -> StreamMap184     pub fn new(
185         max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
186     ) -> StreamMap {
187         StreamMap {
188             local_max_streams_bidi: max_streams_bidi,
189             local_max_streams_bidi_next: max_streams_bidi,
190 
191             local_max_streams_uni: max_streams_uni,
192             local_max_streams_uni_next: max_streams_uni,
193 
194             max_stream_window,
195 
196             ..StreamMap::default()
197         }
198     }
199 
200     /// Returns the stream with the given ID if it exists.
get(&self, id: u64) -> Option<&Stream>201     pub fn get(&self, id: u64) -> Option<&Stream> {
202         self.streams.get(&id)
203     }
204 
205     /// Returns the mutable stream with the given ID if it exists.
get_mut(&mut self, id: u64) -> Option<&mut Stream>206     pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
207         self.streams.get_mut(&id)
208     }
209 
210     /// Returns the mutable stream with the given ID if it exists, or creates
211     /// a new one otherwise.
212     ///
213     /// The `local` parameter indicates whether the stream's creation was
214     /// requested by the local application rather than the peer, and is
215     /// used to validate the requested stream ID, and to select the initial
216     /// flow control values from the local and remote transport parameters
217     /// (also passed as arguments).
218     ///
219     /// This also takes care of enforcing both local and the peer's stream
220     /// count limits. If one of these limits is violated, the `StreamLimit`
221     /// error is returned.
get_or_create( &mut self, id: u64, local_params: &crate::TransportParams, peer_params: &crate::TransportParams, local: bool, is_server: bool, ) -> Result<&mut Stream>222     pub(crate) fn get_or_create(
223         &mut self, id: u64, local_params: &crate::TransportParams,
224         peer_params: &crate::TransportParams, local: bool, is_server: bool,
225     ) -> Result<&mut Stream> {
226         let (stream, is_new_and_writable) = match self.streams.entry(id) {
227             hash_map::Entry::Vacant(v) => {
228                 // Stream has already been closed and garbage collected.
229                 if self.collected.contains(&id) {
230                     return Err(Error::Done);
231                 }
232 
233                 if local != is_local(id, is_server) {
234                     return Err(Error::InvalidStreamState(id));
235                 }
236 
237                 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
238                     // Locally-initiated bidirectional stream.
239                     (true, true) => (
240                         local_params.initial_max_stream_data_bidi_local,
241                         peer_params.initial_max_stream_data_bidi_remote,
242                     ),
243 
244                     // Locally-initiated unidirectional stream.
245                     (true, false) => (0, peer_params.initial_max_stream_data_uni),
246 
247                     // Remotely-initiated bidirectional stream.
248                     (false, true) => (
249                         local_params.initial_max_stream_data_bidi_remote,
250                         peer_params.initial_max_stream_data_bidi_local,
251                     ),
252 
253                     // Remotely-initiated unidirectional stream.
254                     (false, false) =>
255                         (local_params.initial_max_stream_data_uni, 0),
256                 };
257 
258                 // The two least significant bits from a stream id identify the
259                 // type of stream. Truncate those bits to get the sequence for
260                 // that stream type.
261                 let stream_sequence = id >> 2;
262 
263                 // Enforce stream count limits.
264                 match (is_local(id, is_server), is_bidi(id)) {
265                     (true, true) => {
266                         let n = std::cmp::max(
267                             self.local_opened_streams_bidi,
268                             stream_sequence + 1,
269                         );
270 
271                         if n > self.peer_max_streams_bidi {
272                             return Err(Error::StreamLimit);
273                         }
274 
275                         self.local_opened_streams_bidi = n;
276                     },
277 
278                     (true, false) => {
279                         let n = std::cmp::max(
280                             self.local_opened_streams_uni,
281                             stream_sequence + 1,
282                         );
283 
284                         if n > self.peer_max_streams_uni {
285                             return Err(Error::StreamLimit);
286                         }
287 
288                         self.local_opened_streams_uni = n;
289                     },
290 
291                     (false, true) => {
292                         let n = std::cmp::max(
293                             self.peer_opened_streams_bidi,
294                             stream_sequence + 1,
295                         );
296 
297                         if n > self.local_max_streams_bidi {
298                             return Err(Error::StreamLimit);
299                         }
300 
301                         self.peer_opened_streams_bidi = n;
302                     },
303 
304                     (false, false) => {
305                         let n = std::cmp::max(
306                             self.peer_opened_streams_uni,
307                             stream_sequence + 1,
308                         );
309 
310                         if n > self.local_max_streams_uni {
311                             return Err(Error::StreamLimit);
312                         }
313 
314                         self.peer_opened_streams_uni = n;
315                     },
316                 };
317 
318                 let s = Stream::new(
319                     max_rx_data,
320                     max_tx_data,
321                     is_bidi(id),
322                     local,
323                     self.max_stream_window,
324                 );
325 
326                 let is_writable = s.is_writable();
327 
328                 (v.insert(s), is_writable)
329             },
330 
331             hash_map::Entry::Occupied(v) => (v.into_mut(), false),
332         };
333 
334         // Newly created stream might already be writable due to initial flow
335         // control limits.
336         if is_new_and_writable {
337             self.writable.insert(id);
338         }
339 
340         Ok(stream)
341     }
342 
343     /// Pushes the stream ID to the back of the flushable streams queue with
344     /// the specified urgency.
345     ///
346     /// Note that the caller is responsible for checking that the specified
347     /// stream ID was not in the queue already before calling this.
348     ///
349     /// Queueing a stream multiple times simultaneously means that it might be
350     /// unfairly scheduled more often than other streams, and might also cause
351     /// spurious cycles through the queue, so it should be avoided.
push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool)352     pub fn push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool) {
353         // Push the element to the back of the queue corresponding to the given
354         // urgency. If the queue doesn't exist yet, create it first.
355         let queues = self
356             .flushable
357             .entry(urgency)
358             .or_insert_with(|| (BinaryHeap::new(), VecDeque::new()));
359 
360         if !incr {
361             // Non-incremental streams are scheduled in order of their stream ID.
362             queues.0.push(std::cmp::Reverse(stream_id))
363         } else {
364             // Incremental streams are scheduled in a round-robin fashion.
365             queues.1.push_back(stream_id)
366         };
367     }
368 
369     /// Returns the first stream ID from the flushable streams
370     /// queue with the highest urgency.
371     ///
372     /// Note that if the stream is no longer flushable after sending some of its
373     /// outstanding data, it needs to be removed from the queue.
peek_flushable(&mut self) -> Option<u64>374     pub fn peek_flushable(&mut self) -> Option<u64> {
375         self.flushable.iter_mut().next().and_then(|(_, queues)| {
376             queues.0.peek().map(|x| x.0).or_else(|| {
377                 // When peeking incremental streams, make sure to move the current
378                 // stream to the end of the queue so they are pocesses in a round
379                 // robin fashion
380                 if let Some(current_incremental) = queues.1.pop_front() {
381                     queues.1.push_back(current_incremental);
382                     Some(current_incremental)
383                 } else {
384                     None
385                 }
386             })
387         })
388     }
389 
390     /// Remove the last peeked stream
remove_flushable(&mut self)391     pub fn remove_flushable(&mut self) {
392         let mut top_urgency = self
393             .flushable
394             .first_entry()
395             .expect("Remove previously peeked stream");
396 
397         let queues = top_urgency.get_mut();
398         queues.0.pop().map(|x| x.0).or_else(|| queues.1.pop_back());
399         // Remove the queue from the list of queues if it is now empty, so that
400         // the next time `pop_flushable()` is called the next queue with elements
401         // is used.
402         if queues.0.is_empty() && queues.1.is_empty() {
403             top_urgency.remove();
404         }
405     }
406 
407     /// Adds or removes the stream ID to/from the readable streams set.
408     ///
409     /// If the stream was already in the list, this does nothing.
mark_readable(&mut self, stream_id: u64, readable: bool)410     pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
411         if readable {
412             self.readable.insert(stream_id);
413         } else {
414             self.readable.remove(&stream_id);
415         }
416     }
417 
418     /// Adds or removes the stream ID to/from the writable streams set.
419     ///
420     /// This should also be called anytime a new stream is created, in addition
421     /// to when an existing stream becomes writable (or stops being writable).
422     ///
423     /// If the stream was already in the list, this does nothing.
mark_writable(&mut self, stream_id: u64, writable: bool)424     pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
425         if writable {
426             self.writable.insert(stream_id);
427         } else {
428             self.writable.remove(&stream_id);
429         }
430     }
431 
432     /// Adds or removes the stream ID to/from the almost full streams set.
433     ///
434     /// If the stream was already in the list, this does nothing.
mark_almost_full(&mut self, stream_id: u64, almost_full: bool)435     pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
436         if almost_full {
437             self.almost_full.insert(stream_id);
438         } else {
439             self.almost_full.remove(&stream_id);
440         }
441     }
442 
443     /// Adds or removes the stream ID to/from the blocked streams set with the
444     /// given offset value.
445     ///
446     /// If the stream was already in the list, this does nothing.
mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64)447     pub fn mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64) {
448         if blocked {
449             self.blocked.insert(stream_id, off);
450         } else {
451             self.blocked.remove(&stream_id);
452         }
453     }
454 
455     /// Adds or removes the stream ID to/from the reset streams set with the
456     /// given error code and final size values.
457     ///
458     /// If the stream was already in the list, this does nothing.
mark_reset( &mut self, stream_id: u64, reset: bool, error_code: u64, final_size: u64, )459     pub fn mark_reset(
460         &mut self, stream_id: u64, reset: bool, error_code: u64, final_size: u64,
461     ) {
462         if reset {
463             self.reset.insert(stream_id, (error_code, final_size));
464         } else {
465             self.reset.remove(&stream_id);
466         }
467     }
468 
469     /// Adds or removes the stream ID to/from the stopped streams set with the
470     /// given error code.
471     ///
472     /// If the stream was already in the list, this does nothing.
mark_stopped( &mut self, stream_id: u64, stopped: bool, error_code: u64, )473     pub fn mark_stopped(
474         &mut self, stream_id: u64, stopped: bool, error_code: u64,
475     ) {
476         if stopped {
477             self.stopped.insert(stream_id, error_code);
478         } else {
479             self.stopped.remove(&stream_id);
480         }
481     }
482 
483     /// Updates the peer's maximum bidirectional stream count limit.
update_peer_max_streams_bidi(&mut self, v: u64)484     pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
485         self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
486     }
487 
488     /// Updates the peer's maximum unidirectional stream count limit.
update_peer_max_streams_uni(&mut self, v: u64)489     pub fn update_peer_max_streams_uni(&mut self, v: u64) {
490         self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
491     }
492 
493     /// Commits the new max_streams_bidi limit.
update_max_streams_bidi(&mut self)494     pub fn update_max_streams_bidi(&mut self) {
495         self.local_max_streams_bidi = self.local_max_streams_bidi_next;
496     }
497 
498     /// Returns the current max_streams_bidi limit.
max_streams_bidi(&self) -> u64499     pub fn max_streams_bidi(&self) -> u64 {
500         self.local_max_streams_bidi
501     }
502 
503     /// Returns the new max_streams_bidi limit.
max_streams_bidi_next(&mut self) -> u64504     pub fn max_streams_bidi_next(&mut self) -> u64 {
505         self.local_max_streams_bidi_next
506     }
507 
508     /// Commits the new max_streams_uni limit.
update_max_streams_uni(&mut self)509     pub fn update_max_streams_uni(&mut self) {
510         self.local_max_streams_uni = self.local_max_streams_uni_next;
511     }
512 
513     /// Returns the new max_streams_uni limit.
max_streams_uni_next(&mut self) -> u64514     pub fn max_streams_uni_next(&mut self) -> u64 {
515         self.local_max_streams_uni_next
516     }
517 
518     /// Returns the number of bidirectional streams that can be created
519     /// before the peer's stream count limit is reached.
peer_streams_left_bidi(&self) -> u64520     pub fn peer_streams_left_bidi(&self) -> u64 {
521         self.peer_max_streams_bidi - self.local_opened_streams_bidi
522     }
523 
524     /// Returns the number of unidirectional streams that can be created
525     /// before the peer's stream count limit is reached.
peer_streams_left_uni(&self) -> u64526     pub fn peer_streams_left_uni(&self) -> u64 {
527         self.peer_max_streams_uni - self.local_opened_streams_uni
528     }
529 
530     /// Drops completed stream.
531     ///
532     /// This should only be called when Stream::is_complete() returns true for
533     /// the given stream.
collect(&mut self, stream_id: u64, local: bool)534     pub fn collect(&mut self, stream_id: u64, local: bool) {
535         if !local {
536             // If the stream was created by the peer, give back a max streams
537             // credit.
538             if is_bidi(stream_id) {
539                 self.local_max_streams_bidi_next =
540                     self.local_max_streams_bidi_next.saturating_add(1);
541             } else {
542                 self.local_max_streams_uni_next =
543                     self.local_max_streams_uni_next.saturating_add(1);
544             }
545         }
546 
547         self.mark_readable(stream_id, false);
548         self.mark_writable(stream_id, false);
549 
550         self.streams.remove(&stream_id);
551         self.collected.insert(stream_id);
552     }
553 
554     /// Creates an iterator over streams that have outstanding data to read.
readable(&self) -> StreamIter555     pub fn readable(&self) -> StreamIter {
556         StreamIter::from(&self.readable)
557     }
558 
559     /// Creates an iterator over streams that can be written to.
writable(&self) -> StreamIter560     pub fn writable(&self) -> StreamIter {
561         StreamIter::from(&self.writable)
562     }
563 
564     /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
almost_full(&self) -> StreamIter565     pub fn almost_full(&self) -> StreamIter {
566         StreamIter::from(&self.almost_full)
567     }
568 
569     /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
blocked(&self) -> hash_map::Iter<u64, u64>570     pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
571         self.blocked.iter()
572     }
573 
574     /// Creates an iterator over streams that need to send RESET_STREAM.
reset(&self) -> hash_map::Iter<u64, (u64, u64)>575     pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
576         self.reset.iter()
577     }
578 
579     /// Creates an iterator over streams that need to send STOP_SENDING.
stopped(&self) -> hash_map::Iter<u64, u64>580     pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
581         self.stopped.iter()
582     }
583 
584     /// Returns true if the stream has been collected.
is_collected(&self, stream_id: u64) -> bool585     pub fn is_collected(&self, stream_id: u64) -> bool {
586         self.collected.contains(&stream_id)
587     }
588 
589     /// Returns true if there are any streams that have data to write.
has_flushable(&self) -> bool590     pub fn has_flushable(&self) -> bool {
591         !self.flushable.is_empty()
592     }
593 
594     /// Returns true if there are any streams that have data to read.
has_readable(&self) -> bool595     pub fn has_readable(&self) -> bool {
596         !self.readable.is_empty()
597     }
598 
599     /// Returns true if there are any streams that need to update the local
600     /// flow control limit.
has_almost_full(&self) -> bool601     pub fn has_almost_full(&self) -> bool {
602         !self.almost_full.is_empty()
603     }
604 
605     /// Returns true if there are any streams that are blocked.
has_blocked(&self) -> bool606     pub fn has_blocked(&self) -> bool {
607         !self.blocked.is_empty()
608     }
609 
610     /// Returns true if there are any streams that are reset.
has_reset(&self) -> bool611     pub fn has_reset(&self) -> bool {
612         !self.reset.is_empty()
613     }
614 
615     /// Returns true if there are any streams that need to send STOP_SENDING.
has_stopped(&self) -> bool616     pub fn has_stopped(&self) -> bool {
617         !self.stopped.is_empty()
618     }
619 
620     /// Returns true if the max bidirectional streams count needs to be updated
621     /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_bidi(&self) -> bool622     pub fn should_update_max_streams_bidi(&self) -> bool {
623         self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
624             self.local_max_streams_bidi_next / 2 >
625                 self.local_max_streams_bidi - self.peer_opened_streams_bidi
626     }
627 
628     /// Returns true if the max unidirectional streams count needs to be updated
629     /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_uni(&self) -> bool630     pub fn should_update_max_streams_uni(&self) -> bool {
631         self.local_max_streams_uni_next != self.local_max_streams_uni &&
632             self.local_max_streams_uni_next / 2 >
633                 self.local_max_streams_uni - self.peer_opened_streams_uni
634     }
635 
636     /// Returns the number of active streams in the map.
637     #[cfg(test)]
len(&self) -> usize638     pub fn len(&self) -> usize {
639         self.streams.len()
640     }
641 }
642 
643 /// A QUIC stream.
644 #[derive(Default)]
645 pub struct Stream {
646     /// Receive-side stream buffer.
647     pub recv: RecvBuf,
648 
649     /// Send-side stream buffer.
650     pub send: SendBuf,
651 
652     pub send_lowat: usize,
653 
654     /// Whether the stream is bidirectional.
655     pub bidi: bool,
656 
657     /// Whether the stream was created by the local endpoint.
658     pub local: bool,
659 
660     /// Application data.
661     pub data: Option<Box<dyn std::any::Any + Send + Sync>>,
662 
663     /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
664     pub urgency: u8,
665 
666     /// Whether the stream can be flushed incrementally. Default is `true`.
667     pub incremental: bool,
668 }
669 
670 impl Stream {
671     /// Creates a new stream with the given flow control limits.
new( max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool, max_window: u64, ) -> Stream672     pub fn new(
673         max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
674         max_window: u64,
675     ) -> Stream {
676         Stream {
677             recv: RecvBuf::new(max_rx_data, max_window),
678             send: SendBuf::new(max_tx_data),
679             send_lowat: 1,
680             bidi,
681             local,
682             data: None,
683             urgency: DEFAULT_URGENCY,
684             incremental: true,
685         }
686     }
687 
688     /// Returns true if the stream has data to read.
is_readable(&self) -> bool689     pub fn is_readable(&self) -> bool {
690         self.recv.ready()
691     }
692 
693     /// Returns true if the stream has enough flow control capacity to be
694     /// written to, and is not finished.
is_writable(&self) -> bool695     pub fn is_writable(&self) -> bool {
696         !self.send.shutdown &&
697             !self.send.is_fin() &&
698             (self.send.off + self.send_lowat as u64) < self.send.max_data
699     }
700 
701     /// Returns true if the stream has data to send and is allowed to send at
702     /// least some of it.
is_flushable(&self) -> bool703     pub fn is_flushable(&self) -> bool {
704         self.send.ready() && self.send.off_front() < self.send.max_data
705     }
706 
707     /// Returns true if the stream is complete.
708     ///
709     /// For bidirectional streams this happens when both the receive and send
710     /// sides are complete. That is when all incoming data has been read by the
711     /// application, and when all outgoing data has been acked by the peer.
712     ///
713     /// For unidirectional streams this happens when either the receive or send
714     /// side is complete, depending on whether the stream was created locally
715     /// or not.
is_complete(&self) -> bool716     pub fn is_complete(&self) -> bool {
717         match (self.bidi, self.local) {
718             // For bidirectional streams we need to check both receive and send
719             // sides for completion.
720             (true, _) => self.recv.is_fin() && self.send.is_complete(),
721 
722             // For unidirectional streams generated locally, we only need to
723             // check the send side for completion.
724             (false, true) => self.send.is_complete(),
725 
726             // For unidirectional streams generated by the peer, we only need
727             // to check the receive side for completion.
728             (false, false) => self.recv.is_fin(),
729         }
730     }
731 
732     /// Returns true if the stream is not storing incoming data.
is_draining(&self) -> bool733     pub fn is_draining(&self) -> bool {
734         self.recv.drain
735     }
736 }
737 
/// Returns true if the stream was created locally.
///
/// The least significant bit of a stream ID is set for streams created by
/// the server, so the stream is local when that bit agrees with `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 == 1;

    server_initiated == is_server
}
742 
/// Returns true if the stream is bidirectional.
///
/// The second least significant bit of a stream ID is clear for
/// bidirectional streams.
pub fn is_bidi(stream_id: u64) -> bool {
    const UNI_BIT: u64 = 0x2;

    stream_id & UNI_BIT == 0
}
747 
748 /// An iterator over QUIC streams.
749 #[derive(Default)]
750 pub struct StreamIter {
751     streams: SmallVec<[u64; 8]>,
752 }
753 
754 impl StreamIter {
755     #[inline]
from(streams: &StreamIdHashSet) -> Self756     fn from(streams: &StreamIdHashSet) -> Self {
757         StreamIter {
758             streams: streams.iter().copied().collect(),
759         }
760     }
761 }
762 
763 impl Iterator for StreamIter {
764     type Item = u64;
765 
766     #[inline]
next(&mut self) -> Option<Self::Item>767     fn next(&mut self) -> Option<Self::Item> {
768         self.streams.pop()
769     }
770 }
771 
772 impl ExactSizeIterator for StreamIter {
773     #[inline]
len(&self) -> usize774     fn len(&self) -> usize {
775         self.streams.len()
776     }
777 }
778 
/// Receive-side stream buffer.
///
/// Stream data received from the peer is buffered in a list of data chunks
/// ordered by offset in ascending order. Contiguous data can then be read
/// into a slice.
#[derive(Debug, Default)]
pub struct RecvBuf {
    /// Chunks of data received from the peer that have not yet been read by
    /// the application, ordered by offset. Each chunk is stored under its
    /// end offset (the value of `max_off()` at insertion time).
    data: BTreeMap<u64, RangeBuf>,

    /// The lowest data offset that has yet to be read by the application.
    off: u64,

    /// The total length of data received on this stream, i.e. the largest
    /// end offset seen so far.
    len: u64,

    /// Receiver flow controller.
    flow_control: flowcontrol::FlowControl,

    /// The final stream offset received from the peer, if any.
    fin_off: Option<u64>,

    /// The error code received via RESET_STREAM.
    error: Option<u64>,

    /// Whether incoming data is validated but not buffered.
    drain: bool,
}
808 
809 impl RecvBuf {
810     /// Creates a new receive buffer.
new(max_data: u64, max_window: u64) -> RecvBuf811     fn new(max_data: u64, max_window: u64) -> RecvBuf {
812         RecvBuf {
813             flow_control: flowcontrol::FlowControl::new(
814                 max_data,
815                 cmp::min(max_data, DEFAULT_STREAM_WINDOW),
816                 max_window,
817             ),
818             ..RecvBuf::default()
819         }
820     }
821 
822     /// Inserts the given chunk of data in the buffer.
823     ///
824     /// This also takes care of enforcing stream flow control limits, as well
825     /// as handling incoming data that overlaps data that is already in the
826     /// buffer.
write(&mut self, buf: RangeBuf) -> Result<()>827     pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
828         if buf.max_off() > self.max_data() {
829             return Err(Error::FlowControl);
830         }
831 
832         if let Some(fin_off) = self.fin_off {
833             // Stream's size is known, forbid data beyond that point.
834             if buf.max_off() > fin_off {
835                 return Err(Error::FinalSize);
836             }
837 
838             // Stream's size is already known, forbid changing it.
839             if buf.fin() && fin_off != buf.max_off() {
840                 return Err(Error::FinalSize);
841             }
842         }
843 
844         // Stream's known size is lower than data already received.
845         if buf.fin() && buf.max_off() < self.len {
846             return Err(Error::FinalSize);
847         }
848 
849         // We already saved the final offset, so there's nothing else we
850         // need to keep from the RangeBuf if it's empty.
851         if self.fin_off.is_some() && buf.is_empty() {
852             return Ok(());
853         }
854 
855         if buf.fin() {
856             self.fin_off = Some(buf.max_off());
857         }
858 
859         // No need to store empty buffer that doesn't carry the fin flag.
860         if !buf.fin() && buf.is_empty() {
861             return Ok(());
862         }
863 
864         // Check if data is fully duplicate, that is the buffer's max offset is
865         // lower or equal to the offset already stored in the recv buffer.
866         if self.off >= buf.max_off() {
867             // An exception is applied to empty range buffers, because an empty
868             // buffer's max offset matches the max offset of the recv buffer.
869             //
870             // By this point all spurious empty buffers should have already been
871             // discarded, so allowing empty buffers here should be safe.
872             if !buf.is_empty() {
873                 return Ok(());
874             }
875         }
876 
877         let mut tmp_bufs = VecDeque::with_capacity(2);
878         tmp_bufs.push_back(buf);
879 
880         'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
881             // Discard incoming data below current stream offset. Bytes up to
882             // `self.off` have already been received so we should not buffer
883             // them again. This is also important to make sure `ready()` doesn't
884             // get stuck when a buffer with lower offset than the stream's is
885             // buffered.
886             if self.off_front() > buf.off() {
887                 buf = buf.split_off((self.off_front() - buf.off()) as usize);
888             }
889 
890             // Handle overlapping data. If the incoming data's starting offset
891             // is above the previous maximum received offset, there is clearly
892             // no overlap so this logic can be skipped. However do still try to
893             // merge an empty final buffer (i.e. an empty buffer with the fin
894             // flag set, which is the only kind of empty buffer that should
895             // reach this point).
896             if buf.off() < self.max_off() || buf.is_empty() {
897                 for (_, b) in self.data.range(buf.off()..) {
898                     let off = buf.off();
899 
900                     // We are past the current buffer.
901                     if b.off() > buf.max_off() {
902                         break;
903                     }
904 
905                     // New buffer is fully contained in existing buffer.
906                     if off >= b.off() && buf.max_off() <= b.max_off() {
907                         continue 'tmp;
908                     }
909 
910                     // New buffer's start overlaps existing buffer.
911                     if off >= b.off() && off < b.max_off() {
912                         buf = buf.split_off((b.max_off() - off) as usize);
913                     }
914 
915                     // New buffer's end overlaps existing buffer.
916                     if off < b.off() && buf.max_off() > b.off() {
917                         tmp_bufs
918                             .push_back(buf.split_off((b.off() - off) as usize));
919                     }
920                 }
921             }
922 
923             self.len = cmp::max(self.len, buf.max_off());
924 
925             if !self.drain {
926                 self.data.insert(buf.max_off(), buf);
927             }
928         }
929 
930         Ok(())
931     }
932 
933     /// Writes data from the receive buffer into the given output buffer.
934     ///
935     /// Only contiguous data is written to the output buffer, starting from
936     /// offset 0. The offset is incremented as data is read out of the receive
937     /// buffer into the application buffer. If there is no data at the expected
938     /// read offset, the `Done` error is returned.
939     ///
940     /// On success the amount of data read, and a flag indicating if there is
941     /// no more data in the buffer, are returned as a tuple.
emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)>942     pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
943         let mut len = 0;
944         let mut cap = out.len();
945 
946         if !self.ready() {
947             return Err(Error::Done);
948         }
949 
950         // The stream was reset, so return the error code instead.
951         if let Some(e) = self.error {
952             return Err(Error::StreamReset(e));
953         }
954 
955         while cap > 0 && self.ready() {
956             let mut entry = match self.data.first_entry() {
957                 Some(entry) => entry,
958                 None => break,
959             };
960 
961             let buf = entry.get_mut();
962 
963             let buf_len = cmp::min(buf.len(), cap);
964 
965             out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
966 
967             self.off += buf_len as u64;
968 
969             len += buf_len;
970             cap -= buf_len;
971 
972             if buf_len < buf.len() {
973                 buf.consume(buf_len);
974 
975                 // We reached the maximum capacity, so end here.
976                 break;
977             }
978 
979             entry.remove();
980         }
981 
982         // Update consumed bytes for flow control.
983         self.flow_control.add_consumed(len as u64);
984 
985         Ok((len, self.is_fin()))
986     }
987 
988     /// Resets the stream at the given offset.
reset(&mut self, error_code: u64, final_size: u64) -> Result<usize>989     pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
990         // Stream's size is already known, forbid changing it.
991         if let Some(fin_off) = self.fin_off {
992             if fin_off != final_size {
993                 return Err(Error::FinalSize);
994             }
995         }
996 
997         // Stream's known size is lower than data already received.
998         if final_size < self.len {
999             return Err(Error::FinalSize);
1000         }
1001 
1002         // Calculate how many bytes need to be removed from the connection flow
1003         // control.
1004         let max_data_delta = final_size - self.len;
1005 
1006         if self.error.is_some() {
1007             return Ok(max_data_delta as usize);
1008         }
1009 
1010         self.error = Some(error_code);
1011 
1012         // Clear all data already buffered.
1013         self.off = final_size;
1014 
1015         self.data.clear();
1016 
1017         // In order to ensure the application is notified when the stream is
1018         // reset, enqueue a zero-length buffer at the final size offset.
1019         let buf = RangeBuf::from(b"", final_size, true);
1020         self.write(buf)?;
1021 
1022         Ok(max_data_delta as usize)
1023     }
1024 
1025     /// Commits the new max_data limit.
update_max_data(&mut self, now: time::Instant)1026     pub fn update_max_data(&mut self, now: time::Instant) {
1027         self.flow_control.update_max_data(now);
1028     }
1029 
1030     /// Return the new max_data limit.
max_data_next(&mut self) -> u641031     pub fn max_data_next(&mut self) -> u64 {
1032         self.flow_control.max_data_next()
1033     }
1034 
1035     /// Return the current flow control limit.
max_data(&self) -> u641036     fn max_data(&self) -> u64 {
1037         self.flow_control.max_data()
1038     }
1039 
1040     /// Return the current window.
window(&self) -> u641041     pub fn window(&self) -> u64 {
1042         self.flow_control.window()
1043     }
1044 
1045     /// Autotune the window size.
autotune_window(&mut self, now: time::Instant, rtt: time::Duration)1046     pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) {
1047         self.flow_control.autotune_window(now, rtt);
1048     }
1049 
1050     /// Shuts down receiving data.
shutdown(&mut self) -> Result<()>1051     pub fn shutdown(&mut self) -> Result<()> {
1052         if self.drain {
1053             return Err(Error::Done);
1054         }
1055 
1056         self.drain = true;
1057 
1058         self.data.clear();
1059 
1060         self.off = self.max_off();
1061 
1062         Ok(())
1063     }
1064 
1065     /// Returns the lowest offset of data buffered.
off_front(&self) -> u641066     pub fn off_front(&self) -> u64 {
1067         self.off
1068     }
1069 
1070     /// Returns true if we need to update the local flow control limit.
almost_full(&self) -> bool1071     pub fn almost_full(&self) -> bool {
1072         self.fin_off.is_none() && self.flow_control.should_update_max_data()
1073     }
1074 
1075     /// Returns the largest offset ever received.
max_off(&self) -> u641076     pub fn max_off(&self) -> u64 {
1077         self.len
1078     }
1079 
1080     /// Returns true if the receive-side of the stream is complete.
1081     ///
1082     /// This happens when the stream's receive final size is known, and the
1083     /// application has read all data from the stream.
is_fin(&self) -> bool1084     pub fn is_fin(&self) -> bool {
1085         if self.fin_off == Some(self.off) {
1086             return true;
1087         }
1088 
1089         false
1090     }
1091 
1092     /// Returns true if the stream has data to be read.
ready(&self) -> bool1093     fn ready(&self) -> bool {
1094         let (_, buf) = match self.data.first_key_value() {
1095             Some(v) => v,
1096             None => return false,
1097         };
1098 
1099         buf.off() == self.off
1100     }
1101 }
1102 
/// Send-side stream buffer.
///
/// Stream data scheduled to be sent to the peer is buffered in a list of data
/// chunks ordered by offset in ascending order. Contiguous data can then be
/// read into a slice.
///
/// By default, new data is appended at the end of the stream, but data can be
/// inserted at the start of the buffer (this is to allow data that needs to be
/// retransmitted to be re-buffered).
#[derive(Debug, Default)]
pub struct SendBuf {
    /// Chunks of data to be sent, ordered by offset.
    data: VecDeque<RangeBuf>,

    /// The index of the buffer that needs to be sent next.
    pos: usize,

    /// The maximum offset of data buffered in the stream.
    off: u64,

    /// The maximum offset of data sent to the peer, regardless of
    /// retransmissions.
    emit_off: u64,

    /// The amount of data currently buffered that still needs to be sent. It
    /// shrinks as data is emitted and grows again when data is marked for
    /// retransmission.
    len: u64,

    /// The maximum offset we are allowed to send to the peer.
    max_data: u64,

    /// The last offset the stream was blocked at, if any.
    blocked_at: Option<u64>,

    /// The final stream offset written to the stream, if any.
    fin_off: Option<u64>,

    /// Whether the stream's send-side has been shut down.
    shutdown: bool,

    /// Ranges of data offsets that have been acked.
    acked: ranges::RangeSet,

    /// The error code received via STOP_SENDING.
    error: Option<u64>,
}
1148 
1149 impl SendBuf {
1150     /// Creates a new send buffer.
new(max_data: u64) -> SendBuf1151     fn new(max_data: u64) -> SendBuf {
1152         SendBuf {
1153             max_data,
1154             ..SendBuf::default()
1155         }
1156     }
1157 
1158     /// Inserts the given slice of data at the end of the buffer.
1159     ///
1160     /// The number of bytes that were actually stored in the buffer is returned
1161     /// (this may be lower than the size of the input buffer, in case of partial
1162     /// writes).
write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize>1163     pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
1164         let max_off = self.off + data.len() as u64;
1165 
1166         // Get the stream send capacity. This will return an error if the stream
1167         // was stopped.
1168         let capacity = self.cap()?;
1169 
1170         if data.len() > capacity {
1171             // Truncate the input buffer according to the stream's capacity.
1172             let len = capacity;
1173             data = &data[..len];
1174 
1175             // We are not buffering the full input, so clear the fin flag.
1176             fin = false;
1177         }
1178 
1179         if let Some(fin_off) = self.fin_off {
1180             // Can't write past final offset.
1181             if max_off > fin_off {
1182                 return Err(Error::FinalSize);
1183             }
1184 
1185             // Can't "undo" final offset.
1186             if max_off == fin_off && !fin {
1187                 return Err(Error::FinalSize);
1188             }
1189         }
1190 
1191         if fin {
1192             self.fin_off = Some(max_off);
1193         }
1194 
1195         // Don't queue data that was already fully acked.
1196         if self.ack_off() >= max_off {
1197             return Ok(data.len());
1198         }
1199 
1200         // We already recorded the final offset, so we can just discard the
1201         // empty buffer now.
1202         if data.is_empty() {
1203             return Ok(data.len());
1204         }
1205 
1206         let mut len = 0;
1207 
1208         // Split the remaining input data into consistently-sized buffers to
1209         // avoid fragmentation.
1210         for chunk in data.chunks(SEND_BUFFER_SIZE) {
1211             len += chunk.len();
1212 
1213             let fin = len == data.len() && fin;
1214 
1215             let buf = RangeBuf::from(chunk, self.off, fin);
1216 
1217             // The new data can simply be appended at the end of the send buffer.
1218             self.data.push_back(buf);
1219 
1220             self.off += chunk.len() as u64;
1221             self.len += chunk.len() as u64;
1222         }
1223 
1224         Ok(len)
1225     }
1226 
1227     /// Writes data from the send buffer into the given output buffer.
emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)>1228     pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
1229         let mut out_len = out.len();
1230         let out_off = self.off_front();
1231 
1232         let mut next_off = out_off;
1233 
1234         while out_len > 0 &&
1235             self.ready() &&
1236             self.off_front() == next_off &&
1237             self.off_front() < self.max_data
1238         {
1239             let buf = match self.data.get_mut(self.pos) {
1240                 Some(v) => v,
1241 
1242                 None => break,
1243             };
1244 
1245             if buf.is_empty() {
1246                 self.pos += 1;
1247                 continue;
1248             }
1249 
1250             let buf_len = cmp::min(buf.len(), out_len);
1251             let partial = buf_len < buf.len();
1252 
1253             // Copy data to the output buffer.
1254             let out_pos = (next_off - out_off) as usize;
1255             out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);
1256 
1257             self.len -= buf_len as u64;
1258 
1259             out_len -= buf_len;
1260 
1261             next_off = buf.off() + buf_len as u64;
1262 
1263             buf.consume(buf_len);
1264 
1265             if partial {
1266                 // We reached the maximum capacity, so end here.
1267                 break;
1268             }
1269 
1270             self.pos += 1;
1271         }
1272 
1273         // Override the `fin` flag set for the output buffer by matching the
1274         // buffer's maximum offset against the stream's final offset (if known).
1275         //
1276         // This is more efficient than tracking `fin` using the range buffers
1277         // themselves, and lets us avoid queueing empty buffers just so we can
1278         // propagate the final size.
1279         let fin = self.fin_off == Some(next_off);
1280 
1281         // Record the largest offset that has been sent so we can accurately
1282         // report final_size
1283         self.emit_off = cmp::max(self.emit_off, next_off);
1284 
1285         Ok((out.len() - out_len, fin))
1286     }
1287 
1288     /// Updates the max_data limit to the given value.
update_max_data(&mut self, max_data: u64)1289     pub fn update_max_data(&mut self, max_data: u64) {
1290         self.max_data = cmp::max(self.max_data, max_data);
1291     }
1292 
1293     /// Updates the last offset the stream was blocked at, if any.
update_blocked_at(&mut self, blocked_at: Option<u64>)1294     pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
1295         self.blocked_at = blocked_at;
1296     }
1297 
1298     /// The last offset the stream was blocked at, if any.
blocked_at(&self) -> Option<u64>1299     pub fn blocked_at(&self) -> Option<u64> {
1300         self.blocked_at
1301     }
1302 
1303     /// Increments the acked data offset.
ack(&mut self, off: u64, len: usize)1304     pub fn ack(&mut self, off: u64, len: usize) {
1305         self.acked.insert(off..off + len as u64);
1306     }
1307 
ack_and_drop(&mut self, off: u64, len: usize)1308     pub fn ack_and_drop(&mut self, off: u64, len: usize) {
1309         self.ack(off, len);
1310 
1311         let ack_off = self.ack_off();
1312 
1313         if self.data.is_empty() {
1314             return;
1315         }
1316 
1317         if off > ack_off {
1318             return;
1319         }
1320 
1321         let mut drop_until = None;
1322 
1323         // Drop contiguously acked data from the front of the buffer.
1324         for (i, buf) in self.data.iter_mut().enumerate() {
1325             // Newly acked range is past highest contiguous acked range, so we
1326             // can't drop it.
1327             if buf.off >= ack_off {
1328                 break;
1329             }
1330 
1331             // Highest contiguous acked range falls within newly acked range,
1332             // so we can't drop it.
1333             if buf.off < ack_off && ack_off < buf.max_off() {
1334                 break;
1335             }
1336 
1337             // Newly acked range can be dropped.
1338             drop_until = Some(i);
1339         }
1340 
1341         if let Some(drop) = drop_until {
1342             self.data.drain(..=drop);
1343 
1344             // When a buffer is marked for retransmission, but then acked before
1345             // it could be retransmitted, we might end up decreasing the SendBuf
1346             // position too much, so make sure that doesn't happen.
1347             self.pos = self.pos.saturating_sub(drop + 1);
1348         }
1349     }
1350 
retransmit(&mut self, off: u64, len: usize)1351     pub fn retransmit(&mut self, off: u64, len: usize) {
1352         let max_off = off + len as u64;
1353         let ack_off = self.ack_off();
1354 
1355         if self.data.is_empty() {
1356             return;
1357         }
1358 
1359         if max_off <= ack_off {
1360             return;
1361         }
1362 
1363         for i in 0..self.data.len() {
1364             let buf = &mut self.data[i];
1365 
1366             if buf.off >= max_off {
1367                 break;
1368             }
1369 
1370             if off > buf.max_off() {
1371                 continue;
1372             }
1373 
1374             // Split the buffer into 2 if the retransmit range ends before the
1375             // buffer's final offset.
1376             let new_buf = if buf.off < max_off && max_off < buf.max_off() {
1377                 Some(buf.split_off((max_off - buf.off) as usize))
1378             } else {
1379                 None
1380             };
1381 
1382             let prev_pos = buf.pos;
1383 
1384             // Reduce the buffer's position (expand the buffer) if the retransmit
1385             // range is past the buffer's starting offset.
1386             buf.pos = if off > buf.off && off <= buf.max_off() {
1387                 cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
1388             } else {
1389                 buf.start
1390             };
1391 
1392             self.pos = cmp::min(self.pos, i);
1393 
1394             self.len += (prev_pos - buf.pos) as u64;
1395 
1396             if let Some(b) = new_buf {
1397                 self.data.insert(i + 1, b);
1398             }
1399         }
1400     }
1401 
1402     /// Resets the stream at the current offset and clears all buffered data.
reset(&mut self) -> (u64, u64)1403     pub fn reset(&mut self) -> (u64, u64) {
1404         let unsent_off = cmp::max(self.off_front(), self.emit_off);
1405         let unsent_len = self.off_back().saturating_sub(unsent_off);
1406 
1407         self.fin_off = Some(unsent_off);
1408 
1409         // Drop all buffered data.
1410         self.data.clear();
1411 
1412         // Mark all data as acked.
1413         self.ack(0, self.off as usize);
1414 
1415         self.pos = 0;
1416         self.len = 0;
1417         self.off = unsent_off;
1418 
1419         (self.emit_off, unsent_len)
1420     }
1421 
1422     /// Resets the streams and records the received error code.
1423     ///
1424     /// Calling this again after the first time has no effect.
stop(&mut self, error_code: u64) -> Result<(u64, u64)>1425     pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
1426         if self.error.is_some() {
1427             return Err(Error::Done);
1428         }
1429 
1430         let (max_off, unsent) = self.reset();
1431 
1432         self.error = Some(error_code);
1433 
1434         Ok((max_off, unsent))
1435     }
1436 
1437     /// Shuts down sending data.
shutdown(&mut self) -> Result<(u64, u64)>1438     pub fn shutdown(&mut self) -> Result<(u64, u64)> {
1439         if self.shutdown {
1440             return Err(Error::Done);
1441         }
1442 
1443         self.shutdown = true;
1444 
1445         Ok(self.reset())
1446     }
1447 
1448     /// Returns the largest offset of data buffered.
off_back(&self) -> u641449     pub fn off_back(&self) -> u64 {
1450         self.off
1451     }
1452 
1453     /// Returns the lowest offset of data buffered.
off_front(&self) -> u641454     pub fn off_front(&self) -> u64 {
1455         let mut pos = self.pos;
1456 
1457         // Skip empty buffers from the start of the queue.
1458         while let Some(b) = self.data.get(pos) {
1459             if !b.is_empty() {
1460                 return b.off();
1461             }
1462 
1463             pos += 1;
1464         }
1465 
1466         self.off
1467     }
1468 
1469     /// The maximum offset we are allowed to send to the peer.
max_off(&self) -> u641470     pub fn max_off(&self) -> u64 {
1471         self.max_data
1472     }
1473 
1474     /// Returns true if all data in the stream has been sent.
1475     ///
1476     /// This happens when the stream's send final size is known, and the
1477     /// application has already written data up to that point.
is_fin(&self) -> bool1478     pub fn is_fin(&self) -> bool {
1479         if self.fin_off == Some(self.off) {
1480             return true;
1481         }
1482 
1483         false
1484     }
1485 
1486     /// Returns true if the send-side of the stream is complete.
1487     ///
1488     /// This happens when the stream's send final size is known, and the peer
1489     /// has already acked all stream data up to that point.
is_complete(&self) -> bool1490     pub fn is_complete(&self) -> bool {
1491         if let Some(fin_off) = self.fin_off {
1492             if self.acked == (0..fin_off) {
1493                 return true;
1494             }
1495         }
1496 
1497         false
1498     }
1499 
1500     /// Returns true if the stream was stopped before completion.
is_stopped(&self) -> bool1501     pub fn is_stopped(&self) -> bool {
1502         self.error.is_some()
1503     }
1504 
1505     /// Returns true if there is data to be written.
ready(&self) -> bool1506     fn ready(&self) -> bool {
1507         !self.data.is_empty() && self.off_front() < self.off
1508     }
1509 
1510     /// Returns the highest contiguously acked offset.
ack_off(&self) -> u641511     fn ack_off(&self) -> u64 {
1512         match self.acked.iter().next() {
1513             // Only consider the initial range if it contiguously covers the
1514             // start of the stream (i.e. from offset 0).
1515             Some(std::ops::Range { start: 0, end }) => end,
1516 
1517             Some(_) | None => 0,
1518         }
1519     }
1520 
1521     /// Returns the outgoing flow control capacity.
cap(&self) -> Result<usize>1522     pub fn cap(&self) -> Result<usize> {
1523         // The stream was stopped, so return the error code instead.
1524         if let Some(e) = self.error {
1525             return Err(Error::StreamStopped(e));
1526         }
1527 
1528         Ok((self.max_data - self.off) as usize)
1529     }
1530 }
1531 
/// Buffer holding data at a specific offset.
///
/// The bytes live in a reference-counted `Vec<u8>`, so that several
/// `RangeBuf` objects can share the same allocation.
///
/// Each `RangeBuf` is a view into that allocation: `start` is the index of
/// the first byte that belongs to the view, and `len` is the number of bytes
/// in the view counted from `start`.
///
/// `pos` is the index of the next unconsumed byte. Like `start`, it is
/// counted from the very beginning of the `Vec`.
///
/// Finally, `off` is the stream offset that corresponds to `start`.
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
    /// The internal buffer holding the data.
    ///
    /// It is reference-counted so that splitting a `RangeBuf` does not need
    /// to allocate: both halves share it and slice it through their own
    /// `start` and `len` values.
    data: Arc<Vec<u8>>,

    /// The initial offset within the internal buffer.
    start: usize,

    /// The current offset within the internal buffer.
    pos: usize,

    /// The number of bytes in the buffer, from the initial offset.
    len: usize,

    /// The offset of the buffer within a stream.
    off: u64,

    /// Whether this contains the final byte in the stream.
    fin: bool,
}

impl RangeBuf {
    /// Creates a new `RangeBuf` holding a copy of `buf`, located at stream
    /// offset `off`.
    pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
        RangeBuf {
            data: Arc::new(buf.to_vec()),
            start: 0,
            pos: 0,
            len: buf.len(),
            off,
            fin,
        }
    }

    /// Returns whether `self` holds the final offset in the stream.
    pub fn fin(&self) -> bool {
        self.fin
    }

    /// Returns the starting offset of `self`, that is the stream offset of
    /// the first byte that has not been consumed yet.
    pub fn off(&self) -> u64 {
        let base = self.off - self.start as u64;

        base + self.pos as u64
    }

    /// Returns the final offset of `self`.
    pub fn max_off(&self) -> u64 {
        self.off() + self.len() as u64
    }

    /// Returns the length of `self`, not counting consumed bytes.
    pub fn len(&self) -> usize {
        let consumed = self.pos - self.start;

        self.len - consumed
    }

    /// Returns true if `self` has a length of zero bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Consumes the starting `count` bytes of `self`.
    pub fn consume(&mut self, count: usize) {
        self.pos += count;
    }

    /// Splits the buffer into two at the given index.
    ///
    /// `at` is counted from the initial offset of `self`. Afterwards `self`
    /// keeps the first `at` bytes and loses its fin flag, while the returned
    /// buffer holds the rest and inherits the fin flag. Both share the same
    /// allocation.
    ///
    /// # Panics
    ///
    /// Panics if `at` is greater than the number of bytes in the buffer.
    pub fn split_off(&mut self, at: usize) -> RangeBuf {
        assert!(
            at <= self.len,
            "`at` split index (is {}) should be <= len (is {})",
            at,
            self.len
        );

        // Index of the split point within the shared allocation.
        let split = self.start + at;

        let tail = RangeBuf {
            data: Arc::clone(&self.data),
            start: split,
            // Anything consumed past the split point stays consumed.
            pos: cmp::max(self.pos, split),
            len: self.len - at,
            off: self.off + at as u64,
            fin: self.fin,
        };

        self.pos = cmp::min(self.pos, split);
        self.len = at;
        self.fin = false;

        tail
    }
}

impl std::ops::Deref for RangeBuf {
    type Target = [u8];

    /// Exposes the bytes of the buffer that have not been consumed yet.
    fn deref(&self) -> &[u8] {
        let end = self.start + self.len;

        &self.data[self.pos..end]
    }
}

impl Ord for RangeBuf {
    /// Orders buffers by stream offset, in reverse.
    fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
        // Swapping the operands inverts the ordering, to implement a
        // min-heap.
        other.off.cmp(&self.off)
    }
}

impl PartialOrd for RangeBuf {
    fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for RangeBuf {
    /// Two buffers are equal when they are located at the same stream
    /// offset, whatever they contain.
    fn eq(&self, other: &RangeBuf) -> bool {
        self.off == other.off
    }
}
1666 
1667 #[cfg(test)]
1668 mod tests {
1669     use super::*;
1670 
1671     #[test]
empty_read()1672     fn empty_read() {
1673         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1674         assert_eq!(recv.len, 0);
1675 
1676         let mut buf = [0; 32];
1677 
1678         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1679     }
1680 
1681     #[test]
empty_stream_frame()1682     fn empty_stream_frame() {
1683         let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
1684         assert_eq!(recv.len, 0);
1685 
1686         let buf = RangeBuf::from(b"hello", 0, false);
1687         assert!(recv.write(buf).is_ok());
1688         assert_eq!(recv.len, 5);
1689         assert_eq!(recv.off, 0);
1690         assert_eq!(recv.data.len(), 1);
1691 
1692         let mut buf = [0; 32];
1693         assert_eq!(recv.emit(&mut buf), Ok((5, false)));
1694 
1695         // Don't store non-fin empty buffer.
1696         let buf = RangeBuf::from(b"", 10, false);
1697         assert!(recv.write(buf).is_ok());
1698         assert_eq!(recv.len, 5);
1699         assert_eq!(recv.off, 5);
1700         assert_eq!(recv.data.len(), 0);
1701 
1702         // Check flow control for empty buffer.
1703         let buf = RangeBuf::from(b"", 16, false);
1704         assert_eq!(recv.write(buf), Err(Error::FlowControl));
1705 
1706         // Store fin empty buffer.
1707         let buf = RangeBuf::from(b"", 5, true);
1708         assert!(recv.write(buf).is_ok());
1709         assert_eq!(recv.len, 5);
1710         assert_eq!(recv.off, 5);
1711         assert_eq!(recv.data.len(), 1);
1712 
1713         // Don't store additional fin empty buffers.
1714         let buf = RangeBuf::from(b"", 5, true);
1715         assert!(recv.write(buf).is_ok());
1716         assert_eq!(recv.len, 5);
1717         assert_eq!(recv.off, 5);
1718         assert_eq!(recv.data.len(), 1);
1719 
1720         // Don't store additional fin non-empty buffers.
1721         let buf = RangeBuf::from(b"aa", 3, true);
1722         assert!(recv.write(buf).is_ok());
1723         assert_eq!(recv.len, 5);
1724         assert_eq!(recv.off, 5);
1725         assert_eq!(recv.data.len(), 1);
1726 
1727         // Validate final size with fin empty buffers.
1728         let buf = RangeBuf::from(b"", 6, true);
1729         assert_eq!(recv.write(buf), Err(Error::FinalSize));
1730         let buf = RangeBuf::from(b"", 4, true);
1731         assert_eq!(recv.write(buf), Err(Error::FinalSize));
1732 
1733         let mut buf = [0; 32];
1734         assert_eq!(recv.emit(&mut buf), Ok((0, true)));
1735     }
1736 
1737     #[test]
ordered_read()1738     fn ordered_read() {
1739         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1740         assert_eq!(recv.len, 0);
1741 
1742         let mut buf = [0; 32];
1743 
1744         let first = RangeBuf::from(b"hello", 0, false);
1745         let second = RangeBuf::from(b"world", 5, false);
1746         let third = RangeBuf::from(b"something", 10, true);
1747 
1748         assert!(recv.write(second).is_ok());
1749         assert_eq!(recv.len, 10);
1750         assert_eq!(recv.off, 0);
1751 
1752         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1753 
1754         assert!(recv.write(third).is_ok());
1755         assert_eq!(recv.len, 19);
1756         assert_eq!(recv.off, 0);
1757 
1758         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1759 
1760         assert!(recv.write(first).is_ok());
1761         assert_eq!(recv.len, 19);
1762         assert_eq!(recv.off, 0);
1763 
1764         let (len, fin) = recv.emit(&mut buf).unwrap();
1765         assert_eq!(len, 19);
1766         assert_eq!(fin, true);
1767         assert_eq!(&buf[..len], b"helloworldsomething");
1768         assert_eq!(recv.len, 19);
1769         assert_eq!(recv.off, 19);
1770 
1771         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1772     }
1773 
1774     #[test]
split_read()1775     fn split_read() {
1776         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1777         assert_eq!(recv.len, 0);
1778 
1779         let mut buf = [0; 32];
1780 
1781         let first = RangeBuf::from(b"something", 0, false);
1782         let second = RangeBuf::from(b"helloworld", 9, true);
1783 
1784         assert!(recv.write(first).is_ok());
1785         assert_eq!(recv.len, 9);
1786         assert_eq!(recv.off, 0);
1787 
1788         assert!(recv.write(second).is_ok());
1789         assert_eq!(recv.len, 19);
1790         assert_eq!(recv.off, 0);
1791 
1792         let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
1793         assert_eq!(len, 10);
1794         assert_eq!(fin, false);
1795         assert_eq!(&buf[..len], b"somethingh");
1796         assert_eq!(recv.len, 19);
1797         assert_eq!(recv.off, 10);
1798 
1799         let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
1800         assert_eq!(len, 5);
1801         assert_eq!(fin, false);
1802         assert_eq!(&buf[..len], b"ellow");
1803         assert_eq!(recv.len, 19);
1804         assert_eq!(recv.off, 15);
1805 
1806         let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
1807         assert_eq!(len, 4);
1808         assert_eq!(fin, true);
1809         assert_eq!(&buf[..len], b"orld");
1810         assert_eq!(recv.len, 19);
1811         assert_eq!(recv.off, 19);
1812     }
1813 
1814     #[test]
incomplete_read()1815     fn incomplete_read() {
1816         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1817         assert_eq!(recv.len, 0);
1818 
1819         let mut buf = [0; 32];
1820 
1821         let first = RangeBuf::from(b"something", 0, false);
1822         let second = RangeBuf::from(b"helloworld", 9, true);
1823 
1824         assert!(recv.write(second).is_ok());
1825         assert_eq!(recv.len, 19);
1826         assert_eq!(recv.off, 0);
1827 
1828         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1829 
1830         assert!(recv.write(first).is_ok());
1831         assert_eq!(recv.len, 19);
1832         assert_eq!(recv.off, 0);
1833 
1834         let (len, fin) = recv.emit(&mut buf).unwrap();
1835         assert_eq!(len, 19);
1836         assert_eq!(fin, true);
1837         assert_eq!(&buf[..len], b"somethinghelloworld");
1838         assert_eq!(recv.len, 19);
1839         assert_eq!(recv.off, 19);
1840     }
1841 
1842     #[test]
zero_len_read()1843     fn zero_len_read() {
1844         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1845         assert_eq!(recv.len, 0);
1846 
1847         let mut buf = [0; 32];
1848 
1849         let first = RangeBuf::from(b"something", 0, false);
1850         let second = RangeBuf::from(b"", 9, true);
1851 
1852         assert!(recv.write(first).is_ok());
1853         assert_eq!(recv.len, 9);
1854         assert_eq!(recv.off, 0);
1855         assert_eq!(recv.data.len(), 1);
1856 
1857         assert!(recv.write(second).is_ok());
1858         assert_eq!(recv.len, 9);
1859         assert_eq!(recv.off, 0);
1860         assert_eq!(recv.data.len(), 1);
1861 
1862         let (len, fin) = recv.emit(&mut buf).unwrap();
1863         assert_eq!(len, 9);
1864         assert_eq!(fin, true);
1865         assert_eq!(&buf[..len], b"something");
1866         assert_eq!(recv.len, 9);
1867         assert_eq!(recv.off, 9);
1868     }
1869 
1870     #[test]
past_read()1871     fn past_read() {
1872         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1873         assert_eq!(recv.len, 0);
1874 
1875         let mut buf = [0; 32];
1876 
1877         let first = RangeBuf::from(b"something", 0, false);
1878         let second = RangeBuf::from(b"hello", 3, false);
1879         let third = RangeBuf::from(b"ello", 4, true);
1880         let fourth = RangeBuf::from(b"ello", 5, true);
1881 
1882         assert!(recv.write(first).is_ok());
1883         assert_eq!(recv.len, 9);
1884         assert_eq!(recv.off, 0);
1885         assert_eq!(recv.data.len(), 1);
1886 
1887         let (len, fin) = recv.emit(&mut buf).unwrap();
1888         assert_eq!(len, 9);
1889         assert_eq!(fin, false);
1890         assert_eq!(&buf[..len], b"something");
1891         assert_eq!(recv.len, 9);
1892         assert_eq!(recv.off, 9);
1893 
1894         assert!(recv.write(second).is_ok());
1895         assert_eq!(recv.len, 9);
1896         assert_eq!(recv.off, 9);
1897         assert_eq!(recv.data.len(), 0);
1898 
1899         assert_eq!(recv.write(third), Err(Error::FinalSize));
1900 
1901         assert!(recv.write(fourth).is_ok());
1902         assert_eq!(recv.len, 9);
1903         assert_eq!(recv.off, 9);
1904         assert_eq!(recv.data.len(), 0);
1905 
1906         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1907     }
1908 
1909     #[test]
fully_overlapping_read()1910     fn fully_overlapping_read() {
1911         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1912         assert_eq!(recv.len, 0);
1913 
1914         let mut buf = [0; 32];
1915 
1916         let first = RangeBuf::from(b"something", 0, false);
1917         let second = RangeBuf::from(b"hello", 4, false);
1918 
1919         assert!(recv.write(first).is_ok());
1920         assert_eq!(recv.len, 9);
1921         assert_eq!(recv.off, 0);
1922         assert_eq!(recv.data.len(), 1);
1923 
1924         assert!(recv.write(second).is_ok());
1925         assert_eq!(recv.len, 9);
1926         assert_eq!(recv.off, 0);
1927         assert_eq!(recv.data.len(), 1);
1928 
1929         let (len, fin) = recv.emit(&mut buf).unwrap();
1930         assert_eq!(len, 9);
1931         assert_eq!(fin, false);
1932         assert_eq!(&buf[..len], b"something");
1933         assert_eq!(recv.len, 9);
1934         assert_eq!(recv.off, 9);
1935         assert_eq!(recv.data.len(), 0);
1936 
1937         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1938     }
1939 
1940     #[test]
fully_overlapping_read2()1941     fn fully_overlapping_read2() {
1942         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1943         assert_eq!(recv.len, 0);
1944 
1945         let mut buf = [0; 32];
1946 
1947         let first = RangeBuf::from(b"something", 0, false);
1948         let second = RangeBuf::from(b"hello", 4, false);
1949 
1950         assert!(recv.write(second).is_ok());
1951         assert_eq!(recv.len, 9);
1952         assert_eq!(recv.off, 0);
1953         assert_eq!(recv.data.len(), 1);
1954 
1955         assert!(recv.write(first).is_ok());
1956         assert_eq!(recv.len, 9);
1957         assert_eq!(recv.off, 0);
1958         assert_eq!(recv.data.len(), 2);
1959 
1960         let (len, fin) = recv.emit(&mut buf).unwrap();
1961         assert_eq!(len, 9);
1962         assert_eq!(fin, false);
1963         assert_eq!(&buf[..len], b"somehello");
1964         assert_eq!(recv.len, 9);
1965         assert_eq!(recv.off, 9);
1966         assert_eq!(recv.data.len(), 0);
1967 
1968         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1969     }
1970 
1971     #[test]
fully_overlapping_read3()1972     fn fully_overlapping_read3() {
1973         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1974         assert_eq!(recv.len, 0);
1975 
1976         let mut buf = [0; 32];
1977 
1978         let first = RangeBuf::from(b"something", 0, false);
1979         let second = RangeBuf::from(b"hello", 3, false);
1980 
1981         assert!(recv.write(second).is_ok());
1982         assert_eq!(recv.len, 8);
1983         assert_eq!(recv.off, 0);
1984         assert_eq!(recv.data.len(), 1);
1985 
1986         assert!(recv.write(first).is_ok());
1987         assert_eq!(recv.len, 9);
1988         assert_eq!(recv.off, 0);
1989         assert_eq!(recv.data.len(), 3);
1990 
1991         let (len, fin) = recv.emit(&mut buf).unwrap();
1992         assert_eq!(len, 9);
1993         assert_eq!(fin, false);
1994         assert_eq!(&buf[..len], b"somhellog");
1995         assert_eq!(recv.len, 9);
1996         assert_eq!(recv.off, 9);
1997         assert_eq!(recv.data.len(), 0);
1998 
1999         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2000     }
2001 
2002     #[test]
fully_overlapping_read_multi()2003     fn fully_overlapping_read_multi() {
2004         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2005         assert_eq!(recv.len, 0);
2006 
2007         let mut buf = [0; 32];
2008 
2009         let first = RangeBuf::from(b"somethingsomething", 0, false);
2010         let second = RangeBuf::from(b"hello", 3, false);
2011         let third = RangeBuf::from(b"hello", 12, false);
2012 
2013         assert!(recv.write(second).is_ok());
2014         assert_eq!(recv.len, 8);
2015         assert_eq!(recv.off, 0);
2016         assert_eq!(recv.data.len(), 1);
2017 
2018         assert!(recv.write(third).is_ok());
2019         assert_eq!(recv.len, 17);
2020         assert_eq!(recv.off, 0);
2021         assert_eq!(recv.data.len(), 2);
2022 
2023         assert!(recv.write(first).is_ok());
2024         assert_eq!(recv.len, 18);
2025         assert_eq!(recv.off, 0);
2026         assert_eq!(recv.data.len(), 5);
2027 
2028         let (len, fin) = recv.emit(&mut buf).unwrap();
2029         assert_eq!(len, 18);
2030         assert_eq!(fin, false);
2031         assert_eq!(&buf[..len], b"somhellogsomhellog");
2032         assert_eq!(recv.len, 18);
2033         assert_eq!(recv.off, 18);
2034         assert_eq!(recv.data.len(), 0);
2035 
2036         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2037     }
2038 
2039     #[test]
overlapping_start_read()2040     fn overlapping_start_read() {
2041         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2042         assert_eq!(recv.len, 0);
2043 
2044         let mut buf = [0; 32];
2045 
2046         let first = RangeBuf::from(b"something", 0, false);
2047         let second = RangeBuf::from(b"hello", 8, true);
2048 
2049         assert!(recv.write(first).is_ok());
2050         assert_eq!(recv.len, 9);
2051         assert_eq!(recv.off, 0);
2052         assert_eq!(recv.data.len(), 1);
2053 
2054         assert!(recv.write(second).is_ok());
2055         assert_eq!(recv.len, 13);
2056         assert_eq!(recv.off, 0);
2057         assert_eq!(recv.data.len(), 2);
2058 
2059         let (len, fin) = recv.emit(&mut buf).unwrap();
2060         assert_eq!(len, 13);
2061         assert_eq!(fin, true);
2062         assert_eq!(&buf[..len], b"somethingello");
2063         assert_eq!(recv.len, 13);
2064         assert_eq!(recv.off, 13);
2065 
2066         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2067     }
2068 
2069     #[test]
overlapping_end_read()2070     fn overlapping_end_read() {
2071         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2072         assert_eq!(recv.len, 0);
2073 
2074         let mut buf = [0; 32];
2075 
2076         let first = RangeBuf::from(b"hello", 0, false);
2077         let second = RangeBuf::from(b"something", 3, true);
2078 
2079         assert!(recv.write(second).is_ok());
2080         assert_eq!(recv.len, 12);
2081         assert_eq!(recv.off, 0);
2082         assert_eq!(recv.data.len(), 1);
2083 
2084         assert!(recv.write(first).is_ok());
2085         assert_eq!(recv.len, 12);
2086         assert_eq!(recv.off, 0);
2087         assert_eq!(recv.data.len(), 2);
2088 
2089         let (len, fin) = recv.emit(&mut buf).unwrap();
2090         assert_eq!(len, 12);
2091         assert_eq!(fin, true);
2092         assert_eq!(&buf[..len], b"helsomething");
2093         assert_eq!(recv.len, 12);
2094         assert_eq!(recv.off, 12);
2095 
2096         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2097     }
2098 
2099     #[test]
overlapping_end_twice_read()2100     fn overlapping_end_twice_read() {
2101         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2102         assert_eq!(recv.len, 0);
2103 
2104         let mut buf = [0; 32];
2105 
2106         let first = RangeBuf::from(b"he", 0, false);
2107         let second = RangeBuf::from(b"ow", 4, false);
2108         let third = RangeBuf::from(b"rl", 7, false);
2109         let fourth = RangeBuf::from(b"helloworld", 0, true);
2110 
2111         assert!(recv.write(third).is_ok());
2112         assert_eq!(recv.len, 9);
2113         assert_eq!(recv.off, 0);
2114         assert_eq!(recv.data.len(), 1);
2115 
2116         assert!(recv.write(second).is_ok());
2117         assert_eq!(recv.len, 9);
2118         assert_eq!(recv.off, 0);
2119         assert_eq!(recv.data.len(), 2);
2120 
2121         assert!(recv.write(first).is_ok());
2122         assert_eq!(recv.len, 9);
2123         assert_eq!(recv.off, 0);
2124         assert_eq!(recv.data.len(), 3);
2125 
2126         assert!(recv.write(fourth).is_ok());
2127         assert_eq!(recv.len, 10);
2128         assert_eq!(recv.off, 0);
2129         assert_eq!(recv.data.len(), 6);
2130 
2131         let (len, fin) = recv.emit(&mut buf).unwrap();
2132         assert_eq!(len, 10);
2133         assert_eq!(fin, true);
2134         assert_eq!(&buf[..len], b"helloworld");
2135         assert_eq!(recv.len, 10);
2136         assert_eq!(recv.off, 10);
2137 
2138         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2139     }
2140 
2141     #[test]
overlapping_end_twice_and_contained_read()2142     fn overlapping_end_twice_and_contained_read() {
2143         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2144         assert_eq!(recv.len, 0);
2145 
2146         let mut buf = [0; 32];
2147 
2148         let first = RangeBuf::from(b"hellow", 0, false);
2149         let second = RangeBuf::from(b"barfoo", 10, true);
2150         let third = RangeBuf::from(b"rl", 7, false);
2151         let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
2152 
2153         assert!(recv.write(third).is_ok());
2154         assert_eq!(recv.len, 9);
2155         assert_eq!(recv.off, 0);
2156         assert_eq!(recv.data.len(), 1);
2157 
2158         assert!(recv.write(second).is_ok());
2159         assert_eq!(recv.len, 16);
2160         assert_eq!(recv.off, 0);
2161         assert_eq!(recv.data.len(), 2);
2162 
2163         assert!(recv.write(first).is_ok());
2164         assert_eq!(recv.len, 16);
2165         assert_eq!(recv.off, 0);
2166         assert_eq!(recv.data.len(), 3);
2167 
2168         assert!(recv.write(fourth).is_ok());
2169         assert_eq!(recv.len, 16);
2170         assert_eq!(recv.off, 0);
2171         assert_eq!(recv.data.len(), 5);
2172 
2173         let (len, fin) = recv.emit(&mut buf).unwrap();
2174         assert_eq!(len, 16);
2175         assert_eq!(fin, true);
2176         assert_eq!(&buf[..len], b"helloworldbarfoo");
2177         assert_eq!(recv.len, 16);
2178         assert_eq!(recv.off, 16);
2179 
2180         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2181     }
2182 
2183     #[test]
partially_multi_overlapping_reordered_read()2184     fn partially_multi_overlapping_reordered_read() {
2185         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2186         assert_eq!(recv.len, 0);
2187 
2188         let mut buf = [0; 32];
2189 
2190         let first = RangeBuf::from(b"hello", 8, false);
2191         let second = RangeBuf::from(b"something", 0, false);
2192         let third = RangeBuf::from(b"moar", 11, true);
2193 
2194         assert!(recv.write(first).is_ok());
2195         assert_eq!(recv.len, 13);
2196         assert_eq!(recv.off, 0);
2197         assert_eq!(recv.data.len(), 1);
2198 
2199         assert!(recv.write(second).is_ok());
2200         assert_eq!(recv.len, 13);
2201         assert_eq!(recv.off, 0);
2202         assert_eq!(recv.data.len(), 2);
2203 
2204         assert!(recv.write(third).is_ok());
2205         assert_eq!(recv.len, 15);
2206         assert_eq!(recv.off, 0);
2207         assert_eq!(recv.data.len(), 3);
2208 
2209         let (len, fin) = recv.emit(&mut buf).unwrap();
2210         assert_eq!(len, 15);
2211         assert_eq!(fin, true);
2212         assert_eq!(&buf[..len], b"somethinhelloar");
2213         assert_eq!(recv.len, 15);
2214         assert_eq!(recv.off, 15);
2215         assert_eq!(recv.data.len(), 0);
2216 
2217         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2218     }
2219 
2220     #[test]
partially_multi_overlapping_reordered_read2()2221     fn partially_multi_overlapping_reordered_read2() {
2222         let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
2223         assert_eq!(recv.len, 0);
2224 
2225         let mut buf = [0; 32];
2226 
2227         let first = RangeBuf::from(b"aaa", 0, false);
2228         let second = RangeBuf::from(b"bbb", 2, false);
2229         let third = RangeBuf::from(b"ccc", 4, false);
2230         let fourth = RangeBuf::from(b"ddd", 6, false);
2231         let fifth = RangeBuf::from(b"eee", 9, false);
2232         let sixth = RangeBuf::from(b"fff", 11, false);
2233 
2234         assert!(recv.write(second).is_ok());
2235         assert_eq!(recv.len, 5);
2236         assert_eq!(recv.off, 0);
2237         assert_eq!(recv.data.len(), 1);
2238 
2239         assert!(recv.write(fourth).is_ok());
2240         assert_eq!(recv.len, 9);
2241         assert_eq!(recv.off, 0);
2242         assert_eq!(recv.data.len(), 2);
2243 
2244         assert!(recv.write(third).is_ok());
2245         assert_eq!(recv.len, 9);
2246         assert_eq!(recv.off, 0);
2247         assert_eq!(recv.data.len(), 3);
2248 
2249         assert!(recv.write(first).is_ok());
2250         assert_eq!(recv.len, 9);
2251         assert_eq!(recv.off, 0);
2252         assert_eq!(recv.data.len(), 4);
2253 
2254         assert!(recv.write(sixth).is_ok());
2255         assert_eq!(recv.len, 14);
2256         assert_eq!(recv.off, 0);
2257         assert_eq!(recv.data.len(), 5);
2258 
2259         assert!(recv.write(fifth).is_ok());
2260         assert_eq!(recv.len, 14);
2261         assert_eq!(recv.off, 0);
2262         assert_eq!(recv.data.len(), 6);
2263 
2264         let (len, fin) = recv.emit(&mut buf).unwrap();
2265         assert_eq!(len, 14);
2266         assert_eq!(fin, false);
2267         assert_eq!(&buf[..len], b"aabbbcdddeefff");
2268         assert_eq!(recv.len, 14);
2269         assert_eq!(recv.off, 14);
2270         assert_eq!(recv.data.len(), 0);
2271 
2272         assert_eq!(recv.emit(&mut buf), Err(Error::Done));
2273     }
2274 
2275     #[test]
empty_write()2276     fn empty_write() {
2277         let mut buf = [0; 5];
2278 
2279         let mut send = SendBuf::new(u64::MAX);
2280         assert_eq!(send.len, 0);
2281 
2282         let (written, fin) = send.emit(&mut buf).unwrap();
2283         assert_eq!(written, 0);
2284         assert_eq!(fin, false);
2285     }
2286 
2287     #[test]
multi_write()2288     fn multi_write() {
2289         let mut buf = [0; 128];
2290 
2291         let mut send = SendBuf::new(u64::MAX);
2292         assert_eq!(send.len, 0);
2293 
2294         let first = b"something";
2295         let second = b"helloworld";
2296 
2297         assert!(send.write(first, false).is_ok());
2298         assert_eq!(send.len, 9);
2299 
2300         assert!(send.write(second, true).is_ok());
2301         assert_eq!(send.len, 19);
2302 
2303         let (written, fin) = send.emit(&mut buf[..128]).unwrap();
2304         assert_eq!(written, 19);
2305         assert_eq!(fin, true);
2306         assert_eq!(&buf[..written], b"somethinghelloworld");
2307         assert_eq!(send.len, 0);
2308     }
2309 
2310     #[test]
split_write()2311     fn split_write() {
2312         let mut buf = [0; 10];
2313 
2314         let mut send = SendBuf::new(u64::MAX);
2315         assert_eq!(send.len, 0);
2316 
2317         let first = b"something";
2318         let second = b"helloworld";
2319 
2320         assert!(send.write(first, false).is_ok());
2321         assert_eq!(send.len, 9);
2322 
2323         assert!(send.write(second, true).is_ok());
2324         assert_eq!(send.len, 19);
2325 
2326         assert_eq!(send.off_front(), 0);
2327 
2328         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2329         assert_eq!(written, 10);
2330         assert_eq!(fin, false);
2331         assert_eq!(&buf[..written], b"somethingh");
2332         assert_eq!(send.len, 9);
2333 
2334         assert_eq!(send.off_front(), 10);
2335 
2336         let (written, fin) = send.emit(&mut buf[..5]).unwrap();
2337         assert_eq!(written, 5);
2338         assert_eq!(fin, false);
2339         assert_eq!(&buf[..written], b"ellow");
2340         assert_eq!(send.len, 4);
2341 
2342         assert_eq!(send.off_front(), 15);
2343 
2344         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2345         assert_eq!(written, 4);
2346         assert_eq!(fin, true);
2347         assert_eq!(&buf[..written], b"orld");
2348         assert_eq!(send.len, 0);
2349 
2350         assert_eq!(send.off_front(), 19);
2351     }
2352 
2353     #[test]
resend()2354     fn resend() {
2355         let mut buf = [0; 15];
2356 
2357         let mut send = SendBuf::new(u64::MAX);
2358         assert_eq!(send.len, 0);
2359         assert_eq!(send.off_front(), 0);
2360 
2361         let first = b"something";
2362         let second = b"helloworld";
2363 
2364         assert!(send.write(first, false).is_ok());
2365         assert_eq!(send.off_front(), 0);
2366 
2367         assert!(send.write(second, true).is_ok());
2368         assert_eq!(send.off_front(), 0);
2369 
2370         assert_eq!(send.len, 19);
2371 
2372         let (written, fin) = send.emit(&mut buf[..4]).unwrap();
2373         assert_eq!(written, 4);
2374         assert_eq!(fin, false);
2375         assert_eq!(&buf[..written], b"some");
2376         assert_eq!(send.len, 15);
2377         assert_eq!(send.off_front(), 4);
2378 
2379         let (written, fin) = send.emit(&mut buf[..5]).unwrap();
2380         assert_eq!(written, 5);
2381         assert_eq!(fin, false);
2382         assert_eq!(&buf[..written], b"thing");
2383         assert_eq!(send.len, 10);
2384         assert_eq!(send.off_front(), 9);
2385 
2386         let (written, fin) = send.emit(&mut buf[..5]).unwrap();
2387         assert_eq!(written, 5);
2388         assert_eq!(fin, false);
2389         assert_eq!(&buf[..written], b"hello");
2390         assert_eq!(send.len, 5);
2391         assert_eq!(send.off_front(), 14);
2392 
2393         send.retransmit(4, 5);
2394         assert_eq!(send.len, 10);
2395         assert_eq!(send.off_front(), 4);
2396 
2397         send.retransmit(0, 4);
2398         assert_eq!(send.len, 14);
2399         assert_eq!(send.off_front(), 0);
2400 
2401         let (written, fin) = send.emit(&mut buf[..11]).unwrap();
2402         assert_eq!(written, 9);
2403         assert_eq!(fin, false);
2404         assert_eq!(&buf[..written], b"something");
2405         assert_eq!(send.len, 5);
2406         assert_eq!(send.off_front(), 14);
2407 
2408         let (written, fin) = send.emit(&mut buf[..11]).unwrap();
2409         assert_eq!(written, 5);
2410         assert_eq!(fin, true);
2411         assert_eq!(&buf[..written], b"world");
2412         assert_eq!(send.len, 0);
2413         assert_eq!(send.off_front(), 19);
2414     }
2415 
2416     #[test]
write_blocked_by_off()2417     fn write_blocked_by_off() {
2418         let mut buf = [0; 10];
2419 
2420         let mut send = SendBuf::default();
2421         assert_eq!(send.len, 0);
2422 
2423         let first = b"something";
2424         let second = b"helloworld";
2425 
2426         assert_eq!(send.write(first, false), Ok(0));
2427         assert_eq!(send.len, 0);
2428 
2429         assert_eq!(send.write(second, true), Ok(0));
2430         assert_eq!(send.len, 0);
2431 
2432         send.update_max_data(5);
2433 
2434         assert_eq!(send.write(first, false), Ok(5));
2435         assert_eq!(send.len, 5);
2436 
2437         assert_eq!(send.write(second, true), Ok(0));
2438         assert_eq!(send.len, 5);
2439 
2440         assert_eq!(send.off_front(), 0);
2441 
2442         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2443         assert_eq!(written, 5);
2444         assert_eq!(fin, false);
2445         assert_eq!(&buf[..written], b"somet");
2446         assert_eq!(send.len, 0);
2447 
2448         assert_eq!(send.off_front(), 5);
2449 
2450         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2451         assert_eq!(written, 0);
2452         assert_eq!(fin, false);
2453         assert_eq!(&buf[..written], b"");
2454         assert_eq!(send.len, 0);
2455 
2456         send.update_max_data(15);
2457 
2458         assert_eq!(send.write(&first[5..], false), Ok(4));
2459         assert_eq!(send.len, 4);
2460 
2461         assert_eq!(send.write(second, true), Ok(6));
2462         assert_eq!(send.len, 10);
2463 
2464         assert_eq!(send.off_front(), 5);
2465 
2466         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2467         assert_eq!(written, 10);
2468         assert_eq!(fin, false);
2469         assert_eq!(&buf[..10], b"hinghellow");
2470         assert_eq!(send.len, 0);
2471 
2472         send.update_max_data(25);
2473 
2474         assert_eq!(send.write(&second[6..], true), Ok(4));
2475         assert_eq!(send.len, 4);
2476 
2477         assert_eq!(send.off_front(), 15);
2478 
2479         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2480         assert_eq!(written, 4);
2481         assert_eq!(fin, true);
2482         assert_eq!(&buf[..written], b"orld");
2483         assert_eq!(send.len, 0);
2484     }
2485 
2486     #[test]
zero_len_write()2487     fn zero_len_write() {
2488         let mut buf = [0; 10];
2489 
2490         let mut send = SendBuf::new(u64::MAX);
2491         assert_eq!(send.len, 0);
2492 
2493         let first = b"something";
2494 
2495         assert!(send.write(first, false).is_ok());
2496         assert_eq!(send.len, 9);
2497 
2498         assert!(send.write(&[], true).is_ok());
2499         assert_eq!(send.len, 9);
2500 
2501         assert_eq!(send.off_front(), 0);
2502 
2503         let (written, fin) = send.emit(&mut buf[..10]).unwrap();
2504         assert_eq!(written, 9);
2505         assert_eq!(fin, true);
2506         assert_eq!(&buf[..written], b"something");
2507         assert_eq!(send.len, 0);
2508     }
2509 
2510     #[test]
recv_flow_control()2511     fn recv_flow_control() {
2512         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2513         assert!(!stream.recv.almost_full());
2514 
2515         let mut buf = [0; 32];
2516 
2517         let first = RangeBuf::from(b"hello", 0, false);
2518         let second = RangeBuf::from(b"world", 5, false);
2519         let third = RangeBuf::from(b"something", 10, false);
2520 
2521         assert_eq!(stream.recv.write(second), Ok(()));
2522         assert_eq!(stream.recv.write(first), Ok(()));
2523         assert!(!stream.recv.almost_full());
2524 
2525         assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
2526 
2527         let (len, fin) = stream.recv.emit(&mut buf).unwrap();
2528         assert_eq!(&buf[..len], b"helloworld");
2529         assert_eq!(fin, false);
2530 
2531         assert!(stream.recv.almost_full());
2532 
2533         stream.recv.update_max_data(time::Instant::now());
2534         assert_eq!(stream.recv.max_data_next(), 25);
2535         assert!(!stream.recv.almost_full());
2536 
2537         let third = RangeBuf::from(b"something", 10, false);
2538         assert_eq!(stream.recv.write(third), Ok(()));
2539     }
2540 
2541     #[test]
recv_past_fin()2542     fn recv_past_fin() {
2543         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2544         assert!(!stream.recv.almost_full());
2545 
2546         let first = RangeBuf::from(b"hello", 0, true);
2547         let second = RangeBuf::from(b"world", 5, false);
2548 
2549         assert_eq!(stream.recv.write(first), Ok(()));
2550         assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
2551     }
2552 
2553     #[test]
recv_fin_dup()2554     fn recv_fin_dup() {
2555         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2556         assert!(!stream.recv.almost_full());
2557 
2558         let first = RangeBuf::from(b"hello", 0, true);
2559         let second = RangeBuf::from(b"hello", 0, true);
2560 
2561         assert_eq!(stream.recv.write(first), Ok(()));
2562         assert_eq!(stream.recv.write(second), Ok(()));
2563 
2564         let mut buf = [0; 32];
2565 
2566         let (len, fin) = stream.recv.emit(&mut buf).unwrap();
2567         assert_eq!(&buf[..len], b"hello");
2568         assert_eq!(fin, true);
2569     }
2570 
2571     #[test]
recv_fin_change()2572     fn recv_fin_change() {
2573         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2574         assert!(!stream.recv.almost_full());
2575 
2576         let first = RangeBuf::from(b"hello", 0, true);
2577         let second = RangeBuf::from(b"world", 5, true);
2578 
2579         assert_eq!(stream.recv.write(second), Ok(()));
2580         assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
2581     }
2582 
2583     #[test]
recv_fin_lower_than_received()2584     fn recv_fin_lower_than_received() {
2585         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2586         assert!(!stream.recv.almost_full());
2587 
2588         let first = RangeBuf::from(b"hello", 0, true);
2589         let second = RangeBuf::from(b"world", 5, false);
2590 
2591         assert_eq!(stream.recv.write(second), Ok(()));
2592         assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
2593     }
2594 
2595     #[test]
recv_fin_flow_control()2596     fn recv_fin_flow_control() {
2597         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2598         assert!(!stream.recv.almost_full());
2599 
2600         let mut buf = [0; 32];
2601 
2602         let first = RangeBuf::from(b"hello", 0, false);
2603         let second = RangeBuf::from(b"world", 5, true);
2604 
2605         assert_eq!(stream.recv.write(first), Ok(()));
2606         assert_eq!(stream.recv.write(second), Ok(()));
2607 
2608         let (len, fin) = stream.recv.emit(&mut buf).unwrap();
2609         assert_eq!(&buf[..len], b"helloworld");
2610         assert_eq!(fin, true);
2611 
2612         assert!(!stream.recv.almost_full());
2613     }
2614 
2615     #[test]
recv_fin_reset_mismatch()2616     fn recv_fin_reset_mismatch() {
2617         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2618         assert!(!stream.recv.almost_full());
2619 
2620         let first = RangeBuf::from(b"hello", 0, true);
2621 
2622         assert_eq!(stream.recv.write(first), Ok(()));
2623         assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
2624     }
2625 
2626     #[test]
recv_reset_dup()2627     fn recv_reset_dup() {
2628         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2629         assert!(!stream.recv.almost_full());
2630 
2631         let first = RangeBuf::from(b"hello", 0, false);
2632 
2633         assert_eq!(stream.recv.write(first), Ok(()));
2634         assert_eq!(stream.recv.reset(0, 5), Ok(0));
2635         assert_eq!(stream.recv.reset(0, 5), Ok(0));
2636     }
2637 
2638     #[test]
recv_reset_change()2639     fn recv_reset_change() {
2640         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2641         assert!(!stream.recv.almost_full());
2642 
2643         let first = RangeBuf::from(b"hello", 0, false);
2644 
2645         assert_eq!(stream.recv.write(first), Ok(()));
2646         assert_eq!(stream.recv.reset(0, 5), Ok(0));
2647         assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
2648     }
2649 
2650     #[test]
recv_reset_lower_than_received()2651     fn recv_reset_lower_than_received() {
2652         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2653         assert!(!stream.recv.almost_full());
2654 
2655         let first = RangeBuf::from(b"hello", 0, false);
2656 
2657         assert_eq!(stream.recv.write(first), Ok(()));
2658         assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
2659     }
2660 
2661     #[test]
send_flow_control()2662     fn send_flow_control() {
2663         let mut buf = [0; 25];
2664 
2665         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2666 
2667         let first = b"hello";
2668         let second = b"world";
2669         let third = b"something";
2670 
2671         assert!(stream.send.write(first, false).is_ok());
2672         assert!(stream.send.write(second, false).is_ok());
2673         assert!(stream.send.write(third, false).is_ok());
2674 
2675         assert_eq!(stream.send.off_front(), 0);
2676 
2677         let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
2678         assert_eq!(written, 15);
2679         assert_eq!(fin, false);
2680         assert_eq!(&buf[..written], b"helloworldsomet");
2681 
2682         assert_eq!(stream.send.off_front(), 15);
2683 
2684         let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
2685         assert_eq!(written, 0);
2686         assert_eq!(fin, false);
2687         assert_eq!(&buf[..written], b"");
2688 
2689         stream.send.retransmit(0, 15);
2690 
2691         assert_eq!(stream.send.off_front(), 0);
2692 
2693         let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
2694         assert_eq!(written, 10);
2695         assert_eq!(fin, false);
2696         assert_eq!(&buf[..written], b"helloworld");
2697 
2698         assert_eq!(stream.send.off_front(), 10);
2699 
2700         let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
2701         assert_eq!(written, 5);
2702         assert_eq!(fin, false);
2703         assert_eq!(&buf[..written], b"somet");
2704     }
2705 
2706     #[test]
send_past_fin()2707     fn send_past_fin() {
2708         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2709 
2710         let first = b"hello";
2711         let second = b"world";
2712         let third = b"third";
2713 
2714         assert_eq!(stream.send.write(first, false), Ok(5));
2715 
2716         assert_eq!(stream.send.write(second, true), Ok(5));
2717         assert!(stream.send.is_fin());
2718 
2719         assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
2720     }
2721 
2722     #[test]
send_fin_dup()2723     fn send_fin_dup() {
2724         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2725 
2726         assert_eq!(stream.send.write(b"hello", true), Ok(5));
2727         assert!(stream.send.is_fin());
2728 
2729         assert_eq!(stream.send.write(b"", true), Ok(0));
2730         assert!(stream.send.is_fin());
2731     }
2732 
2733     #[test]
send_undo_fin()2734     fn send_undo_fin() {
2735         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2736 
2737         assert_eq!(stream.send.write(b"hello", true), Ok(5));
2738         assert!(stream.send.is_fin());
2739 
2740         assert_eq!(
2741             stream.send.write(b"helloworld", true),
2742             Err(Error::FinalSize)
2743         );
2744     }
2745 
2746     #[test]
send_fin_max_data_match()2747     fn send_fin_max_data_match() {
2748         let mut buf = [0; 15];
2749 
2750         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2751 
2752         let slice = b"hellohellohello";
2753 
2754         assert!(stream.send.write(slice, true).is_ok());
2755 
2756         let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
2757         assert_eq!(written, 15);
2758         assert_eq!(fin, true);
2759         assert_eq!(&buf[..written], slice);
2760     }
2761 
2762     #[test]
send_fin_zero_length()2763     fn send_fin_zero_length() {
2764         let mut buf = [0; 5];
2765 
2766         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2767 
2768         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2769         assert_eq!(stream.send.write(b"", true), Ok(0));
2770         assert!(stream.send.is_fin());
2771 
2772         let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
2773         assert_eq!(written, 5);
2774         assert_eq!(fin, true);
2775         assert_eq!(&buf[..written], b"hello");
2776     }
2777 
2778     #[test]
send_ack()2779     fn send_ack() {
2780         let mut buf = [0; 5];
2781 
2782         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2783 
2784         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2785         assert_eq!(stream.send.write(b"world", false), Ok(5));
2786         assert_eq!(stream.send.write(b"", true), Ok(0));
2787         assert!(stream.send.is_fin());
2788 
2789         assert_eq!(stream.send.off_front(), 0);
2790 
2791         let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
2792         assert_eq!(written, 5);
2793         assert_eq!(fin, false);
2794         assert_eq!(&buf[..written], b"hello");
2795 
2796         stream.send.ack_and_drop(0, 5);
2797 
2798         stream.send.retransmit(0, 5);
2799 
2800         assert_eq!(stream.send.off_front(), 5);
2801 
2802         let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
2803         assert_eq!(written, 5);
2804         assert_eq!(fin, true);
2805         assert_eq!(&buf[..written], b"world");
2806     }
2807 
2808     #[test]
send_ack_reordering()2809     fn send_ack_reordering() {
2810         let mut buf = [0; 5];
2811 
2812         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2813 
2814         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2815         assert_eq!(stream.send.write(b"world", false), Ok(5));
2816         assert_eq!(stream.send.write(b"", true), Ok(0));
2817         assert!(stream.send.is_fin());
2818 
2819         assert_eq!(stream.send.off_front(), 0);
2820 
2821         let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
2822         assert_eq!(written, 5);
2823         assert_eq!(fin, false);
2824         assert_eq!(&buf[..written], b"hello");
2825 
2826         assert_eq!(stream.send.off_front(), 5);
2827 
2828         let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
2829         assert_eq!(written, 1);
2830         assert_eq!(fin, false);
2831         assert_eq!(&buf[..written], b"w");
2832 
2833         stream.send.ack_and_drop(5, 1);
2834         stream.send.ack_and_drop(0, 5);
2835 
2836         stream.send.retransmit(0, 5);
2837         stream.send.retransmit(5, 1);
2838 
2839         assert_eq!(stream.send.off_front(), 6);
2840 
2841         let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
2842         assert_eq!(written, 4);
2843         assert_eq!(fin, true);
2844         assert_eq!(&buf[..written], b"orld");
2845     }
2846 
2847     #[test]
recv_data_below_off()2848     fn recv_data_below_off() {
2849         let mut stream = Stream::new(15, 0, true, true, DEFAULT_STREAM_WINDOW);
2850 
2851         let first = RangeBuf::from(b"hello", 0, false);
2852 
2853         assert_eq!(stream.recv.write(first), Ok(()));
2854 
2855         let mut buf = [0; 10];
2856 
2857         let (len, fin) = stream.recv.emit(&mut buf).unwrap();
2858         assert_eq!(&buf[..len], b"hello");
2859         assert_eq!(fin, false);
2860 
2861         let first = RangeBuf::from(b"elloworld", 1, true);
2862         assert_eq!(stream.recv.write(first), Ok(()));
2863 
2864         let (len, fin) = stream.recv.emit(&mut buf).unwrap();
2865         assert_eq!(&buf[..len], b"world");
2866         assert_eq!(fin, true);
2867     }
2868 
2869     #[test]
stream_complete()2870     fn stream_complete() {
2871         let mut stream = Stream::new(30, 30, true, true, DEFAULT_STREAM_WINDOW);
2872 
2873         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2874         assert_eq!(stream.send.write(b"world", false), Ok(5));
2875 
2876         assert!(!stream.send.is_complete());
2877         assert!(!stream.send.is_fin());
2878 
2879         assert_eq!(stream.send.write(b"", true), Ok(0));
2880 
2881         assert!(!stream.send.is_complete());
2882         assert!(stream.send.is_fin());
2883 
2884         let buf = RangeBuf::from(b"hello", 0, true);
2885         assert!(stream.recv.write(buf).is_ok());
2886         assert!(!stream.recv.is_fin());
2887 
2888         stream.send.ack(6, 4);
2889         assert!(!stream.send.is_complete());
2890 
2891         let mut buf = [0; 2];
2892         assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
2893         assert!(!stream.recv.is_fin());
2894 
2895         stream.send.ack(1, 5);
2896         assert!(!stream.send.is_complete());
2897 
2898         stream.send.ack(0, 1);
2899         assert!(stream.send.is_complete());
2900 
2901         assert!(!stream.is_complete());
2902 
2903         let mut buf = [0; 3];
2904         assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
2905         assert!(stream.recv.is_fin());
2906 
2907         assert!(stream.is_complete());
2908     }
2909 
2910     #[test]
send_fin_zero_length_output()2911     fn send_fin_zero_length_output() {
2912         let mut buf = [0; 5];
2913 
2914         let mut stream = Stream::new(0, 15, true, true, DEFAULT_STREAM_WINDOW);
2915 
2916         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2917         assert_eq!(stream.send.off_front(), 0);
2918         assert!(!stream.send.is_fin());
2919 
2920         let (written, fin) = stream.send.emit(&mut buf).unwrap();
2921         assert_eq!(written, 5);
2922         assert_eq!(fin, false);
2923         assert_eq!(&buf[..written], b"hello");
2924 
2925         assert_eq!(stream.send.write(b"", true), Ok(0));
2926         assert!(stream.send.is_fin());
2927         assert_eq!(stream.send.off_front(), 5);
2928 
2929         let (written, fin) = stream.send.emit(&mut buf).unwrap();
2930         assert_eq!(written, 0);
2931         assert_eq!(fin, true);
2932         assert_eq!(&buf[..written], b"");
2933     }
2934 
2935     #[test]
send_emit()2936     fn send_emit() {
2937         let mut buf = [0; 5];
2938 
2939         let mut stream = Stream::new(0, 20, true, true, DEFAULT_STREAM_WINDOW);
2940 
2941         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2942         assert_eq!(stream.send.write(b"world", false), Ok(5));
2943         assert_eq!(stream.send.write(b"olleh", false), Ok(5));
2944         assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
2945         assert_eq!(stream.send.off_front(), 0);
2946         assert_eq!(stream.send.data.len(), 4);
2947 
2948         assert!(stream.is_flushable());
2949 
2950         assert!(stream.send.ready());
2951         assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
2952         assert_eq!(stream.send.off_front(), 4);
2953         assert_eq!(&buf[..4], b"hell");
2954 
2955         assert!(stream.send.ready());
2956         assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
2957         assert_eq!(stream.send.off_front(), 8);
2958         assert_eq!(&buf[..4], b"owor");
2959 
2960         assert!(stream.send.ready());
2961         assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
2962         assert_eq!(stream.send.off_front(), 10);
2963         assert_eq!(&buf[..2], b"ld");
2964 
2965         assert!(stream.send.ready());
2966         assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
2967         assert_eq!(stream.send.off_front(), 11);
2968         assert_eq!(&buf[..1], b"o");
2969 
2970         assert!(stream.send.ready());
2971         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
2972         assert_eq!(stream.send.off_front(), 16);
2973         assert_eq!(&buf[..5], b"llehd");
2974 
2975         assert!(stream.send.ready());
2976         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
2977         assert_eq!(stream.send.off_front(), 20);
2978         assert_eq!(&buf[..4], b"lrow");
2979 
2980         assert!(!stream.is_flushable());
2981 
2982         assert!(!stream.send.ready());
2983         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
2984         assert_eq!(stream.send.off_front(), 20);
2985     }
2986 
2987     #[test]
send_emit_ack()2988     fn send_emit_ack() {
2989         let mut buf = [0; 5];
2990 
2991         let mut stream = Stream::new(0, 20, true, true, DEFAULT_STREAM_WINDOW);
2992 
2993         assert_eq!(stream.send.write(b"hello", false), Ok(5));
2994         assert_eq!(stream.send.write(b"world", false), Ok(5));
2995         assert_eq!(stream.send.write(b"olleh", false), Ok(5));
2996         assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
2997         assert_eq!(stream.send.off_front(), 0);
2998         assert_eq!(stream.send.data.len(), 4);
2999 
3000         assert!(stream.is_flushable());
3001 
3002         assert!(stream.send.ready());
3003         assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
3004         assert_eq!(stream.send.off_front(), 4);
3005         assert_eq!(&buf[..4], b"hell");
3006 
3007         assert!(stream.send.ready());
3008         assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
3009         assert_eq!(stream.send.off_front(), 8);
3010         assert_eq!(&buf[..4], b"owor");
3011 
3012         stream.send.ack_and_drop(0, 5);
3013         assert_eq!(stream.send.data.len(), 3);
3014 
3015         assert!(stream.send.ready());
3016         assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
3017         assert_eq!(stream.send.off_front(), 10);
3018         assert_eq!(&buf[..2], b"ld");
3019 
3020         stream.send.ack_and_drop(7, 5);
3021         assert_eq!(stream.send.data.len(), 3);
3022 
3023         assert!(stream.send.ready());
3024         assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
3025         assert_eq!(stream.send.off_front(), 11);
3026         assert_eq!(&buf[..1], b"o");
3027 
3028         assert!(stream.send.ready());
3029         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
3030         assert_eq!(stream.send.off_front(), 16);
3031         assert_eq!(&buf[..5], b"llehd");
3032 
3033         stream.send.ack_and_drop(5, 7);
3034         assert_eq!(stream.send.data.len(), 2);
3035 
3036         assert!(stream.send.ready());
3037         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
3038         assert_eq!(stream.send.off_front(), 20);
3039         assert_eq!(&buf[..4], b"lrow");
3040 
3041         assert!(!stream.is_flushable());
3042 
3043         assert!(!stream.send.ready());
3044         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
3045         assert_eq!(stream.send.off_front(), 20);
3046 
3047         stream.send.ack_and_drop(22, 4);
3048         assert_eq!(stream.send.data.len(), 2);
3049 
3050         stream.send.ack_and_drop(20, 1);
3051         assert_eq!(stream.send.data.len(), 2);
3052     }
3053 
3054     #[test]
send_emit_retransmit()3055     fn send_emit_retransmit() {
3056         let mut buf = [0; 5];
3057 
3058         let mut stream = Stream::new(0, 20, true, true, DEFAULT_STREAM_WINDOW);
3059 
3060         assert_eq!(stream.send.write(b"hello", false), Ok(5));
3061         assert_eq!(stream.send.write(b"world", false), Ok(5));
3062         assert_eq!(stream.send.write(b"olleh", false), Ok(5));
3063         assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
3064         assert_eq!(stream.send.off_front(), 0);
3065         assert_eq!(stream.send.data.len(), 4);
3066 
3067         assert!(stream.is_flushable());
3068 
3069         assert!(stream.send.ready());
3070         assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
3071         assert_eq!(stream.send.off_front(), 4);
3072         assert_eq!(&buf[..4], b"hell");
3073 
3074         assert!(stream.send.ready());
3075         assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
3076         assert_eq!(stream.send.off_front(), 8);
3077         assert_eq!(&buf[..4], b"owor");
3078 
3079         stream.send.retransmit(3, 3);
3080         assert_eq!(stream.send.off_front(), 3);
3081 
3082         assert!(stream.send.ready());
3083         assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
3084         assert_eq!(stream.send.off_front(), 8);
3085         assert_eq!(&buf[..3], b"low");
3086 
3087         assert!(stream.send.ready());
3088         assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
3089         assert_eq!(stream.send.off_front(), 10);
3090         assert_eq!(&buf[..2], b"ld");
3091 
3092         stream.send.ack_and_drop(7, 2);
3093 
3094         stream.send.retransmit(8, 2);
3095 
3096         assert!(stream.send.ready());
3097         assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
3098         assert_eq!(stream.send.off_front(), 10);
3099         assert_eq!(&buf[..2], b"ld");
3100 
3101         assert!(stream.send.ready());
3102         assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
3103         assert_eq!(stream.send.off_front(), 11);
3104         assert_eq!(&buf[..1], b"o");
3105 
3106         assert!(stream.send.ready());
3107         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
3108         assert_eq!(stream.send.off_front(), 16);
3109         assert_eq!(&buf[..5], b"llehd");
3110 
3111         stream.send.retransmit(12, 2);
3112 
3113         assert!(stream.send.ready());
3114         assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
3115         assert_eq!(stream.send.off_front(), 16);
3116         assert_eq!(&buf[..2], b"le");
3117 
3118         assert!(stream.send.ready());
3119         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
3120         assert_eq!(stream.send.off_front(), 20);
3121         assert_eq!(&buf[..4], b"lrow");
3122 
3123         assert!(!stream.is_flushable());
3124 
3125         assert!(!stream.send.ready());
3126         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
3127         assert_eq!(stream.send.off_front(), 20);
3128 
3129         stream.send.retransmit(7, 12);
3130 
3131         assert!(stream.send.ready());
3132         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
3133         assert_eq!(stream.send.off_front(), 12);
3134         assert_eq!(&buf[..5], b"rldol");
3135 
3136         assert!(stream.send.ready());
3137         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
3138         assert_eq!(stream.send.off_front(), 17);
3139         assert_eq!(&buf[..5], b"lehdl");
3140 
3141         assert!(stream.send.ready());
3142         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
3143         assert_eq!(stream.send.off_front(), 20);
3144         assert_eq!(&buf[..2], b"ro");
3145 
3146         stream.send.ack_and_drop(12, 7);
3147 
3148         stream.send.retransmit(7, 12);
3149 
3150         assert!(stream.send.ready());
3151         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
3152         assert_eq!(stream.send.off_front(), 12);
3153         assert_eq!(&buf[..5], b"rldol");
3154 
3155         assert!(stream.send.ready());
3156         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
3157         assert_eq!(stream.send.off_front(), 17);
3158         assert_eq!(&buf[..5], b"lehdl");
3159 
3160         assert!(stream.send.ready());
3161         assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
3162         assert_eq!(stream.send.off_front(), 20);
3163         assert_eq!(&buf[..2], b"ro");
3164     }
3165 
3166     #[test]
rangebuf_split_off()3167     fn rangebuf_split_off() {
3168         let mut buf = RangeBuf::from(b"helloworld", 5, true);
3169         assert_eq!(buf.start, 0);
3170         assert_eq!(buf.pos, 0);
3171         assert_eq!(buf.len, 10);
3172         assert_eq!(buf.off, 5);
3173         assert_eq!(buf.fin, true);
3174 
3175         assert_eq!(buf.len(), 10);
3176         assert_eq!(buf.off(), 5);
3177         assert_eq!(buf.fin(), true);
3178 
3179         assert_eq!(&buf[..], b"helloworld");
3180 
3181         // Advance buffer.
3182         buf.consume(5);
3183 
3184         assert_eq!(buf.start, 0);
3185         assert_eq!(buf.pos, 5);
3186         assert_eq!(buf.len, 10);
3187         assert_eq!(buf.off, 5);
3188         assert_eq!(buf.fin, true);
3189 
3190         assert_eq!(buf.len(), 5);
3191         assert_eq!(buf.off(), 10);
3192         assert_eq!(buf.fin(), true);
3193 
3194         assert_eq!(&buf[..], b"world");
3195 
3196         // Split buffer before position.
3197         let mut new_buf = buf.split_off(3);
3198 
3199         assert_eq!(buf.start, 0);
3200         assert_eq!(buf.pos, 3);
3201         assert_eq!(buf.len, 3);
3202         assert_eq!(buf.off, 5);
3203         assert_eq!(buf.fin, false);
3204 
3205         assert_eq!(buf.len(), 0);
3206         assert_eq!(buf.off(), 8);
3207         assert_eq!(buf.fin(), false);
3208 
3209         assert_eq!(&buf[..], b"");
3210 
3211         assert_eq!(new_buf.start, 3);
3212         assert_eq!(new_buf.pos, 5);
3213         assert_eq!(new_buf.len, 7);
3214         assert_eq!(new_buf.off, 8);
3215         assert_eq!(new_buf.fin, true);
3216 
3217         assert_eq!(new_buf.len(), 5);
3218         assert_eq!(new_buf.off(), 10);
3219         assert_eq!(new_buf.fin(), true);
3220 
3221         assert_eq!(&new_buf[..], b"world");
3222 
3223         // Advance buffer.
3224         new_buf.consume(2);
3225 
3226         assert_eq!(new_buf.start, 3);
3227         assert_eq!(new_buf.pos, 7);
3228         assert_eq!(new_buf.len, 7);
3229         assert_eq!(new_buf.off, 8);
3230         assert_eq!(new_buf.fin, true);
3231 
3232         assert_eq!(new_buf.len(), 3);
3233         assert_eq!(new_buf.off(), 12);
3234         assert_eq!(new_buf.fin(), true);
3235 
3236         assert_eq!(&new_buf[..], b"rld");
3237 
3238         // Split buffer after position.
3239         let mut new_new_buf = new_buf.split_off(5);
3240 
3241         assert_eq!(new_buf.start, 3);
3242         assert_eq!(new_buf.pos, 7);
3243         assert_eq!(new_buf.len, 5);
3244         assert_eq!(new_buf.off, 8);
3245         assert_eq!(new_buf.fin, false);
3246 
3247         assert_eq!(new_buf.len(), 1);
3248         assert_eq!(new_buf.off(), 12);
3249         assert_eq!(new_buf.fin(), false);
3250 
3251         assert_eq!(&new_buf[..], b"r");
3252 
3253         assert_eq!(new_new_buf.start, 8);
3254         assert_eq!(new_new_buf.pos, 8);
3255         assert_eq!(new_new_buf.len, 2);
3256         assert_eq!(new_new_buf.off, 13);
3257         assert_eq!(new_new_buf.fin, true);
3258 
3259         assert_eq!(new_new_buf.len(), 2);
3260         assert_eq!(new_new_buf.off(), 13);
3261         assert_eq!(new_new_buf.fin(), true);
3262 
3263         assert_eq!(&new_new_buf[..], b"ld");
3264 
3265         // Advance buffer.
3266         new_new_buf.consume(2);
3267 
3268         assert_eq!(new_new_buf.start, 8);
3269         assert_eq!(new_new_buf.pos, 10);
3270         assert_eq!(new_new_buf.len, 2);
3271         assert_eq!(new_new_buf.off, 13);
3272         assert_eq!(new_new_buf.fin, true);
3273 
3274         assert_eq!(new_new_buf.len(), 0);
3275         assert_eq!(new_new_buf.off(), 15);
3276         assert_eq!(new_new_buf.fin(), true);
3277 
3278         assert_eq!(&new_new_buf[..], b"");
3279     }
3280 
3281     /// RFC9000 2.1: A stream ID that is used out of order results in all
3282     /// streams of that type with lower-numbered stream IDs also being opened.
3283     #[test]
stream_limit_auto_open()3284     fn stream_limit_auto_open() {
3285         let local_tp = crate::TransportParams::default();
3286         let peer_tp = crate::TransportParams::default();
3287 
3288         let mut streams = StreamMap::new(5, 5, 5);
3289 
3290         let stream_id = 500;
3291         assert!(!is_local(stream_id, true), "stream id is peer initiated");
3292         assert!(is_bidi(stream_id), "stream id is bidirectional");
3293         assert_eq!(
3294             streams
3295                 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
3296                 .err(),
3297             Some(Error::StreamLimit),
3298             "stream limit should be exceeded"
3299         );
3300     }
3301 
3302     /// Stream limit should be satisfied regardless of what order we open
3303     /// streams
3304     #[test]
stream_create_out_of_order()3305     fn stream_create_out_of_order() {
3306         let local_tp = crate::TransportParams::default();
3307         let peer_tp = crate::TransportParams::default();
3308 
3309         let mut streams = StreamMap::new(5, 5, 5);
3310 
3311         for stream_id in [8, 12, 4] {
3312             assert!(is_local(stream_id, false), "stream id is client initiated");
3313             assert!(is_bidi(stream_id), "stream id is bidirectional");
3314             assert!(streams
3315                 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
3316                 .is_ok());
3317         }
3318     }
3319 
3320     /// Check stream limit boundary cases
3321     #[test]
stream_limit_edge()3322     fn stream_limit_edge() {
3323         let local_tp = crate::TransportParams::default();
3324         let peer_tp = crate::TransportParams::default();
3325 
3326         let mut streams = StreamMap::new(3, 3, 3);
3327 
3328         // Highest permitted
3329         let stream_id = 8;
3330         assert!(streams
3331             .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
3332             .is_ok());
3333 
3334         // One more than highest permitted
3335         let stream_id = 12;
3336         assert_eq!(
3337             streams
3338                 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
3339                 .err(),
3340             Some(Error::StreamLimit)
3341         );
3342     }
3343 
3344     /// Check SendBuf::len calculation on a retransmit case
3345     #[test]
send_buf_len_on_retransmit()3346     fn send_buf_len_on_retransmit() {
3347         let mut buf = [0; 15];
3348 
3349         let mut send = SendBuf::new(u64::MAX);
3350         assert_eq!(send.len, 0);
3351         assert_eq!(send.off_front(), 0);
3352 
3353         let first = b"something";
3354 
3355         assert!(send.write(first, false).is_ok());
3356         assert_eq!(send.off_front(), 0);
3357 
3358         assert_eq!(send.len, 9);
3359 
3360         let (written, fin) = send.emit(&mut buf[..4]).unwrap();
3361         assert_eq!(written, 4);
3362         assert_eq!(fin, false);
3363         assert_eq!(&buf[..written], b"some");
3364         assert_eq!(send.len, 5);
3365         assert_eq!(send.off_front(), 4);
3366 
3367         send.retransmit(3, 5);
3368         assert_eq!(send.len, 6);
3369         assert_eq!(send.off_front(), 3);
3370     }
3371 
3372     #[test]
send_buf_final_size_retransmit()3373     fn send_buf_final_size_retransmit() {
3374         let mut buf = [0; 50];
3375         let mut send = SendBuf::new(u64::MAX);
3376 
3377         send.write(&buf, false).unwrap();
3378         assert_eq!(send.off_front(), 0);
3379 
3380         // Emit the whole buffer
3381         let (written, _fin) = send.emit(&mut buf).unwrap();
3382         assert_eq!(written, buf.len());
3383         assert_eq!(send.off_front(), buf.len() as u64);
3384 
3385         // Server decides to retransmit the last 10 bytes. It's possible
3386         // it's not actually lost and that the client did receive it.
3387         send.retransmit(40, 10);
3388 
3389         // Server receives STOP_SENDING from client. The final_size we
3390         // send in the RESET_STREAM should be 50. If we send anything less,
3391         // it's a FINAL_SIZE_ERROR.
3392         let (fin_off, unsent) = send.stop(0).unwrap();
3393         assert_eq!(fin_off, 50);
3394         assert_eq!(unsent, 0);
3395     }
3396 }
3397