1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 use std::cmp;
28
29 use std::collections::hash_map;
30 use std::collections::BTreeMap;
31 use std::collections::BinaryHeap;
32 use std::collections::HashMap;
33 use std::collections::HashSet;
34 use std::collections::VecDeque;
35
36 use crate::Error;
37 use crate::Result;
38
39 use crate::ranges;
40
41 const DEFAULT_URGENCY: u8 = 127;
42
43 /// Keeps track of QUIC streams and enforces stream limits.
44 #[derive(Default)]
45 pub struct StreamMap {
46 /// Map of streams indexed by stream ID.
47 streams: HashMap<u64, Stream>,
48
49 /// Set of streams that were completed and garbage collected.
50 ///
51 /// Instead of keeping the full stream state forever, we collect completed
52 /// streams to save memory, but we still need to keep track of previously
53 /// created streams, to prevent peers from re-creating them.
54 collected: HashSet<u64>,
55
56 /// Peer's maximum bidirectional stream count limit.
57 peer_max_streams_bidi: u64,
58
59 /// Peer's maximum unidirectional stream count limit.
60 peer_max_streams_uni: u64,
61
62 /// The total number of bidirectional streams opened by the peer.
63 peer_opened_streams_bidi: u64,
64
65 /// The total number of unidirectional streams opened by the peer.
66 peer_opened_streams_uni: u64,
67
68 /// Local maximum bidirectional stream count limit.
69 local_max_streams_bidi: u64,
70 local_max_streams_bidi_next: u64,
71
72 /// Local maximum unidirectional stream count limit.
73 local_max_streams_uni: u64,
74 local_max_streams_uni_next: u64,
75
76 /// The total number of bidirectional streams opened by the local endpoint.
77 local_opened_streams_bidi: u64,
78
79 /// The total number of unidirectional streams opened by the local endpoint.
80 local_opened_streams_uni: u64,
81
82 /// Queue of stream IDs corresponding to streams that have buffered data
83 /// ready to be sent to the peer. This also implies that the stream has
84 /// enough flow control credits to send at least some of that data.
85 ///
86 /// Streams are grouped by their priority, where each urgency level has two
87 /// queues, one for non-incremental streams and one for incremental ones.
88 ///
89 /// Streams with lower urgency level are scheduled first, and within the
90 /// same urgency level Non-incremental streams are scheduled first, in the
91 /// order of their stream IDs, and incremental streams are scheduled in a
92 /// round-robin fashion after all non-incremental streams have been flushed.
93 flushable: BTreeMap<u8, (BinaryHeap<std::cmp::Reverse<u64>>, VecDeque<u64>)>,
94
95 /// Set of stream IDs corresponding to streams that have outstanding data
96 /// to read. This is used to generate a `StreamIter` of streams without
97 /// having to iterate over the full list of streams.
98 readable: HashSet<u64>,
99
100 /// Set of stream IDs corresponding to streams that have enough flow control
101 /// capacity to be written to, and is not finished. This is used to generate
102 /// a `StreamIter` of streams without having to iterate over the full list
103 /// of streams.
104 writable: HashSet<u64>,
105
106 /// Set of stream IDs corresponding to streams that are almost out of flow
107 /// control credit and need to send MAX_STREAM_DATA. This is used to
108 /// generate a `StreamIter` of streams without having to iterate over the
109 /// full list of streams.
110 almost_full: HashSet<u64>,
111
112 /// Set of stream IDs corresponding to streams that are blocked. The value
113 /// of the map elements represents the offset of the stream at which the
114 /// blocking occurred.
115 blocked: HashMap<u64, u64>,
116 }
117
118 impl StreamMap {
new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap119 pub fn new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap {
120 StreamMap {
121 local_max_streams_bidi: max_streams_bidi,
122 local_max_streams_bidi_next: max_streams_bidi,
123
124 local_max_streams_uni: max_streams_uni,
125 local_max_streams_uni_next: max_streams_uni,
126
127 ..StreamMap::default()
128 }
129 }
130
131 /// Returns the stream with the given ID if it exists.
get(&self, id: u64) -> Option<&Stream>132 pub fn get(&self, id: u64) -> Option<&Stream> {
133 self.streams.get(&id)
134 }
135
136 /// Returns the mutable stream with the given ID if it exists.
get_mut(&mut self, id: u64) -> Option<&mut Stream>137 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
138 self.streams.get_mut(&id)
139 }
140
141 /// Returns the mutable stream with the given ID if it exists, or creates
142 /// a new one otherwise.
143 ///
144 /// The `local` parameter indicates whether the stream's creation was
145 /// requested by the local application rather than the peer, and is
146 /// used to validate the requested stream ID, and to select the initial
147 /// flow control values from the local and remote transport parameters
148 /// (also passed as arguments).
149 ///
150 /// This also takes care of enforcing both local and the peer's stream
151 /// count limits. If one of these limits is violated, the `StreamLimit`
152 /// error is returned.
get_or_create( &mut self, id: u64, local_params: &crate::TransportParams, peer_params: &crate::TransportParams, local: bool, is_server: bool, ) -> Result<&mut Stream>153 pub(crate) fn get_or_create(
154 &mut self, id: u64, local_params: &crate::TransportParams,
155 peer_params: &crate::TransportParams, local: bool, is_server: bool,
156 ) -> Result<&mut Stream> {
157 let stream = match self.streams.entry(id) {
158 hash_map::Entry::Vacant(v) => {
159 // Stream has already been closed and garbage collected.
160 if self.collected.contains(&id) {
161 return Err(Error::Done);
162 }
163
164 if local != is_local(id, is_server) {
165 return Err(Error::InvalidStreamState);
166 }
167
168 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
169 // Locally-initiated bidirectional stream.
170 (true, true) => (
171 local_params.initial_max_stream_data_bidi_local,
172 peer_params.initial_max_stream_data_bidi_remote,
173 ),
174
175 // Locally-initiated unidirectional stream.
176 (true, false) => (0, peer_params.initial_max_stream_data_uni),
177
178 // Remotely-initiated bidirectional stream.
179 (false, true) => (
180 local_params.initial_max_stream_data_bidi_remote,
181 peer_params.initial_max_stream_data_bidi_local,
182 ),
183
184 // Remotely-initiated unidirectional stream.
185 (false, false) =>
186 (local_params.initial_max_stream_data_uni, 0),
187 };
188
189 // Enforce stream count limits.
190 match (is_local(id, is_server), is_bidi(id)) {
191 (true, true) => {
192 if self.local_opened_streams_bidi >=
193 self.peer_max_streams_bidi
194 {
195 return Err(Error::StreamLimit);
196 }
197
198 self.local_opened_streams_bidi += 1;
199 },
200
201 (true, false) => {
202 if self.local_opened_streams_uni >=
203 self.peer_max_streams_uni
204 {
205 return Err(Error::StreamLimit);
206 }
207
208 self.local_opened_streams_uni += 1;
209 },
210
211 (false, true) => {
212 if self.peer_opened_streams_bidi >=
213 self.local_max_streams_bidi
214 {
215 return Err(Error::StreamLimit);
216 }
217
218 self.peer_opened_streams_bidi += 1;
219 },
220
221 (false, false) => {
222 if self.peer_opened_streams_uni >=
223 self.local_max_streams_uni
224 {
225 return Err(Error::StreamLimit);
226 }
227
228 self.peer_opened_streams_uni += 1;
229 },
230 };
231
232 let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local);
233 v.insert(s)
234 },
235
236 hash_map::Entry::Occupied(v) => v.into_mut(),
237 };
238
239 // Stream might already be writable due to initial flow control limits.
240 if stream.is_writable() {
241 self.writable.insert(id);
242 }
243
244 Ok(stream)
245 }
246
247 /// Pushes the stream ID to the back of the flushable streams queue with
248 /// the specified urgency.
249 ///
250 /// Note that the caller is responsible for checking that the specified
251 /// stream ID was not in the queue already before calling this.
252 ///
253 /// Queueing a stream multiple times simultaneously means that it might be
254 /// unfairly scheduled more often than other streams, and might also cause
255 /// spurious cycles through the queue, so it should be avoided.
push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool)256 pub fn push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool) {
257 // Push the element to the back of the queue corresponding to the given
258 // urgency. If the queue doesn't exist yet, create it first.
259 let queues = self
260 .flushable
261 .entry(urgency)
262 .or_insert_with(|| (BinaryHeap::new(), VecDeque::new()));
263
264 if !incr {
265 // Non-incremental streams are scheduled in order of their stream ID.
266 queues.0.push(std::cmp::Reverse(stream_id))
267 } else {
268 // Incremental streams are scheduled in a round-robin fashion.
269 queues.1.push_back(stream_id)
270 };
271 }
272
273 /// Removes and returns the first stream ID from the flushable streams
274 /// queue with the specified urgency.
275 ///
276 /// Note that if the stream is still flushable after sending some of its
277 /// outstanding data, it needs to be added back to the queue.
pop_flushable(&mut self) -> Option<u64>278 pub fn pop_flushable(&mut self) -> Option<u64> {
279 // Remove the first element from the queue corresponding to the lowest
280 // urgency that has elements.
281 let (node, clear) =
282 if let Some((urgency, queues)) = self.flushable.iter_mut().next() {
283 let node = if !queues.0.is_empty() {
284 queues.0.pop().map(|x| x.0)
285 } else {
286 queues.1.pop_front()
287 };
288
289 let clear = if queues.0.is_empty() && queues.1.is_empty() {
290 Some(*urgency)
291 } else {
292 None
293 };
294
295 (node, clear)
296 } else {
297 (None, None)
298 };
299
300 // Remove the queue from the list of queues if it is now empty, so that
301 // the next time `pop_flushable()` is called the next queue with elements
302 // is used.
303 if let Some(urgency) = &clear {
304 self.flushable.remove(urgency);
305 }
306
307 node
308 }
309
310 /// Adds or removes the stream ID to/from the readable streams set.
311 ///
312 /// If the stream was already in the list, this does nothing.
mark_readable(&mut self, stream_id: u64, readable: bool)313 pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
314 if readable {
315 self.readable.insert(stream_id);
316 } else {
317 self.readable.remove(&stream_id);
318 }
319 }
320
321 /// Adds or removes the stream ID to/from the writable streams set.
322 ///
323 /// This should also be called anytime a new stream is created, in addition
324 /// to when an existing stream becomes writable (or stops being writable).
325 ///
326 /// If the stream was already in the list, this does nothing.
mark_writable(&mut self, stream_id: u64, writable: bool)327 pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
328 if writable {
329 self.writable.insert(stream_id);
330 } else {
331 self.writable.remove(&stream_id);
332 }
333 }
334
335 /// Adds or removes the stream ID to/from the almost full streams set.
336 ///
337 /// If the stream was already in the list, this does nothing.
mark_almost_full(&mut self, stream_id: u64, almost_full: bool)338 pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
339 if almost_full {
340 self.almost_full.insert(stream_id);
341 } else {
342 self.almost_full.remove(&stream_id);
343 }
344 }
345
346 /// Adds or removes the stream ID to/from the blocked streams set with the
347 /// given offset value.
348 ///
349 /// If the stream was already in the list, this does nothing.
mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64)350 pub fn mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64) {
351 if blocked {
352 self.blocked.insert(stream_id, off);
353 } else {
354 self.blocked.remove(&stream_id);
355 }
356 }
357
358 /// Updates the peer's maximum bidirectional stream count limit.
update_peer_max_streams_bidi(&mut self, v: u64)359 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
360 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
361 }
362
363 /// Updates the peer's maximum unidirectional stream count limit.
update_peer_max_streams_uni(&mut self, v: u64)364 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
365 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
366 }
367
368 /// Commits the new max_streams_bidi limit.
update_max_streams_bidi(&mut self)369 pub fn update_max_streams_bidi(&mut self) {
370 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
371 }
372
373 /// Returns the new max_streams_bidi limit.
max_streams_bidi_next(&mut self) -> u64374 pub fn max_streams_bidi_next(&mut self) -> u64 {
375 self.local_max_streams_bidi_next
376 }
377
378 /// Commits the new max_streams_uni limit.
update_max_streams_uni(&mut self)379 pub fn update_max_streams_uni(&mut self) {
380 self.local_max_streams_uni = self.local_max_streams_uni_next;
381 }
382
383 /// Returns the new max_streams_uni limit.
max_streams_uni_next(&mut self) -> u64384 pub fn max_streams_uni_next(&mut self) -> u64 {
385 self.local_max_streams_uni_next
386 }
387
388 /// Drops completed stream.
389 ///
390 /// This should only be called when Stream::is_complete() returns true for
391 /// the given stream.
collect(&mut self, stream_id: u64, local: bool)392 pub fn collect(&mut self, stream_id: u64, local: bool) {
393 if !local {
394 // If the stream was created by the peer, give back a max streams
395 // credit.
396 if is_bidi(stream_id) {
397 self.local_max_streams_bidi_next =
398 self.local_max_streams_bidi_next.saturating_add(1);
399 } else {
400 self.local_max_streams_uni_next =
401 self.local_max_streams_uni_next.saturating_add(1);
402 }
403 }
404
405 self.streams.remove(&stream_id);
406 self.collected.insert(stream_id);
407 }
408
409 /// Creates an iterator over streams that have outstanding data to read.
readable(&self) -> StreamIter410 pub fn readable(&self) -> StreamIter {
411 StreamIter::from(&self.readable)
412 }
413
414 /// Creates an iterator over streams that can be written to.
writable(&self) -> StreamIter415 pub fn writable(&self) -> StreamIter {
416 StreamIter::from(&self.writable)
417 }
418
419 /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
almost_full(&self) -> StreamIter420 pub fn almost_full(&self) -> StreamIter {
421 StreamIter::from(&self.almost_full)
422 }
423
424 /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
blocked(&self) -> hash_map::Iter<u64, u64>425 pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
426 self.blocked.iter()
427 }
428
429 /// Returns true if there are any streams that have data to write.
has_flushable(&self) -> bool430 pub fn has_flushable(&self) -> bool {
431 !self.flushable.is_empty()
432 }
433
434 /// Returns true if there are any streams that need to update the local
435 /// flow control limit.
has_almost_full(&self) -> bool436 pub fn has_almost_full(&self) -> bool {
437 !self.almost_full.is_empty()
438 }
439
440 /// Returns true if there are any streams that are blocked.
has_blocked(&self) -> bool441 pub fn has_blocked(&self) -> bool {
442 !self.blocked.is_empty()
443 }
444
445 /// Returns true if the max bidirectional streams count needs to be updated
446 /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_bidi(&self) -> bool447 pub fn should_update_max_streams_bidi(&self) -> bool {
448 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
449 self.local_max_streams_bidi_next / 2 >
450 self.local_max_streams_bidi - self.peer_opened_streams_bidi
451 }
452
453 /// Returns true if the max unidirectional streams count needs to be updated
454 /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_uni(&self) -> bool455 pub fn should_update_max_streams_uni(&self) -> bool {
456 self.local_max_streams_uni_next != self.local_max_streams_uni &&
457 self.local_max_streams_uni_next / 2 >
458 self.local_max_streams_uni - self.peer_opened_streams_uni
459 }
460
461 /// Returns the number of active streams in the map.
462 #[cfg(test)]
len(&self) -> usize463 pub fn len(&self) -> usize {
464 self.streams.len()
465 }
466 }
467
468 /// A QUIC stream.
469 #[derive(Default)]
470 pub struct Stream {
471 /// Receive-side stream buffer.
472 pub recv: RecvBuf,
473
474 /// Send-side stream buffer.
475 pub send: SendBuf,
476
477 /// Whether the stream is bidirectional.
478 pub bidi: bool,
479
480 /// Whether the stream was created by the local endpoint.
481 pub local: bool,
482
483 /// Application data.
484 pub data: Option<Box<dyn Send + std::any::Any>>,
485
486 /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
487 pub urgency: u8,
488
489 /// Whether the stream can be flushed incrementally. Default is `true`.
490 pub incremental: bool,
491 }
492
493 impl Stream {
494 /// Creates a new stream with the given flow control limits.
new( max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool, ) -> Stream495 pub fn new(
496 max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
497 ) -> Stream {
498 Stream {
499 recv: RecvBuf::new(max_rx_data),
500 send: SendBuf::new(max_tx_data),
501 bidi,
502 local,
503 data: None,
504 urgency: DEFAULT_URGENCY,
505 incremental: true,
506 }
507 }
508
509 /// Returns true if the stream has data to read.
is_readable(&self) -> bool510 pub fn is_readable(&self) -> bool {
511 self.recv.ready()
512 }
513
514 /// Returns true if the stream has enough flow control capacity to be
515 /// written to, and is not finished.
is_writable(&self) -> bool516 pub fn is_writable(&self) -> bool {
517 !self.send.shutdown &&
518 !self.send.is_fin() &&
519 self.send.off < self.send.max_data
520 }
521
522 /// Returns true if the stream has data to send and is allowed to send at
523 /// least some of it.
is_flushable(&self) -> bool524 pub fn is_flushable(&self) -> bool {
525 self.send.ready() && self.send.off_front() < self.send.max_data
526 }
527
528 /// Returns true if the stream is complete.
529 ///
530 /// For bidirectional streams this happens when both the receive and send
531 /// sides are complete. That is when all incoming data has been read by the
532 /// application, and when all outgoing data has been acked by the peer.
533 ///
534 /// For unidirectional streams this happens when either the receive or send
535 /// side is complete, depending on whether the stream was created locally
536 /// or not.
is_complete(&self) -> bool537 pub fn is_complete(&self) -> bool {
538 match (self.bidi, self.local) {
539 // For bidirectional streams we need to check both receive and send
540 // sides for completion.
541 (true, _) => self.recv.is_fin() && self.send.is_complete(),
542
543 // For unidirectional streams generated locally, we only need to
544 // check the send side for completion.
545 (false, true) => self.send.is_complete(),
546
547 // For unidirectional streams generated by the peer, we only need
548 // to check the receive side for completion.
549 (false, false) => self.recv.is_fin(),
550 }
551 }
552 }
553
554 /// Returns true if the stream was created locally.
is_local(stream_id: u64, is_server: bool) -> bool555 pub fn is_local(stream_id: u64, is_server: bool) -> bool {
556 (stream_id & 0x1) == (is_server as u64)
557 }
558
559 /// Returns true if the stream is bidirectional.
is_bidi(stream_id: u64) -> bool560 pub fn is_bidi(stream_id: u64) -> bool {
561 (stream_id & 0x2) == 0
562 }
563
564 /// An iterator over QUIC streams.
565 #[derive(Default)]
566 pub struct StreamIter {
567 streams: Vec<u64>,
568 }
569
570 impl StreamIter {
from(streams: &HashSet<u64>) -> Self571 fn from(streams: &HashSet<u64>) -> Self {
572 StreamIter {
573 streams: streams.iter().copied().collect(),
574 }
575 }
576 }
577
578 impl Iterator for StreamIter {
579 type Item = u64;
580
next(&mut self) -> Option<Self::Item>581 fn next(&mut self) -> Option<Self::Item> {
582 self.streams.pop()
583 }
584 }
585
586 impl ExactSizeIterator for StreamIter {
len(&self) -> usize587 fn len(&self) -> usize {
588 self.streams.len()
589 }
590 }
591
592 /// Receive-side stream buffer.
593 ///
594 /// Stream data received by the peer is buffered in a list of data chunks
595 /// ordered by offset in ascending order. Contiguous data can then be read
596 /// into a slice.
597 #[derive(Debug, Default)]
598 pub struct RecvBuf {
599 /// Chunks of data received from the peer that have not yet been read by
600 /// the application, ordered by offset.
601 data: BinaryHeap<RangeBuf>,
602
603 /// The lowest data offset that has yet to be read by the application.
604 off: u64,
605
606 /// The total length of data received on this stream.
607 len: u64,
608
609 /// The maximum offset the peer is allowed to send us.
610 max_data: u64,
611
612 /// The updated maximum offset the peer is allowed to send us.
613 max_data_next: u64,
614
615 /// The final stream offset received from the peer, if any.
616 fin_off: Option<u64>,
617
618 /// Whether incoming data is validated but not buffered.
619 drain: bool,
620 }
621
622 impl RecvBuf {
623 /// Creates a new receive buffer.
new(max_data: u64) -> RecvBuf624 fn new(max_data: u64) -> RecvBuf {
625 RecvBuf {
626 max_data,
627 max_data_next: max_data,
628 ..RecvBuf::default()
629 }
630 }
631
632 /// Inserts the given chunk of data in the buffer.
633 ///
634 /// This also takes care of enforcing stream flow control limits, as well
635 /// as handling incoming data that overlaps data that is already in the
636 /// buffer.
push(&mut self, buf: RangeBuf) -> Result<()>637 pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
638 if buf.max_off() > self.max_data {
639 return Err(Error::FlowControl);
640 }
641
642 if let Some(fin_off) = self.fin_off {
643 // Stream's size is known, forbid data beyond that point.
644 if buf.max_off() > fin_off {
645 return Err(Error::FinalSize);
646 }
647
648 // Stream's size is already known, forbid changing it.
649 if buf.fin() && fin_off != buf.max_off() {
650 return Err(Error::FinalSize);
651 }
652 }
653
654 // Stream's known size is lower than data already received.
655 if buf.fin() && buf.max_off() < self.len {
656 return Err(Error::FinalSize);
657 }
658
659 // We already saved the final offset, so there's nothing else we
660 // need to keep from the RangeBuf if it's empty.
661 if self.fin_off.is_some() && buf.is_empty() {
662 return Ok(());
663 }
664
665 // No need to process an empty buffer with the fin flag, if we already
666 // know the final size.
667 if buf.fin() && buf.is_empty() && self.fin_off.is_some() {
668 return Ok(());
669 }
670
671 if buf.fin() {
672 self.fin_off = Some(buf.max_off());
673 }
674
675 // No need to store empty buffer that doesn't carry the fin flag.
676 if !buf.fin() && buf.is_empty() {
677 return Ok(());
678 }
679
680 // Check if data is fully duplicate, that is the buffer's max offset is
681 // lower or equal to the offset already stored in the recv buffer.
682 if self.off >= buf.max_off() {
683 // An exception is applied to empty range buffers, because an empty
684 // buffer's max offset matches the max offset of the recv buffer.
685 //
686 // By this point all spurious empty buffers should have already been
687 // discarded, so allowing empty buffers here should be safe.
688 if !buf.is_empty() {
689 return Ok(());
690 }
691 }
692
693 if self.drain {
694 return Ok(());
695 }
696
697 let mut tmp_buf = Some(buf);
698
699 while let Some(mut buf) = tmp_buf {
700 tmp_buf = None;
701
702 // Discard incoming data below current stream offset. Bytes up to
703 // `self.off` have already been received so we should not buffer
704 // them again. This is also important to make sure `ready()` doesn't
705 // get stuck when a buffer with lower offset than the stream's is
706 // buffered.
707 if self.off > buf.off() {
708 buf = buf.split_off((self.off - buf.off()) as usize);
709 }
710
711 for b in &self.data {
712 // New buffer is fully contained in existing buffer.
713 if buf.off() >= b.off() && buf.max_off() <= b.max_off() {
714 return Ok(());
715 }
716
717 // New buffer's start overlaps existing buffer.
718 if buf.off() >= b.off() && buf.off() < b.max_off() {
719 buf = buf.split_off((b.max_off() - buf.off()) as usize);
720 }
721
722 // New buffer's end overlaps existing buffer.
723 if buf.off() < b.off() && buf.max_off() > b.off() {
724 tmp_buf = Some(buf.split_off((b.off() - buf.off()) as usize));
725 }
726 }
727
728 self.len = cmp::max(self.len, buf.max_off());
729
730 self.data.push(buf);
731 }
732
733 Ok(())
734 }
735
736 /// Writes data from the receive buffer into the given output buffer.
737 ///
738 /// Only contiguous data is written to the output buffer, starting from
739 /// offset 0. The offset is incremented as data is read out of the receive
740 /// buffer into the application buffer. If there is no data at the expected
741 /// read offset, the `Done` error is returned.
742 ///
743 /// On success the amount of data read, and a flag indicating if there is
744 /// no more data in the buffer, are returned as a tuple.
pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)>745 pub fn pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
746 let mut len = 0;
747 let mut cap = out.len();
748
749 if !self.ready() {
750 return Err(Error::Done);
751 }
752
753 while cap > 0 && self.ready() {
754 let mut buf = match self.data.peek_mut() {
755 Some(v) => v,
756
757 None => break,
758 };
759
760 let buf_len = cmp::min(buf.len(), cap);
761
762 out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
763
764 self.off += buf_len as u64;
765
766 len += buf_len;
767 cap -= buf_len;
768
769 if buf_len < buf.len() {
770 buf.consume(buf_len);
771
772 // We reached the maximum capacity, so end here.
773 break;
774 }
775
776 std::collections::binary_heap::PeekMut::pop(buf);
777 }
778
779 self.max_data_next = self.max_data_next.saturating_add(len as u64);
780
781 Ok((len, self.is_fin()))
782 }
783
784 /// Resets the stream at the given offset.
reset(&mut self, final_size: u64) -> Result<usize>785 pub fn reset(&mut self, final_size: u64) -> Result<usize> {
786 // Stream's size is already known, forbid changing it.
787 if let Some(fin_off) = self.fin_off {
788 if fin_off != final_size {
789 return Err(Error::FinalSize);
790 }
791 }
792
793 // Stream's known size is lower than data already received.
794 if final_size < self.len {
795 return Err(Error::FinalSize);
796 }
797
798 self.fin_off = Some(final_size);
799
800 // Return how many bytes need to be removed from the connection flow
801 // control.
802 Ok((final_size - self.len) as usize)
803 }
804
805 /// Commits the new max_data limit.
update_max_data(&mut self)806 pub fn update_max_data(&mut self) {
807 self.max_data = self.max_data_next;
808 }
809
810 /// Return the new max_data limit.
max_data_next(&mut self) -> u64811 pub fn max_data_next(&mut self) -> u64 {
812 self.max_data_next
813 }
814
815 /// Shuts down receiving data.
shutdown(&mut self) -> Result<()>816 pub fn shutdown(&mut self) -> Result<()> {
817 if self.drain {
818 return Err(Error::Done);
819 }
820
821 self.drain = true;
822
823 self.data.clear();
824
825 Ok(())
826 }
827
828 /// Returns the lowest offset of data buffered.
829 #[allow(dead_code)]
off_front(&self) -> u64830 pub fn off_front(&self) -> u64 {
831 self.off
832 }
833
834 /// Returns true if we need to update the local flow control limit.
almost_full(&self) -> bool835 pub fn almost_full(&self) -> bool {
836 // Send MAX_STREAM_DATA when the new limit is at least double the
837 // amount of data that can be received before blocking.
838 self.fin_off.is_none() &&
839 self.max_data_next != self.max_data &&
840 self.max_data_next / 2 > self.max_data - self.len
841 }
842
843 /// Returns the largest offset ever received.
max_off(&self) -> u64844 pub fn max_off(&self) -> u64 {
845 self.len
846 }
847
848 /// Returns true if the receive-side of the stream is complete.
849 ///
850 /// This happens when the stream's receive final size is known, and the
851 /// application has read all data from the stream.
is_fin(&self) -> bool852 pub fn is_fin(&self) -> bool {
853 if self.fin_off == Some(self.off) {
854 return true;
855 }
856
857 false
858 }
859
860 /// Returns true if the stream has data to be read.
ready(&self) -> bool861 fn ready(&self) -> bool {
862 let buf = match self.data.peek() {
863 Some(v) => v,
864
865 None => return false,
866 };
867
868 buf.off() == self.off
869 }
870 }
871
872 /// Send-side stream buffer.
873 ///
874 /// Stream data scheduled to be sent to the peer is buffered in a list of data
875 /// chunks ordered by offset in ascending order. Contiguous data can then be
876 /// read into a slice.
877 ///
878 /// By default, new data is appended at the end of the stream, but data can be
879 /// inserted at the start of the buffer (this is to allow data that needs to be
880 /// retransmitted to be re-buffered).
881 #[derive(Debug, Default)]
882 pub struct SendBuf {
883 /// Chunks of data to be sent, ordered by offset.
884 data: BinaryHeap<RangeBuf>,
885
886 /// The maximum offset of data buffered in the stream.
887 off: u64,
888
889 /// The amount of data that was ever written to this stream.
890 len: u64,
891
892 /// The maximum offset we are allowed to send to the peer.
893 max_data: u64,
894
895 /// The final stream offset written to the stream, if any.
896 fin_off: Option<u64>,
897
898 /// Whether the stream's send-side has been shut down.
899 shutdown: bool,
900
901 /// Ranges of data offsets that have been acked.
902 acked: ranges::RangeSet,
903 }
904
905 impl SendBuf {
906 /// Creates a new send buffer.
new(max_data: u64) -> SendBuf907 fn new(max_data: u64) -> SendBuf {
908 SendBuf {
909 max_data,
910 ..SendBuf::default()
911 }
912 }
913
914 /// Inserts the given slice of data at the end of the buffer.
915 ///
916 /// The number of bytes that were actually stored in the buffer is returned
917 /// (this may be lower than the size of the input buffer, in case of partial
918 /// writes).
push_slice( &mut self, mut data: &[u8], mut fin: bool, ) -> Result<usize>919 pub fn push_slice(
920 &mut self, mut data: &[u8], mut fin: bool,
921 ) -> Result<usize> {
922 if self.shutdown {
923 // Since we won't write any more data anyway, pretend that we sent
924 // all data that was passed in.
925 return Ok(data.len());
926 }
927
928 if data.is_empty() {
929 // Create a dummy range buffer, in order to propagate the `fin` flag
930 // into `RangeBuf::push()`. This will be discarded later on.
931 let buf = RangeBuf::from(&[], self.off, fin);
932
933 return self.push(buf).map(|_| 0);
934 }
935
936 if data.len() > self.cap() {
937 // Truncate the input buffer according to the stream's capacity.
938 let len = self.cap();
939 data = &data[..len];
940
941 // We are not buffering the full input, so clear the fin flag.
942 fin = false;
943 }
944
945 let buf = RangeBuf::from(data, self.off, fin);
946 self.push(buf)?;
947
948 self.off += data.len() as u64;
949
950 Ok(data.len())
951 }
952
953 /// Inserts the given chunk of data in the buffer.
push(&mut self, buf: RangeBuf) -> Result<()>954 pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
955 if let Some(fin_off) = self.fin_off {
956 // Can't write past final offset.
957 if buf.max_off() > fin_off {
958 return Err(Error::FinalSize);
959 }
960
961 // Can't "undo" final offset.
962 if buf.max_off() == fin_off && !buf.fin() {
963 return Err(Error::FinalSize);
964 }
965 }
966
967 if self.shutdown {
968 return Ok(());
969 }
970
971 if buf.fin() {
972 self.fin_off = Some(buf.max_off());
973 }
974
975 // Don't queue data that was already fully acked.
976 if self.ack_off() >= buf.max_off() {
977 return Ok(());
978 }
979
980 self.len += buf.len() as u64;
981
982 // We already recorded the final offset, so we can just discard the
983 // empty buffer now.
984 if buf.is_empty() {
985 return Ok(());
986 }
987
988 self.data.push(buf);
989
990 Ok(())
991 }
992
993 /// Returns contiguous data from the send buffer as a single `RangeBuf`.
pop(&mut self, max_data: usize) -> Result<RangeBuf>994 pub fn pop(&mut self, max_data: usize) -> Result<RangeBuf> {
995 let mut out = RangeBuf::default();
996 out.data =
997 Vec::with_capacity(cmp::min(max_data as u64, self.len) as usize);
998 out.off = self.off;
999
1000 let mut out_len = max_data;
1001 let mut out_off = self.data.peek().map_or_else(|| out.off, RangeBuf::off);
1002
1003 while out_len > 0 &&
1004 self.ready() &&
1005 self.off_front() == out_off &&
1006 self.off_front() < self.max_data
1007 {
1008 let mut buf = match self.data.peek_mut() {
1009 Some(v) => v,
1010
1011 None => break,
1012 };
1013
1014 let buf_len = cmp::min(buf.len(), out_len);
1015
1016 if out.is_empty() {
1017 out.off = buf.off();
1018 }
1019
1020 self.len -= buf_len as u64;
1021
1022 out_len -= buf_len;
1023 out_off = buf.off() + buf_len as u64;
1024
1025 out.data.extend_from_slice(&buf[..buf_len]);
1026
1027 if buf_len < buf.len() {
1028 buf.consume(buf_len);
1029
1030 // We reached the maximum capacity, so end here.
1031 break;
1032 }
1033
1034 std::collections::binary_heap::PeekMut::pop(buf);
1035 }
1036
1037 // Override the `fin` flag set for the output buffer by matching the
1038 // buffer's maximum offset against the stream's final offset (if known).
1039 //
1040 // This is more efficient than tracking `fin` using the range buffers
1041 // themselves, and lets us avoid queueing empty buffers just so we can
1042 // propagate the final size.
1043 out.fin = self.fin_off == Some(out.max_off());
1044
1045 Ok(out)
1046 }
1047
1048 /// Updates the max_data limit to the given value.
update_max_data(&mut self, max_data: u64)1049 pub fn update_max_data(&mut self, max_data: u64) {
1050 self.max_data = cmp::max(self.max_data, max_data);
1051 }
1052
1053 /// Increments the acked data offset.
ack(&mut self, off: u64, len: usize)1054 pub fn ack(&mut self, off: u64, len: usize) {
1055 self.acked.insert(off..off + len as u64);
1056 }
1057
1058 /// Shuts down sending data.
shutdown(&mut self) -> Result<()>1059 pub fn shutdown(&mut self) -> Result<()> {
1060 if self.shutdown {
1061 return Err(Error::Done);
1062 }
1063
1064 self.shutdown = true;
1065
1066 self.data.clear();
1067
1068 Ok(())
1069 }
1070
1071 /// Returns the largest offset of data buffered.
1072 #[allow(dead_code)]
off_back(&self) -> u641073 pub fn off_back(&self) -> u64 {
1074 self.off
1075 }
1076
1077 /// Returns the lowest offset of data buffered.
off_front(&self) -> u641078 pub fn off_front(&self) -> u64 {
1079 match self.data.peek() {
1080 Some(v) => v.off(),
1081
1082 None => self.off,
1083 }
1084 }
1085
1086 /// The maximum offset we are allowed to send to the peer.
max_off(&self) -> u641087 pub fn max_off(&self) -> u64 {
1088 self.max_data
1089 }
1090
1091 /// Returns true if all data in the stream has been sent.
1092 ///
1093 /// This happens when the stream's send final size is knwon, and the
1094 /// application has already written data up to that point.
is_fin(&self) -> bool1095 pub fn is_fin(&self) -> bool {
1096 if self.fin_off == Some(self.off) {
1097 return true;
1098 }
1099
1100 false
1101 }
1102
1103 /// Returns true if the send-side of the stream is complete.
1104 ///
1105 /// This happens when the stream's send final size is known, and the peer
1106 /// has already acked all stream data up to that point.
is_complete(&self) -> bool1107 pub fn is_complete(&self) -> bool {
1108 if let Some(fin_off) = self.fin_off {
1109 if self.acked == (0..fin_off) {
1110 return true;
1111 }
1112 }
1113
1114 false
1115 }
1116
1117 /// Returns true if there is data to be written.
ready(&self) -> bool1118 fn ready(&self) -> bool {
1119 !self.data.is_empty()
1120 }
1121
1122 /// Returns the highest contiguously acked offset.
ack_off(&self) -> u641123 fn ack_off(&self) -> u64 {
1124 match self.acked.iter().next() {
1125 // Only consider the initial range if it contiguously covers the
1126 // start of the stream (i.e. from offset 0).
1127 Some(std::ops::Range { start: 0, end }) => end,
1128
1129 Some(_) | None => 0,
1130 }
1131 }
1132
1133 /// Returns the outgoing flow control capacity.
cap(&self) -> usize1134 pub fn cap(&self) -> usize {
1135 (self.max_data - self.off) as usize
1136 }
1137 }
1138
1139 /// Buffer holding data at a specific offset.
1140 #[derive(Clone, Debug, Default, Eq)]
1141 pub struct RangeBuf {
1142 /// The internal buffer holding the data.
1143 data: Vec<u8>,
1144
1145 /// The starting offset within `data`. This allows partially consuming a
1146 /// buffer without duplicating the data.
1147 pos: usize,
1148
1149 /// The starting offset within a stream.
1150 off: u64,
1151
1152 /// Whether this contains the final byte in the stream.
1153 fin: bool,
1154 }
1155
1156 impl RangeBuf {
1157 /// Creates a new `RangeBuf` from the given slice.
from(buf: &[u8], off: u64, fin: bool) -> RangeBuf1158 pub(crate) fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
1159 RangeBuf {
1160 data: Vec::from(buf),
1161 pos: 0,
1162 off,
1163 fin,
1164 }
1165 }
1166
1167 /// Returns whether `self` holds the final offset in the stream.
fin(&self) -> bool1168 pub fn fin(&self) -> bool {
1169 self.fin
1170 }
1171
1172 /// Returns the starting offset of `self`.
off(&self) -> u641173 pub fn off(&self) -> u64 {
1174 self.off + self.pos as u64
1175 }
1176
1177 /// Returns the final offset of `self`.
max_off(&self) -> u641178 pub fn max_off(&self) -> u64 {
1179 self.off() + self.len() as u64
1180 }
1181
1182 /// Returns the length of `self`.
len(&self) -> usize1183 pub fn len(&self) -> usize {
1184 self.data.len() - self.pos
1185 }
1186
1187 /// Returns true if `self` has a length of zero bytes.
is_empty(&self) -> bool1188 pub fn is_empty(&self) -> bool {
1189 self.len() == 0
1190 }
1191
1192 /// Consumes the starting `count` bytes of `self`.
consume(&mut self, count: usize)1193 pub fn consume(&mut self, count: usize) {
1194 self.pos += count;
1195 }
1196
1197 /// Splits the buffer into two at the given index.
split_off(&mut self, at: usize) -> RangeBuf1198 pub fn split_off(&mut self, at: usize) -> RangeBuf {
1199 let buf = RangeBuf {
1200 data: self.data.split_off(at),
1201 pos: 0,
1202 off: self.off + at as u64,
1203 fin: self.fin,
1204 };
1205
1206 self.fin = false;
1207
1208 buf
1209 }
1210 }
1211
1212 impl std::ops::Deref for RangeBuf {
1213 type Target = [u8];
1214
deref(&self) -> &[u8]1215 fn deref(&self) -> &[u8] {
1216 &self.data[self.pos..]
1217 }
1218 }
1219
1220 impl std::ops::DerefMut for RangeBuf {
deref_mut(&mut self) -> &mut [u8]1221 fn deref_mut(&mut self) -> &mut [u8] {
1222 &mut self.data[self.pos..]
1223 }
1224 }
1225
1226 impl Ord for RangeBuf {
cmp(&self, other: &RangeBuf) -> cmp::Ordering1227 fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
1228 // Invert ordering to implement min-heap.
1229 self.off.cmp(&other.off).reverse()
1230 }
1231 }
1232
1233 impl PartialOrd for RangeBuf {
partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering>1234 fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
1235 Some(self.cmp(other))
1236 }
1237 }
1238
1239 impl PartialEq for RangeBuf {
eq(&self, other: &RangeBuf) -> bool1240 fn eq(&self, other: &RangeBuf) -> bool {
1241 self.off == other.off
1242 }
1243 }
1244
1245 #[cfg(test)]
1246 mod tests {
1247 use super::*;
1248
1249 #[test]
empty_read()1250 fn empty_read() {
1251 let mut recv = RecvBuf::new(std::u64::MAX);
1252 assert_eq!(recv.len, 0);
1253
1254 let mut buf = [0; 32];
1255
1256 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1257 }
1258
1259 #[test]
empty_stream_frame()1260 fn empty_stream_frame() {
1261 let mut recv = RecvBuf::new(15);
1262 assert_eq!(recv.len, 0);
1263
1264 let buf = RangeBuf::from(b"hello", 0, false);
1265 assert!(recv.push(buf).is_ok());
1266 assert_eq!(recv.len, 5);
1267 assert_eq!(recv.off, 0);
1268 assert_eq!(recv.data.len(), 1);
1269
1270 let mut buf = [0; 32];
1271 assert_eq!(recv.pop(&mut buf), Ok((5, false)));
1272
1273 // Don't store non-fin empty buffer.
1274 let buf = RangeBuf::from(b"", 10, false);
1275 assert!(recv.push(buf).is_ok());
1276 assert_eq!(recv.len, 5);
1277 assert_eq!(recv.off, 5);
1278 assert_eq!(recv.data.len(), 0);
1279
1280 // Check flow control for empty buffer.
1281 let buf = RangeBuf::from(b"", 16, false);
1282 assert_eq!(recv.push(buf), Err(Error::FlowControl));
1283
1284 // Store fin empty buffer.
1285 let buf = RangeBuf::from(b"", 5, true);
1286 assert!(recv.push(buf).is_ok());
1287 assert_eq!(recv.len, 5);
1288 assert_eq!(recv.off, 5);
1289 assert_eq!(recv.data.len(), 1);
1290
1291 // Don't store additional fin empty buffers.
1292 let buf = RangeBuf::from(b"", 5, true);
1293 assert!(recv.push(buf).is_ok());
1294 assert_eq!(recv.len, 5);
1295 assert_eq!(recv.off, 5);
1296 assert_eq!(recv.data.len(), 1);
1297
1298 // Don't store additional fin non-empty buffers.
1299 let buf = RangeBuf::from(b"aa", 3, true);
1300 assert!(recv.push(buf).is_ok());
1301 assert_eq!(recv.len, 5);
1302 assert_eq!(recv.off, 5);
1303 assert_eq!(recv.data.len(), 1);
1304
1305 // Validate final size with fin empty buffers.
1306 let buf = RangeBuf::from(b"", 6, true);
1307 assert_eq!(recv.push(buf), Err(Error::FinalSize));
1308 let buf = RangeBuf::from(b"", 4, true);
1309 assert_eq!(recv.push(buf), Err(Error::FinalSize));
1310
1311 let mut buf = [0; 32];
1312 assert_eq!(recv.pop(&mut buf), Ok((0, true)));
1313 }
1314
1315 #[test]
ordered_read()1316 fn ordered_read() {
1317 let mut recv = RecvBuf::new(std::u64::MAX);
1318 assert_eq!(recv.len, 0);
1319
1320 let mut buf = [0; 32];
1321
1322 let first = RangeBuf::from(b"hello", 0, false);
1323 let second = RangeBuf::from(b"world", 5, false);
1324 let third = RangeBuf::from(b"something", 10, true);
1325
1326 assert!(recv.push(second).is_ok());
1327 assert_eq!(recv.len, 10);
1328 assert_eq!(recv.off, 0);
1329
1330 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1331
1332 assert!(recv.push(third).is_ok());
1333 assert_eq!(recv.len, 19);
1334 assert_eq!(recv.off, 0);
1335
1336 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1337
1338 assert!(recv.push(first).is_ok());
1339 assert_eq!(recv.len, 19);
1340 assert_eq!(recv.off, 0);
1341
1342 let (len, fin) = recv.pop(&mut buf).unwrap();
1343 assert_eq!(len, 19);
1344 assert_eq!(fin, true);
1345 assert_eq!(&buf[..len], b"helloworldsomething");
1346 assert_eq!(recv.len, 19);
1347 assert_eq!(recv.off, 19);
1348
1349 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1350 }
1351
1352 #[test]
split_read()1353 fn split_read() {
1354 let mut recv = RecvBuf::new(std::u64::MAX);
1355 assert_eq!(recv.len, 0);
1356
1357 let mut buf = [0; 32];
1358
1359 let first = RangeBuf::from(b"something", 0, false);
1360 let second = RangeBuf::from(b"helloworld", 9, true);
1361
1362 assert!(recv.push(first).is_ok());
1363 assert_eq!(recv.len, 9);
1364 assert_eq!(recv.off, 0);
1365
1366 assert!(recv.push(second).is_ok());
1367 assert_eq!(recv.len, 19);
1368 assert_eq!(recv.off, 0);
1369
1370 let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
1371 assert_eq!(len, 10);
1372 assert_eq!(fin, false);
1373 assert_eq!(&buf[..len], b"somethingh");
1374 assert_eq!(recv.len, 19);
1375 assert_eq!(recv.off, 10);
1376
1377 let (len, fin) = recv.pop(&mut buf[..5]).unwrap();
1378 assert_eq!(len, 5);
1379 assert_eq!(fin, false);
1380 assert_eq!(&buf[..len], b"ellow");
1381 assert_eq!(recv.len, 19);
1382 assert_eq!(recv.off, 15);
1383
1384 let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
1385 assert_eq!(len, 4);
1386 assert_eq!(fin, true);
1387 assert_eq!(&buf[..len], b"orld");
1388 assert_eq!(recv.len, 19);
1389 assert_eq!(recv.off, 19);
1390 }
1391
1392 #[test]
incomplete_read()1393 fn incomplete_read() {
1394 let mut recv = RecvBuf::new(std::u64::MAX);
1395 assert_eq!(recv.len, 0);
1396
1397 let mut buf = [0; 32];
1398
1399 let first = RangeBuf::from(b"something", 0, false);
1400 let second = RangeBuf::from(b"helloworld", 9, true);
1401
1402 assert!(recv.push(second).is_ok());
1403 assert_eq!(recv.len, 19);
1404 assert_eq!(recv.off, 0);
1405
1406 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1407
1408 assert!(recv.push(first).is_ok());
1409 assert_eq!(recv.len, 19);
1410 assert_eq!(recv.off, 0);
1411
1412 let (len, fin) = recv.pop(&mut buf).unwrap();
1413 assert_eq!(len, 19);
1414 assert_eq!(fin, true);
1415 assert_eq!(&buf[..len], b"somethinghelloworld");
1416 assert_eq!(recv.len, 19);
1417 assert_eq!(recv.off, 19);
1418 }
1419
1420 #[test]
zero_len_read()1421 fn zero_len_read() {
1422 let mut recv = RecvBuf::new(std::u64::MAX);
1423 assert_eq!(recv.len, 0);
1424
1425 let mut buf = [0; 32];
1426
1427 let first = RangeBuf::from(b"something", 0, false);
1428 let second = RangeBuf::from(b"", 9, true);
1429
1430 assert!(recv.push(first).is_ok());
1431 assert_eq!(recv.len, 9);
1432 assert_eq!(recv.off, 0);
1433 assert_eq!(recv.data.len(), 1);
1434
1435 assert!(recv.push(second).is_ok());
1436 assert_eq!(recv.len, 9);
1437 assert_eq!(recv.off, 0);
1438 assert_eq!(recv.data.len(), 1);
1439
1440 let (len, fin) = recv.pop(&mut buf).unwrap();
1441 assert_eq!(len, 9);
1442 assert_eq!(fin, true);
1443 assert_eq!(&buf[..len], b"something");
1444 assert_eq!(recv.len, 9);
1445 assert_eq!(recv.off, 9);
1446 }
1447
1448 #[test]
past_read()1449 fn past_read() {
1450 let mut recv = RecvBuf::new(std::u64::MAX);
1451 assert_eq!(recv.len, 0);
1452
1453 let mut buf = [0; 32];
1454
1455 let first = RangeBuf::from(b"something", 0, false);
1456 let second = RangeBuf::from(b"hello", 3, false);
1457 let third = RangeBuf::from(b"ello", 4, true);
1458 let fourth = RangeBuf::from(b"ello", 5, true);
1459
1460 assert!(recv.push(first).is_ok());
1461 assert_eq!(recv.len, 9);
1462 assert_eq!(recv.off, 0);
1463 assert_eq!(recv.data.len(), 1);
1464
1465 let (len, fin) = recv.pop(&mut buf).unwrap();
1466 assert_eq!(len, 9);
1467 assert_eq!(fin, false);
1468 assert_eq!(&buf[..len], b"something");
1469 assert_eq!(recv.len, 9);
1470 assert_eq!(recv.off, 9);
1471
1472 assert!(recv.push(second).is_ok());
1473 assert_eq!(recv.len, 9);
1474 assert_eq!(recv.off, 9);
1475 assert_eq!(recv.data.len(), 0);
1476
1477 assert_eq!(recv.push(third), Err(Error::FinalSize));
1478
1479 assert!(recv.push(fourth).is_ok());
1480 assert_eq!(recv.len, 9);
1481 assert_eq!(recv.off, 9);
1482 assert_eq!(recv.data.len(), 0);
1483
1484 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1485 }
1486
1487 #[test]
fully_overlapping_read()1488 fn fully_overlapping_read() {
1489 let mut recv = RecvBuf::new(std::u64::MAX);
1490 assert_eq!(recv.len, 0);
1491
1492 let mut buf = [0; 32];
1493
1494 let first = RangeBuf::from(b"something", 0, false);
1495 let second = RangeBuf::from(b"hello", 4, false);
1496
1497 assert!(recv.push(first).is_ok());
1498 assert_eq!(recv.len, 9);
1499 assert_eq!(recv.off, 0);
1500 assert_eq!(recv.data.len(), 1);
1501
1502 assert!(recv.push(second).is_ok());
1503 assert_eq!(recv.len, 9);
1504 assert_eq!(recv.off, 0);
1505 assert_eq!(recv.data.len(), 1);
1506
1507 let (len, fin) = recv.pop(&mut buf).unwrap();
1508 assert_eq!(len, 9);
1509 assert_eq!(fin, false);
1510 assert_eq!(&buf[..len], b"something");
1511 assert_eq!(recv.len, 9);
1512 assert_eq!(recv.off, 9);
1513 assert_eq!(recv.data.len(), 0);
1514
1515 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1516 }
1517
1518 #[test]
fully_overlapping_read2()1519 fn fully_overlapping_read2() {
1520 let mut recv = RecvBuf::new(std::u64::MAX);
1521 assert_eq!(recv.len, 0);
1522
1523 let mut buf = [0; 32];
1524
1525 let first = RangeBuf::from(b"something", 0, false);
1526 let second = RangeBuf::from(b"hello", 4, false);
1527
1528 assert!(recv.push(second).is_ok());
1529 assert_eq!(recv.len, 9);
1530 assert_eq!(recv.off, 0);
1531 assert_eq!(recv.data.len(), 1);
1532
1533 assert!(recv.push(first).is_ok());
1534 assert_eq!(recv.len, 9);
1535 assert_eq!(recv.off, 0);
1536 assert_eq!(recv.data.len(), 2);
1537
1538 let (len, fin) = recv.pop(&mut buf).unwrap();
1539 assert_eq!(len, 9);
1540 assert_eq!(fin, false);
1541 assert_eq!(&buf[..len], b"somehello");
1542 assert_eq!(recv.len, 9);
1543 assert_eq!(recv.off, 9);
1544 assert_eq!(recv.data.len(), 0);
1545
1546 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1547 }
1548
1549 #[test]
fully_overlapping_read3()1550 fn fully_overlapping_read3() {
1551 let mut recv = RecvBuf::new(std::u64::MAX);
1552 assert_eq!(recv.len, 0);
1553
1554 let mut buf = [0; 32];
1555
1556 let first = RangeBuf::from(b"something", 0, false);
1557 let second = RangeBuf::from(b"hello", 3, false);
1558
1559 assert!(recv.push(second).is_ok());
1560 assert_eq!(recv.len, 8);
1561 assert_eq!(recv.off, 0);
1562 assert_eq!(recv.data.len(), 1);
1563
1564 assert!(recv.push(first).is_ok());
1565 assert_eq!(recv.len, 9);
1566 assert_eq!(recv.off, 0);
1567 assert_eq!(recv.data.len(), 3);
1568
1569 let (len, fin) = recv.pop(&mut buf).unwrap();
1570 assert_eq!(len, 9);
1571 assert_eq!(fin, false);
1572 assert_eq!(&buf[..len], b"somhellog");
1573 assert_eq!(recv.len, 9);
1574 assert_eq!(recv.off, 9);
1575 assert_eq!(recv.data.len(), 0);
1576
1577 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1578 }
1579
1580 #[test]
fully_overlapping_read_multi()1581 fn fully_overlapping_read_multi() {
1582 let mut recv = RecvBuf::new(std::u64::MAX);
1583 assert_eq!(recv.len, 0);
1584
1585 let mut buf = [0; 32];
1586
1587 let first = RangeBuf::from(b"somethingsomething", 0, false);
1588 let second = RangeBuf::from(b"hello", 3, false);
1589 let third = RangeBuf::from(b"hello", 12, false);
1590
1591 assert!(recv.push(second).is_ok());
1592 assert_eq!(recv.len, 8);
1593 assert_eq!(recv.off, 0);
1594 assert_eq!(recv.data.len(), 1);
1595
1596 assert!(recv.push(third).is_ok());
1597 assert_eq!(recv.len, 17);
1598 assert_eq!(recv.off, 0);
1599 assert_eq!(recv.data.len(), 2);
1600
1601 assert!(recv.push(first).is_ok());
1602 assert_eq!(recv.len, 18);
1603 assert_eq!(recv.off, 0);
1604 assert_eq!(recv.data.len(), 5);
1605
1606 let (len, fin) = recv.pop(&mut buf).unwrap();
1607 assert_eq!(len, 18);
1608 assert_eq!(fin, false);
1609 assert_eq!(&buf[..len], b"somhellogsomhellog");
1610 assert_eq!(recv.len, 18);
1611 assert_eq!(recv.off, 18);
1612 assert_eq!(recv.data.len(), 0);
1613
1614 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1615 }
1616
1617 #[test]
overlapping_start_read()1618 fn overlapping_start_read() {
1619 let mut recv = RecvBuf::new(std::u64::MAX);
1620 assert_eq!(recv.len, 0);
1621
1622 let mut buf = [0; 32];
1623
1624 let first = RangeBuf::from(b"something", 0, false);
1625 let second = RangeBuf::from(b"hello", 8, true);
1626
1627 assert!(recv.push(first).is_ok());
1628 assert_eq!(recv.len, 9);
1629 assert_eq!(recv.off, 0);
1630 assert_eq!(recv.data.len(), 1);
1631
1632 assert!(recv.push(second).is_ok());
1633 assert_eq!(recv.len, 13);
1634 assert_eq!(recv.off, 0);
1635 assert_eq!(recv.data.len(), 2);
1636
1637 let (len, fin) = recv.pop(&mut buf).unwrap();
1638 assert_eq!(len, 13);
1639 assert_eq!(fin, true);
1640 assert_eq!(&buf[..len], b"somethingello");
1641 assert_eq!(recv.len, 13);
1642 assert_eq!(recv.off, 13);
1643
1644 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1645 }
1646
1647 #[test]
overlapping_end_read()1648 fn overlapping_end_read() {
1649 let mut recv = RecvBuf::new(std::u64::MAX);
1650 assert_eq!(recv.len, 0);
1651
1652 let mut buf = [0; 32];
1653
1654 let first = RangeBuf::from(b"hello", 0, false);
1655 let second = RangeBuf::from(b"something", 3, true);
1656
1657 assert!(recv.push(second).is_ok());
1658 assert_eq!(recv.len, 12);
1659 assert_eq!(recv.off, 0);
1660 assert_eq!(recv.data.len(), 1);
1661
1662 assert!(recv.push(first).is_ok());
1663 assert_eq!(recv.len, 12);
1664 assert_eq!(recv.off, 0);
1665 assert_eq!(recv.data.len(), 2);
1666
1667 let (len, fin) = recv.pop(&mut buf).unwrap();
1668 assert_eq!(len, 12);
1669 assert_eq!(fin, true);
1670 assert_eq!(&buf[..len], b"helsomething");
1671 assert_eq!(recv.len, 12);
1672 assert_eq!(recv.off, 12);
1673
1674 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1675 }
1676
1677 #[test]
partially_multi_overlapping_reordered_read()1678 fn partially_multi_overlapping_reordered_read() {
1679 let mut recv = RecvBuf::new(std::u64::MAX);
1680 assert_eq!(recv.len, 0);
1681
1682 let mut buf = [0; 32];
1683
1684 let first = RangeBuf::from(b"hello", 8, false);
1685 let second = RangeBuf::from(b"something", 0, false);
1686 let third = RangeBuf::from(b"moar", 11, true);
1687
1688 assert!(recv.push(first).is_ok());
1689 assert_eq!(recv.len, 13);
1690 assert_eq!(recv.off, 0);
1691 assert_eq!(recv.data.len(), 1);
1692
1693 assert!(recv.push(second).is_ok());
1694 assert_eq!(recv.len, 13);
1695 assert_eq!(recv.off, 0);
1696 assert_eq!(recv.data.len(), 2);
1697
1698 assert!(recv.push(third).is_ok());
1699 assert_eq!(recv.len, 15);
1700 assert_eq!(recv.off, 0);
1701 assert_eq!(recv.data.len(), 3);
1702
1703 let (len, fin) = recv.pop(&mut buf).unwrap();
1704 assert_eq!(len, 15);
1705 assert_eq!(fin, true);
1706 assert_eq!(&buf[..len], b"somethinhelloar");
1707 assert_eq!(recv.len, 15);
1708 assert_eq!(recv.off, 15);
1709 assert_eq!(recv.data.len(), 0);
1710
1711 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1712 }
1713
1714 #[test]
partially_multi_overlapping_reordered_read2()1715 fn partially_multi_overlapping_reordered_read2() {
1716 let mut recv = RecvBuf::new(std::u64::MAX);
1717 assert_eq!(recv.len, 0);
1718
1719 let mut buf = [0; 32];
1720
1721 let first = RangeBuf::from(b"aaa", 0, false);
1722 let second = RangeBuf::from(b"bbb", 2, false);
1723 let third = RangeBuf::from(b"ccc", 4, false);
1724 let fourth = RangeBuf::from(b"ddd", 6, false);
1725 let fifth = RangeBuf::from(b"eee", 9, false);
1726 let sixth = RangeBuf::from(b"fff", 11, false);
1727
1728 assert!(recv.push(second).is_ok());
1729 assert_eq!(recv.len, 5);
1730 assert_eq!(recv.off, 0);
1731 assert_eq!(recv.data.len(), 1);
1732
1733 assert!(recv.push(fourth).is_ok());
1734 assert_eq!(recv.len, 9);
1735 assert_eq!(recv.off, 0);
1736 assert_eq!(recv.data.len(), 2);
1737
1738 assert!(recv.push(third).is_ok());
1739 assert_eq!(recv.len, 9);
1740 assert_eq!(recv.off, 0);
1741 assert_eq!(recv.data.len(), 3);
1742
1743 assert!(recv.push(first).is_ok());
1744 assert_eq!(recv.len, 9);
1745 assert_eq!(recv.off, 0);
1746 assert_eq!(recv.data.len(), 4);
1747
1748 assert!(recv.push(sixth).is_ok());
1749 assert_eq!(recv.len, 14);
1750 assert_eq!(recv.off, 0);
1751 assert_eq!(recv.data.len(), 5);
1752
1753 assert!(recv.push(fifth).is_ok());
1754 assert_eq!(recv.len, 14);
1755 assert_eq!(recv.off, 0);
1756 assert_eq!(recv.data.len(), 6);
1757
1758 let (len, fin) = recv.pop(&mut buf).unwrap();
1759 assert_eq!(len, 14);
1760 assert_eq!(fin, false);
1761 assert_eq!(&buf[..len], b"aabbbcdddeefff");
1762 assert_eq!(recv.len, 14);
1763 assert_eq!(recv.off, 14);
1764 assert_eq!(recv.data.len(), 0);
1765
1766 assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1767 }
1768
1769 #[test]
empty_write()1770 fn empty_write() {
1771 let mut send = SendBuf::new(std::u64::MAX);
1772 assert_eq!(send.len, 0);
1773
1774 let write = send.pop(std::usize::MAX).unwrap();
1775 assert_eq!(write.len(), 0);
1776 assert_eq!(write.fin(), false);
1777 }
1778
1779 #[test]
multi_write()1780 fn multi_write() {
1781 let mut send = SendBuf::new(std::u64::MAX);
1782 assert_eq!(send.len, 0);
1783
1784 let first = b"something";
1785 let second = b"helloworld";
1786
1787 assert!(send.push_slice(first, false).is_ok());
1788 assert_eq!(send.len, 9);
1789
1790 assert!(send.push_slice(second, true).is_ok());
1791 assert_eq!(send.len, 19);
1792
1793 let write = send.pop(128).unwrap();
1794 assert_eq!(write.len(), 19);
1795 assert_eq!(write.fin(), true);
1796 assert_eq!(&write[..], b"somethinghelloworld");
1797 assert_eq!(send.len, 0);
1798 }
1799
1800 #[test]
split_write()1801 fn split_write() {
1802 let mut send = SendBuf::new(std::u64::MAX);
1803 assert_eq!(send.len, 0);
1804
1805 let first = b"something";
1806 let second = b"helloworld";
1807
1808 assert!(send.push_slice(first, false).is_ok());
1809 assert_eq!(send.len, 9);
1810
1811 assert!(send.push_slice(second, true).is_ok());
1812 assert_eq!(send.len, 19);
1813
1814 let write = send.pop(10).unwrap();
1815 assert_eq!(write.off(), 0);
1816 assert_eq!(write.len(), 10);
1817 assert_eq!(write.fin(), false);
1818 assert_eq!(&write[..], b"somethingh");
1819 assert_eq!(send.len, 9);
1820
1821 let write = send.pop(5).unwrap();
1822 assert_eq!(write.off(), 10);
1823 assert_eq!(write.len(), 5);
1824 assert_eq!(write.fin(), false);
1825 assert_eq!(&write[..], b"ellow");
1826 assert_eq!(send.len, 4);
1827
1828 let write = send.pop(10).unwrap();
1829 assert_eq!(write.off(), 15);
1830 assert_eq!(write.len(), 4);
1831 assert_eq!(write.fin(), true);
1832 assert_eq!(&write[..], b"orld");
1833 assert_eq!(send.len, 0);
1834 }
1835
1836 #[test]
resend()1837 fn resend() {
1838 let mut send = SendBuf::new(std::u64::MAX);
1839 assert_eq!(send.len, 0);
1840 assert_eq!(send.off_front(), 0);
1841
1842 let first = b"something";
1843 let second = b"helloworld";
1844
1845 assert!(send.push_slice(first, false).is_ok());
1846 assert_eq!(send.off_front(), 0);
1847
1848 assert!(send.push_slice(second, true).is_ok());
1849 assert_eq!(send.off_front(), 0);
1850
1851 let write1 = send.pop(4).unwrap();
1852 assert_eq!(write1.off(), 0);
1853 assert_eq!(write1.len(), 4);
1854 assert_eq!(write1.fin(), false);
1855 assert_eq!(&write1[..], b"some");
1856 assert_eq!(send.len, 15);
1857 assert_eq!(send.off_front(), 4);
1858
1859 let write2 = send.pop(5).unwrap();
1860 assert_eq!(write2.off(), 4);
1861 assert_eq!(write2.len(), 5);
1862 assert_eq!(write2.fin(), false);
1863 assert_eq!(&write2[..], b"thing");
1864 assert_eq!(send.len, 10);
1865 assert_eq!(send.off_front(), 9);
1866
1867 let write3 = send.pop(5).unwrap();
1868 assert_eq!(write3.off(), 9);
1869 assert_eq!(write3.len(), 5);
1870 assert_eq!(write3.fin(), false);
1871 assert_eq!(&write3[..], b"hello");
1872 assert_eq!(send.len, 5);
1873 assert_eq!(send.off_front(), 14);
1874
1875 send.push(write2).unwrap();
1876 assert_eq!(send.len, 10);
1877 assert_eq!(send.off_front(), 4);
1878
1879 send.push(write1).unwrap();
1880 assert_eq!(send.len, 14);
1881 assert_eq!(send.off_front(), 0);
1882
1883 let write4 = send.pop(11).unwrap();
1884 assert_eq!(write4.off(), 0);
1885 assert_eq!(write4.len(), 9);
1886 assert_eq!(write4.fin(), false);
1887 assert_eq!(&write4[..], b"something");
1888 assert_eq!(send.len, 5);
1889 assert_eq!(send.off_front(), 14);
1890
1891 let write5 = send.pop(11).unwrap();
1892 assert_eq!(write5.off(), 14);
1893 assert_eq!(write5.len(), 5);
1894 assert_eq!(write5.fin(), true);
1895 assert_eq!(&write5[..], b"world");
1896 assert_eq!(send.len, 0);
1897 assert_eq!(send.off_front(), 19);
1898 }
1899
1900 #[test]
write_blocked_by_off()1901 fn write_blocked_by_off() {
1902 let mut send = SendBuf::default();
1903 assert_eq!(send.len, 0);
1904
1905 let first = b"something";
1906 let second = b"helloworld";
1907
1908 assert_eq!(send.push_slice(first, false), Ok(0));
1909 assert_eq!(send.len, 0);
1910
1911 assert_eq!(send.push_slice(second, true), Ok(0));
1912 assert_eq!(send.len, 0);
1913
1914 send.update_max_data(5);
1915
1916 assert_eq!(send.push_slice(first, false), Ok(5));
1917 assert_eq!(send.len, 5);
1918
1919 assert_eq!(send.push_slice(second, true), Ok(0));
1920 assert_eq!(send.len, 5);
1921
1922 let write = send.pop(10).unwrap();
1923 assert_eq!(write.off(), 0);
1924 assert_eq!(write.len(), 5);
1925 assert_eq!(write.fin(), false);
1926 assert_eq!(&write[..], b"somet");
1927 assert_eq!(send.len, 0);
1928
1929 let write = send.pop(10).unwrap();
1930 assert_eq!(write.off(), 5);
1931 assert_eq!(write.len(), 0);
1932 assert_eq!(write.fin(), false);
1933 assert_eq!(&write[..], b"");
1934 assert_eq!(send.len, 0);
1935
1936 send.update_max_data(15);
1937
1938 assert_eq!(send.push_slice(&first[5..], false), Ok(4));
1939 assert_eq!(send.len, 4);
1940
1941 assert_eq!(send.push_slice(second, true), Ok(6));
1942 assert_eq!(send.len, 10);
1943
1944 let write = send.pop(10).unwrap();
1945 assert_eq!(write.off(), 5);
1946 assert_eq!(write.len(), 10);
1947 assert_eq!(write.fin(), false);
1948 assert_eq!(&write[..], b"hinghellow");
1949 assert_eq!(send.len, 0);
1950
1951 send.update_max_data(25);
1952
1953 assert_eq!(send.push_slice(&second[6..], true), Ok(4));
1954 assert_eq!(send.len, 4);
1955
1956 let write = send.pop(10).unwrap();
1957 assert_eq!(write.off(), 15);
1958 assert_eq!(write.len(), 4);
1959 assert_eq!(write.fin(), true);
1960 assert_eq!(&write[..], b"orld");
1961 assert_eq!(send.len, 0);
1962 }
1963
1964 #[test]
zero_len_write()1965 fn zero_len_write() {
1966 let mut send = SendBuf::new(std::u64::MAX);
1967 assert_eq!(send.len, 0);
1968
1969 let first = b"something";
1970
1971 assert!(send.push_slice(first, false).is_ok());
1972 assert_eq!(send.len, 9);
1973
1974 assert!(send.push_slice(&[], true).is_ok());
1975 assert_eq!(send.len, 9);
1976
1977 let write = send.pop(10).unwrap();
1978 assert_eq!(write.off(), 0);
1979 assert_eq!(write.len(), 9);
1980 assert_eq!(write.fin(), true);
1981 assert_eq!(&write[..], b"something");
1982 assert_eq!(send.len, 0);
1983 }
1984
1985 #[test]
recv_flow_control()1986 fn recv_flow_control() {
1987 let mut stream = Stream::new(15, 0, true, true);
1988 assert!(!stream.recv.almost_full());
1989
1990 let mut buf = [0; 32];
1991
1992 let first = RangeBuf::from(b"hello", 0, false);
1993 let second = RangeBuf::from(b"world", 5, false);
1994 let third = RangeBuf::from(b"something", 10, false);
1995
1996 assert_eq!(stream.recv.push(second), Ok(()));
1997 assert_eq!(stream.recv.push(first), Ok(()));
1998 assert!(!stream.recv.almost_full());
1999
2000 assert_eq!(stream.recv.push(third), Err(Error::FlowControl));
2001
2002 let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2003 assert_eq!(&buf[..len], b"helloworld");
2004 assert_eq!(fin, false);
2005
2006 assert!(stream.recv.almost_full());
2007
2008 stream.recv.update_max_data();
2009 assert_eq!(stream.recv.max_data_next(), 25);
2010 assert!(!stream.recv.almost_full());
2011
2012 let third = RangeBuf::from(b"something", 10, false);
2013 assert_eq!(stream.recv.push(third), Ok(()));
2014 }
2015
2016 #[test]
recv_past_fin()2017 fn recv_past_fin() {
2018 let mut stream = Stream::new(15, 0, true, true);
2019 assert!(!stream.recv.almost_full());
2020
2021 let first = RangeBuf::from(b"hello", 0, true);
2022 let second = RangeBuf::from(b"world", 5, false);
2023
2024 assert_eq!(stream.recv.push(first), Ok(()));
2025 assert_eq!(stream.recv.push(second), Err(Error::FinalSize));
2026 }
2027
2028 #[test]
recv_fin_dup()2029 fn recv_fin_dup() {
2030 let mut stream = Stream::new(15, 0, true, true);
2031 assert!(!stream.recv.almost_full());
2032
2033 let first = RangeBuf::from(b"hello", 0, true);
2034 let second = RangeBuf::from(b"hello", 0, true);
2035
2036 assert_eq!(stream.recv.push(first), Ok(()));
2037 assert_eq!(stream.recv.push(second), Ok(()));
2038
2039 let mut buf = [0; 32];
2040
2041 let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2042 assert_eq!(&buf[..len], b"hello");
2043 assert_eq!(fin, true);
2044 }
2045
2046 #[test]
recv_fin_change()2047 fn recv_fin_change() {
2048 let mut stream = Stream::new(15, 0, true, true);
2049 assert!(!stream.recv.almost_full());
2050
2051 let first = RangeBuf::from(b"hello", 0, true);
2052 let second = RangeBuf::from(b"world", 5, true);
2053
2054 assert_eq!(stream.recv.push(second), Ok(()));
2055 assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
2056 }
2057
2058 #[test]
recv_fin_lower_than_received()2059 fn recv_fin_lower_than_received() {
2060 let mut stream = Stream::new(15, 0, true, true);
2061 assert!(!stream.recv.almost_full());
2062
2063 let first = RangeBuf::from(b"hello", 0, true);
2064 let second = RangeBuf::from(b"world", 5, false);
2065
2066 assert_eq!(stream.recv.push(second), Ok(()));
2067 assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
2068 }
2069
2070 #[test]
recv_fin_flow_control()2071 fn recv_fin_flow_control() {
2072 let mut stream = Stream::new(15, 0, true, true);
2073 assert!(!stream.recv.almost_full());
2074
2075 let mut buf = [0; 32];
2076
2077 let first = RangeBuf::from(b"hello", 0, false);
2078 let second = RangeBuf::from(b"world", 5, true);
2079
2080 assert_eq!(stream.recv.push(first), Ok(()));
2081 assert_eq!(stream.recv.push(second), Ok(()));
2082
2083 let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2084 assert_eq!(&buf[..len], b"helloworld");
2085 assert_eq!(fin, true);
2086
2087 assert!(!stream.recv.almost_full());
2088 }
2089
2090 #[test]
recv_fin_reset_mismatch()2091 fn recv_fin_reset_mismatch() {
2092 let mut stream = Stream::new(15, 0, true, true);
2093 assert!(!stream.recv.almost_full());
2094
2095 let first = RangeBuf::from(b"hello", 0, true);
2096
2097 assert_eq!(stream.recv.push(first), Ok(()));
2098 assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
2099 }
2100
2101 #[test]
recv_reset_dup()2102 fn recv_reset_dup() {
2103 let mut stream = Stream::new(15, 0, true, true);
2104 assert!(!stream.recv.almost_full());
2105
2106 let first = RangeBuf::from(b"hello", 0, false);
2107
2108 assert_eq!(stream.recv.push(first), Ok(()));
2109 assert_eq!(stream.recv.reset(5), Ok(0));
2110 assert_eq!(stream.recv.reset(5), Ok(0));
2111 }
2112
2113 #[test]
recv_reset_change()2114 fn recv_reset_change() {
2115 let mut stream = Stream::new(15, 0, true, true);
2116 assert!(!stream.recv.almost_full());
2117
2118 let first = RangeBuf::from(b"hello", 0, false);
2119
2120 assert_eq!(stream.recv.push(first), Ok(()));
2121 assert_eq!(stream.recv.reset(5), Ok(0));
2122 assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
2123 }
2124
2125 #[test]
recv_reset_lower_than_received()2126 fn recv_reset_lower_than_received() {
2127 let mut stream = Stream::new(15, 0, true, true);
2128 assert!(!stream.recv.almost_full());
2129
2130 let first = RangeBuf::from(b"hello", 0, false);
2131
2132 assert_eq!(stream.recv.push(first), Ok(()));
2133 assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
2134 }
2135
2136 #[test]
send_flow_control()2137 fn send_flow_control() {
2138 let mut stream = Stream::new(0, 15, true, true);
2139
2140 let first = b"hello";
2141 let second = b"world";
2142 let third = b"something";
2143
2144 assert!(stream.send.push_slice(first, false).is_ok());
2145 assert!(stream.send.push_slice(second, false).is_ok());
2146 assert!(stream.send.push_slice(third, false).is_ok());
2147
2148 let write = stream.send.pop(25).unwrap();
2149 assert_eq!(write.off(), 0);
2150 assert_eq!(write.len(), 15);
2151 assert_eq!(write.fin(), false);
2152 assert_eq!(write.data, b"helloworldsomet");
2153
2154 let write = stream.send.pop(25).unwrap();
2155 assert_eq!(write.off(), 15);
2156 assert_eq!(write.len(), 0);
2157 assert_eq!(write.fin(), false);
2158 assert_eq!(write.data, b"");
2159
2160 let first = RangeBuf::from(b"helloworldsomet", 0, false);
2161 assert_eq!(stream.send.push(first), Ok(()));
2162
2163 let write = stream.send.pop(10).unwrap();
2164 assert_eq!(write.off(), 0);
2165 assert_eq!(write.len(), 10);
2166 assert_eq!(write.fin(), false);
2167 assert_eq!(write.data, b"helloworld");
2168
2169 let write = stream.send.pop(10).unwrap();
2170 assert_eq!(write.off(), 10);
2171 assert_eq!(write.len(), 5);
2172 assert_eq!(write.fin(), false);
2173 assert_eq!(write.data, b"somet");
2174 }
2175
2176 #[test]
send_past_fin()2177 fn send_past_fin() {
2178 let mut stream = Stream::new(0, 15, true, true);
2179
2180 let first = b"hello";
2181 let second = b"world";
2182 let third = b"third";
2183
2184 assert_eq!(stream.send.push_slice(first, false), Ok(5));
2185
2186 assert_eq!(stream.send.push_slice(second, true), Ok(5));
2187 assert!(stream.send.is_fin());
2188
2189 assert_eq!(stream.send.push_slice(third, false), Err(Error::FinalSize));
2190 }
2191
2192 #[test]
send_fin_dup()2193 fn send_fin_dup() {
2194 let mut stream = Stream::new(0, 15, true, true);
2195
2196 let first = RangeBuf::from(b"hello", 0, true);
2197 let second = RangeBuf::from(b"hello", 0, true);
2198
2199 assert_eq!(stream.send.push(first), Ok(()));
2200 assert_eq!(stream.send.push(second), Ok(()));
2201 }
2202
2203 #[test]
send_undo_fin()2204 fn send_undo_fin() {
2205 let mut stream = Stream::new(0, 15, true, true);
2206
2207 let first = b"hello";
2208 let second = RangeBuf::from(b"hello", 0, false);
2209
2210 assert_eq!(stream.send.push_slice(first, true), Ok(5));
2211 assert!(stream.send.is_fin());
2212
2213 assert_eq!(stream.send.push(second), Err(Error::FinalSize));
2214 }
2215
2216 #[test]
send_fin_max_data_match()2217 fn send_fin_max_data_match() {
2218 let mut stream = Stream::new(0, 15, true, true);
2219
2220 let slice = b"hellohellohello";
2221
2222 assert!(stream.send.push_slice(slice, true).is_ok());
2223
2224 let write = stream.send.pop(15).unwrap();
2225 assert_eq!(write.off(), 0);
2226 assert_eq!(write.len(), 15);
2227 assert_eq!(write.fin(), true);
2228 assert_eq!(write.data, slice);
2229 }
2230
2231 #[test]
send_fin_zero_length()2232 fn send_fin_zero_length() {
2233 let mut stream = Stream::new(0, 15, true, true);
2234
2235 assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2236 assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2237 assert!(stream.send.is_fin());
2238
2239 let write = stream.send.pop(5).unwrap();
2240 assert_eq!(write.off(), 0);
2241 assert_eq!(write.len(), 5);
2242 assert_eq!(write.fin(), true);
2243 assert_eq!(write.data, b"hello");
2244 }
2245
2246 #[test]
send_ack()2247 fn send_ack() {
2248 let mut stream = Stream::new(0, 15, true, true);
2249
2250 assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2251 assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
2252 assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2253 assert!(stream.send.is_fin());
2254
2255 let write = stream.send.pop(5).unwrap();
2256 assert_eq!(write.off(), 0);
2257 assert_eq!(write.len(), 5);
2258 assert_eq!(write.fin(), false);
2259 assert_eq!(write.data, b"hello");
2260
2261 stream.send.ack(write.off(), write.len());
2262
2263 assert_eq!(stream.send.push(write), Ok(()));
2264
2265 let write = stream.send.pop(5).unwrap();
2266 assert_eq!(write.off(), 5);
2267 assert_eq!(write.len(), 5);
2268 assert_eq!(write.fin(), true);
2269 assert_eq!(write.data, b"world");
2270 }
2271
2272 #[test]
send_ack_reordering()2273 fn send_ack_reordering() {
2274 let mut stream = Stream::new(0, 15, true, true);
2275
2276 assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2277 assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
2278 assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2279 assert!(stream.send.is_fin());
2280
2281 let write1 = stream.send.pop(5).unwrap();
2282 assert_eq!(write1.off(), 0);
2283 assert_eq!(write1.len(), 5);
2284 assert_eq!(write1.fin(), false);
2285 assert_eq!(write1.data, b"hello");
2286
2287 let write2 = stream.send.pop(1).unwrap();
2288 assert_eq!(write2.off(), 5);
2289 assert_eq!(write2.len(), 1);
2290 assert_eq!(write2.fin(), false);
2291 assert_eq!(write2.data, b"w");
2292
2293 stream.send.ack(write2.off(), write2.len());
2294 stream.send.ack(write1.off(), write1.len());
2295
2296 assert_eq!(stream.send.push(write1), Ok(()));
2297 assert_eq!(stream.send.push(write2), Ok(()));
2298
2299 let write = stream.send.pop(5).unwrap();
2300 assert_eq!(write.off(), 6);
2301 assert_eq!(write.len(), 4);
2302 assert_eq!(write.fin(), true);
2303 assert_eq!(write.data, b"orld");
2304 }
2305
2306 #[test]
recv_data_below_off()2307 fn recv_data_below_off() {
2308 let mut stream = Stream::new(15, 0, true, true);
2309
2310 let first = RangeBuf::from(b"hello", 0, false);
2311
2312 assert_eq!(stream.recv.push(first), Ok(()));
2313
2314 let mut buf = [0; 10];
2315
2316 let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2317 assert_eq!(&buf[..len], b"hello");
2318 assert_eq!(fin, false);
2319
2320 let first = RangeBuf::from(b"elloworld", 1, true);
2321 assert_eq!(stream.recv.push(first), Ok(()));
2322
2323 let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2324 assert_eq!(&buf[..len], b"world");
2325 assert_eq!(fin, true);
2326 }
2327
2328 #[test]
stream_complete()2329 fn stream_complete() {
2330 let mut stream = Stream::new(30, 30, true, true);
2331
2332 assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2333 assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
2334
2335 assert!(!stream.send.is_complete());
2336 assert!(!stream.send.is_fin());
2337
2338 assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2339
2340 assert!(!stream.send.is_complete());
2341 assert!(stream.send.is_fin());
2342
2343 let buf = RangeBuf::from(b"hello", 0, true);
2344 assert!(stream.recv.push(buf).is_ok());
2345 assert!(!stream.recv.is_fin());
2346
2347 stream.send.ack(6, 4);
2348 assert!(!stream.send.is_complete());
2349
2350 let mut buf = [0; 2];
2351 assert_eq!(stream.recv.pop(&mut buf), Ok((2, false)));
2352 assert!(!stream.recv.is_fin());
2353
2354 stream.send.ack(1, 5);
2355 assert!(!stream.send.is_complete());
2356
2357 stream.send.ack(0, 1);
2358 assert!(stream.send.is_complete());
2359
2360 assert!(!stream.is_complete());
2361
2362 let mut buf = [0; 3];
2363 assert_eq!(stream.recv.pop(&mut buf), Ok((3, true)));
2364 assert!(stream.recv.is_fin());
2365
2366 assert!(stream.is_complete());
2367 }
2368
2369 #[test]
send_fin_zero_length_output()2370 fn send_fin_zero_length_output() {
2371 let mut stream = Stream::new(0, 15, true, true);
2372
2373 assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2374 assert!(!stream.send.is_fin());
2375
2376 let write = stream.send.pop(5).unwrap();
2377 assert_eq!(write.off(), 0);
2378 assert_eq!(write.len(), 5);
2379 assert_eq!(write.fin(), false);
2380 assert_eq!(write.data, b"hello");
2381
2382 assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2383 assert!(stream.send.is_fin());
2384
2385 let write = stream.send.pop(5).unwrap();
2386 assert_eq!(write.off(), 5);
2387 assert_eq!(write.len(), 0);
2388 assert_eq!(write.fin(), true);
2389 assert_eq!(write.data, b"");
2390 }
2391 }
2392