1 //
//! Traits and implementations of arbitrary data streams.
3 //!
4 //! Streams are similar to the `Iterator` trait in that they represent some sequential set of items
5 //! which can be retrieved one by one. Where `Stream`s differ is that they are allowed to return
6 //! errors instead of just `None` and if they implement the `RangeStreamOnce` trait they are also
7 //! capable of returning multiple items at the same time, usually in the form of a slice.
8 //!
//! In addition to the functionality above, a proper `Stream` usable by a `Parser` must also have a
//! position (marked by the `Positioned` trait) and must also be resettable (marked by the
11 //! `ResetStream` trait). The former is used to ensure that errors at different points in the stream
12 //! aren't combined and the latter is used in parsers such as `or` to try multiple alternative
13 //! parses.
14
15 use crate::lib::{cmp::Ordering, fmt, marker::PhantomData, str::Chars};
16
17 use crate::{
18 error::{
19 ParseError,
20 ParseResult::{self, *},
21 StreamError, StringStreamError, Tracked, UnexpectedParse,
22 },
23 Parser,
24 };
25
26 #[cfg(feature = "std")]
27 pub use self::decoder::Decoder;
28
// Generates a `ResetStream` impl for `$ty` in which the checkpoint is simply a clone of the
// stream itself. `$params` is the generic parameter list placed on the generated `impl`; the
// generated impl additionally requires `Self: StreamOnce`.
#[doc(hidden)]
#[macro_export]
macro_rules! clone_resetable {
    (( $($params: tt)* ) $ty: ty) => {
        impl<$($params)*> ResetStream for $ty
            where Self: StreamOnce
        {
            type Checkpoint = Self;

            // A checkpoint is a full copy of the current stream value.
            fn checkpoint(&self) -> Self {
                self.clone()
            }
            #[inline]
            fn reset(&mut self, checkpoint: Self) -> Result<(), Self::Error> {
                // Resetting overwrites the stream with the saved copy; this always succeeds.
                *self = checkpoint;
                Ok(())
            }
        }
    }
}
49
50 #[cfg(feature = "std")]
51 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
52 pub mod buf_reader;
53 /// Stream wrapper which provides a `ResetStream` impl for `StreamOnce` impls which do not have
54 /// one.
55 #[cfg(feature = "alloc")]
56 #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
57 pub mod buffered;
58 #[cfg(feature = "std")]
59 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
60 pub mod easy;
61 /// Stream wrapper which provides more detailed position information.
62 pub mod position;
63 /// Stream wrapper allowing `std::io::Read` to be used
64 #[cfg(feature = "std")]
65 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
66 pub mod read;
67 pub mod span;
68 /// Stream wrapper allowing custom state to be used.
69 pub mod state;
70
71 #[cfg(feature = "std")]
72 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
73 pub mod decoder;
74
/// A type which has a position.
///
/// The position type is the `Position` associated type of the `StreamOnce` supertrait.
pub trait Positioned: StreamOnce {
    /// Returns the current position of the stream.
    fn position(&self) -> Self::Position;
}
80
/// Convenience alias over the `StreamError` for the input stream `Input`
///
/// It expands to the `StreamError` associated type of `Input`'s `ParseError` implementation, so
/// callers do not have to spell out the fully qualified path themselves.
///
/// ```
/// #[macro_use]
/// extern crate combine;
/// use combine::{easy, Parser, Stream, many1};
/// use combine::parser::char::letter;
/// use combine::stream::StreamErrorFor;
/// use combine::error::{ParseError, StreamError};
///
/// parser!{
///    fn parser[Input]()(Input) -> String
///     where [ Input: Stream<Token = char>, ]
///     {
///         many1(letter()).and_then(|word: String| {
///             if word == "combine" {
///                 Ok(word)
///             } else {
///                 // The alias makes it easy to refer to the `StreamError` type of `Input`
///                 Err(StreamErrorFor::<Input>::expected_static_message("combine"))
///             }
///         })
///     }
/// }
///
/// fn main() {
/// }
/// ```
pub type StreamErrorFor<Input> = <<Input as StreamOnce>::Error as ParseError<
    <Input as StreamOnce>::Token,
    <Input as StreamOnce>::Range,
    <Input as StreamOnce>::Position,
>>::StreamError;
114
/// `StreamOnce` represents a sequence of items that can be extracted one by one.
pub trait StreamOnce {
    /// The type of items which is yielded from this stream.
    type Token: Clone;

    /// The type of a range of items yielded from this stream.
    /// Types which do not have a way of yielding ranges of items should just use the
    /// `Self::Token` for this type.
    type Range: Clone;

    /// Type which represents the position in a stream.
    /// `Ord` is required to allow parsers to determine which of two positions are further ahead.
    type Position: Clone + Ord;

    /// The error type used for this stream. It must be a `ParseError` over this stream's
    /// `Token`, `Range` and `Position` types.
    type Error: ParseError<Self::Token, Self::Range, Self::Position>;
    /// Takes a stream and removes its first token, yielding the token and the rest of the elements.
    /// Returns `Err` if no element could be retrieved.
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>;

    /// Returns `true` if this stream only contains partial input.
    ///
    /// The default implementation returns `false`.
    ///
    /// See `PartialStream`.
    fn is_partial(&self) -> bool {
        false
    }
}
141
/// A `StreamOnce` which can create checkpoints which the stream can be reset to
pub trait ResetStream: StreamOnce {
    /// Value that records a position in the stream so that `reset` can return to it later.
    type Checkpoint: Clone;

    /// Creates a `Checkpoint` at the current position which can be used to reset the stream
    /// later to the current position
    fn checkpoint(&self) -> Self::Checkpoint;
    /// Attempts to reset the stream to an earlier position.
    ///
    /// Returns `Err` with the stream's error type if the reset could not be performed.
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error>;
}
152
// Streams listed here get a `ResetStream` impl whose checkpoint is a clone of the stream
// itself (see `clone_resetable!`).
clone_resetable! {('a) &'a str}
clone_resetable! {('a, T) &'a [T]}
clone_resetable! {('a, T) SliceStream<'a, T> }
clone_resetable! {(T: Clone) IteratorStream<T>}
157
/// A stream of tokens which can be duplicated
///
/// This is a trait over types which implement the `StreamOnce`, `ResetStream` and `Positioned`
/// traits. If you need a custom `Stream` object then implement those traits and `Stream` is
/// implemented automatically.
///
/// The trait has no methods of its own; it only bundles its supertraits.
pub trait Stream: StreamOnce + ResetStream + Positioned {}
164
// Blanket impl: every type implementing `StreamOnce`, `Positioned` and `ResetStream` is
// automatically a `Stream`.
impl<Input> Stream for Input
where
    Input: StreamOnce + Positioned + ResetStream,
    Input::Error: ParseError<Input::Token, Input::Range, Input::Position>,
{
}
171
172 #[inline]
uncons<Input>(input: &mut Input) -> ParseResult<Input::Token, Input::Error> where Input: ?Sized + Stream,173 pub fn uncons<Input>(input: &mut Input) -> ParseResult<Input::Token, Input::Error>
174 where
175 Input: ?Sized + Stream,
176 {
177 match input.uncons() {
178 Ok(x) => CommitOk(x),
179 Err(err) => wrap_stream_error(input, err),
180 }
181 }
182
183 /// A `RangeStream` is an extension of `StreamOnce` which allows for zero copy parsing.
184 pub trait RangeStreamOnce: StreamOnce + ResetStream {
185 /// Takes `size` elements from the stream.
186 /// Fails if the length of the stream is less than `size`.
uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>187 fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>;
188
189 /// Takes items from stream, testing each one with `predicate`.
190 /// returns the range of items which passed `predicate`.
uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool191 fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
192 where
193 F: FnMut(Self::Token) -> bool;
194
195 #[inline]
196 /// Takes items from stream, testing each one with `predicate`
197 /// returns a range of at least one items which passed `predicate`.
198 ///
199 /// # Note
200 ///
201 /// This may not return `PeekOk` as it should uncons at least one token.
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,202 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
203 where
204 F: FnMut(Self::Token) -> bool,
205 {
206 let mut committed = false;
207 let mut started_at_eoi = true;
208 let result = self.uncons_while(|c| {
209 let ok = f(c);
210 committed |= ok;
211 started_at_eoi = false;
212 ok
213 });
214 if committed {
215 match result {
216 Ok(x) => CommitOk(x),
217 Err(x) => CommitErr(x),
218 }
219 } else if started_at_eoi {
220 PeekErr(Tracked::from(StreamErrorFor::<Self>::end_of_input()))
221 } else {
222 PeekErr(Tracked::from(
223 StreamErrorFor::<Self>::unexpected_static_message(""),
224 ))
225 }
226 }
227
228 /// Returns the distance between `self` and `end`. The returned `usize` must be so that
229 ///
230 /// ```ignore
231 /// let start = stream.checkpoint();
232 /// stream.uncons_range(distance);
233 /// stream.distance(&start) == distance
234 /// ```
distance(&self, end: &Self::Checkpoint) -> usize235 fn distance(&self, end: &Self::Checkpoint) -> usize;
236
237 /// Returns the entire range of `self`
range(&self) -> Self::Range238 fn range(&self) -> Self::Range;
239 }
240
/// A `RangeStream` is an extension of `Stream` which allows for zero copy parsing.
///
/// It has no methods of its own; it combines `Stream` with `RangeStreamOnce`.
pub trait RangeStream: Stream + RangeStreamOnce {}

// Blanket impl: any `RangeStreamOnce` that is also a `Stream` is a `RangeStream`.
impl<Input> RangeStream for Input where Input: RangeStreamOnce + Stream {}
245
246 #[doc(hidden)]
wrap_stream_error<T, Input>( input: &Input, err: <Input::Error as ParseError<Input::Token, Input::Range, Input::Position>>::StreamError, ) -> ParseResult<T, <Input as StreamOnce>::Error> where Input: ?Sized + StreamOnce + Positioned,247 pub fn wrap_stream_error<T, Input>(
248 input: &Input,
249 err: <Input::Error as ParseError<Input::Token, Input::Range, Input::Position>>::StreamError,
250 ) -> ParseResult<T, <Input as StreamOnce>::Error>
251 where
252 Input: ?Sized + StreamOnce + Positioned,
253 {
254 let err = Input::Error::from_error(input.position(), err);
255 if input.is_partial() {
256 CommitErr(err)
257 } else {
258 PeekErr(err.into())
259 }
260 }
261
262 #[inline]
uncons_range<Input>( input: &mut Input, size: usize, ) -> ParseResult<Input::Range, <Input as StreamOnce>::Error> where Input: ?Sized + RangeStream,263 pub fn uncons_range<Input>(
264 input: &mut Input,
265 size: usize,
266 ) -> ParseResult<Input::Range, <Input as StreamOnce>::Error>
267 where
268 Input: ?Sized + RangeStream,
269 {
270 match input.uncons_range(size) {
271 Err(err) => wrap_stream_error(input, err),
272 Ok(x) => {
273 if size == 0 {
274 PeekOk(x)
275 } else {
276 CommitOk(x)
277 }
278 }
279 }
280 }
281
282 #[doc(hidden)]
input_at_eof<Input>(input: &mut Input) -> bool where Input: ?Sized + Stream,283 pub fn input_at_eof<Input>(input: &mut Input) -> bool
284 where
285 Input: ?Sized + Stream,
286 {
287 let before = input.checkpoint();
288 let x = input
289 .uncons()
290 .err()
291 .map_or(false, |err| err.is_unexpected_end_of_input());
292 input.reset(before).is_ok() && x
293 }
294
295 /// Removes items from the input while `predicate` returns `true`.
296 #[inline]
uncons_while<Input, F>( input: &mut Input, predicate: F, ) -> ParseResult<Input::Range, Input::Error> where F: FnMut(Input::Token) -> bool, Input: ?Sized + RangeStream, Input::Range: Range,297 pub fn uncons_while<Input, F>(
298 input: &mut Input,
299 predicate: F,
300 ) -> ParseResult<Input::Range, Input::Error>
301 where
302 F: FnMut(Input::Token) -> bool,
303 Input: ?Sized + RangeStream,
304 Input::Range: Range,
305 {
306 match input.uncons_while(predicate) {
307 Err(err) => wrap_stream_error(input, err),
308 Ok(x) => {
309 if input.is_partial() && input_at_eof(input) {
310 // Partial inputs which encounter end of file must fail to let more input be
311 // retrieved
312 CommitErr(Input::Error::from_error(
313 input.position(),
314 StreamError::end_of_input(),
315 ))
316 } else if x.len() == 0 {
317 PeekOk(x)
318 } else {
319 CommitOk(x)
320 }
321 }
322 }
323 }
324
325 #[inline]
326 /// Takes items from stream, testing each one with `predicate`
327 /// returns a range of at least one items which passed `predicate`.
328 ///
329 /// # Note
330 ///
331 /// This may not return `PeekOk` as it should uncons at least one token.
uncons_while1<Input, F>( input: &mut Input, predicate: F, ) -> ParseResult<Input::Range, Input::Error> where F: FnMut(Input::Token) -> bool, Input: ?Sized + RangeStream,332 pub fn uncons_while1<Input, F>(
333 input: &mut Input,
334 predicate: F,
335 ) -> ParseResult<Input::Range, Input::Error>
336 where
337 F: FnMut(Input::Token) -> bool,
338 Input: ?Sized + RangeStream,
339 {
340 match input.uncons_while1(predicate) {
341 CommitOk(x) => {
342 if input.is_partial() && input_at_eof(input) {
343 // Partial inputs which encounter end of file must fail to let more input be
344 // retrieved
345 CommitErr(Input::Error::from_error(
346 input.position(),
347 StreamError::end_of_input(),
348 ))
349 } else {
350 CommitOk(x)
351 }
352 }
353 PeekErr(_) => {
354 if input.is_partial() && input_at_eof(input) {
355 // Partial inputs which encounter end of file must fail to let more input be
356 // retrieved
357 CommitErr(Input::Error::from_error(
358 input.position(),
359 StreamError::end_of_input(),
360 ))
361 } else {
362 PeekErr(Input::Error::empty(input.position()).into())
363 }
364 }
365 CommitErr(err) => {
366 if input.is_partial() && input_at_eof(input) {
367 // Partial inputs which encounter end of file must fail to let more input be
368 // retrieved
369 CommitErr(Input::Error::from_error(
370 input.position(),
371 StreamError::end_of_input(),
372 ))
373 } else {
374 wrap_stream_error(input, err)
375 }
376 }
377 PeekOk(_) => unreachable!(),
378 }
379 }
380
/// Trait representing a range of elements.
pub trait Range {
    /// Returns the remaining length of `self`.
    /// The returned length need not be the same as the number of items left in the stream.
    fn len(&self) -> usize;

    /// Returns `true` if the range does not contain any elements (`Range::len() == 0`)
    ///
    /// The default implementation is defined in terms of `len`.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
392
/// A mutable reference to a stream is itself a stream; every associated type and method is
/// forwarded to the referenced stream.
impl<'a, I> StreamOnce for &'a mut I
where
    I: StreamOnce + ?Sized,
{
    type Token = I::Token;

    type Range = I::Range;

    type Position = I::Position;

    type Error = I::Error;
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
        (**self).uncons()
    }

    // Forwarded so that a reference to a partial stream is also reported as partial.
    fn is_partial(&self) -> bool {
        (**self).is_partial()
    }
}
412
/// Forwards `position` to the referenced stream.
impl<'a, I> Positioned for &'a mut I
where
    I: Positioned + ?Sized,
{
    #[inline]
    fn position(&self) -> Self::Position {
        (**self).position()
    }
}
422
/// Forwards checkpoint creation and resetting to the referenced stream, reusing its
/// `Checkpoint` type.
impl<'a, I> ResetStream for &'a mut I
where
    I: ResetStream + ?Sized,
{
    type Checkpoint = I::Checkpoint;

    fn checkpoint(&self) -> Self::Checkpoint {
        (**self).checkpoint()
    }

    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
        (**self).reset(checkpoint)
    }
}
437
/// Forwards every range operation to the referenced stream.
impl<'a, I> RangeStreamOnce for &'a mut I
where
    I: RangeStreamOnce + ?Sized,
{
    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        (**self).uncons_while(f)
    }

    // Forwarded explicitly so that the referenced stream's own `uncons_while1` is used rather
    // than the trait's default implementation.
    #[inline]
    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        (**self).uncons_while1(f)
    }

    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        (**self).uncons_range(size)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        (**self).distance(end)
    }

    fn range(&self) -> Self::Range {
        (**self).range()
    }
}
472
/// A mutable reference to a range reports the length of the referenced range.
impl<'a, I> Range for &'a mut I
where
    I: Range + ?Sized,
{
    fn len(&self) -> usize {
        (**self).len()
    }
}
481
482 impl<'a> StreamOnce for &'a str {
483 type Token = char;
484 type Range = &'a str;
485 type Position = PointerOffset<str>;
486 type Error = StringStreamError;
487
488 #[inline]
uncons(&mut self) -> Result<char, StreamErrorFor<Self>>489 fn uncons(&mut self) -> Result<char, StreamErrorFor<Self>> {
490 let mut chars = self.chars();
491 match chars.next() {
492 Some(c) => {
493 *self = chars.as_str();
494 Ok(c)
495 }
496 None => Err(StringStreamError::Eoi),
497 }
498 }
499 }
500
/// The position of a string slice is derived from the position of its underlying bytes.
impl<'a> Positioned for &'a str {
    #[inline]
    fn position(&self) -> Self::Position {
        // Reuse the byte-slice implementation and re-wrap its raw value for `str`.
        PointerOffset::new(self.as_bytes().position().0)
    }
}
507
/// Splits off the prefix of `slice` that ends just before the first character rejected by `f`.
///
/// `chars` must iterate over a suffix of `slice`; characters of `slice` that `chars` has
/// already passed are included in the result without being tested. On return `slice` holds
/// the remainder, starting at the rejected character (or empty if none was rejected).
#[allow(clippy::while_let_loop)]
fn str_uncons_while<'a, F>(slice: &mut &'a str, mut chars: Chars<'a>, mut f: F) -> &'a str
where
    F: FnMut(char) -> bool,
{
    // UTF-8 width of the rejected character; stays 0 when the input ran out first.
    let mut rejected_width = 0;

    // One step of the scan: leaves the loop on end of input or on the first rejection.
    macro_rules! step {
        () => {
            if let Some(ch) = chars.next() {
                if !f(ch) {
                    rejected_width = ch.len_utf8();
                    break;
                }
            } else {
                break;
            }
        };
    }

    // Manually unrolled eight times.
    loop {
        step!();
        step!();
        step!();
        step!();
        step!();
        step!();
        step!();
        step!();
    }

    // The iterator has already moved past the rejected character, so give its bytes back.
    let consumed = slice.len() - chars.as_str().len() - rejected_width;
    let (head, tail) = slice.split_at(consumed);
    *slice = tail;
    head
}
544
545 impl<'a> RangeStreamOnce for &'a str {
uncons_while<F>(&mut self, f: F) -> Result<&'a str, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,546 fn uncons_while<F>(&mut self, f: F) -> Result<&'a str, StreamErrorFor<Self>>
547 where
548 F: FnMut(Self::Token) -> bool,
549 {
550 Ok(str_uncons_while(self, self.chars(), f))
551 }
552
553 #[inline]
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,554 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
555 where
556 F: FnMut(Self::Token) -> bool,
557 {
558 let mut chars = self.chars();
559 match chars.next() {
560 Some(c) => {
561 if !f(c) {
562 return PeekErr(Tracked::from(StringStreamError::UnexpectedParse));
563 }
564 }
565 None => return PeekErr(Tracked::from(StringStreamError::Eoi)),
566 }
567
568 CommitOk(str_uncons_while(self, chars, f))
569 }
570
571 #[inline]
uncons_range(&mut self, size: usize) -> Result<&'a str, StreamErrorFor<Self>>572 fn uncons_range(&mut self, size: usize) -> Result<&'a str, StreamErrorFor<Self>> {
573 fn is_char_boundary(s: &str, index: usize) -> bool {
574 if index == s.len() {
575 return true;
576 }
577 match s.as_bytes().get(index) {
578 None => false,
579 Some(b) => !(128..=192).contains(b),
580 }
581 }
582 if size <= self.len() {
583 if is_char_boundary(self, size) {
584 let (result, remaining) = self.split_at(size);
585 *self = remaining;
586 Ok(result)
587 } else {
588 Err(StringStreamError::CharacterBoundary)
589 }
590 } else {
591 Err(StringStreamError::Eoi)
592 }
593 }
594
595 #[inline]
distance(&self, end: &Self) -> usize596 fn distance(&self, end: &Self) -> usize {
597 self.position().0 - end.position().0
598 }
599
range(&self) -> Self::Range600 fn range(&self) -> Self::Range {
601 self
602 }
603 }
604
/// The length of a string range is its length in bytes (`str::len`).
impl<'a> Range for &'a str {
    #[inline]
    fn len(&self) -> usize {
        str::len(self)
    }
}
611
/// The length of a slice range is its number of elements.
impl<'a, T> Range for &'a [T] {
    #[inline]
    fn len(&self) -> usize {
        <[T]>::len(self)
    }
}
618
// Index at which the slice `uncons_while` helpers start testing elements. The discriminant
// is the index itself, so the value is converted with `as usize`.
#[repr(usize)]
enum UnconsStart {
    // Test every element, starting with the first.
    Zero = 0,
    // Skip the first element; callers use this after testing it themselves.
    One = 1,
}
624
slice_uncons_while<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T] where F: FnMut(T) -> bool, T: Clone,625 fn slice_uncons_while<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
626 where
627 F: FnMut(T) -> bool,
628 T: Clone,
629 {
630 let mut i = start as usize;
631 let len = slice.len();
632 // SAFETY: We only call this function with `One` if the slice has length >= 1
633 debug_assert!(len >= i, "");
634 let mut found = false;
635
636 macro_rules! check {
637 () => {
638 if !f(unsafe { slice.get_unchecked(i).clone() }) {
639 found = true;
640 break;
641 }
642 i += 1;
643 };
644 }
645
646 // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
647 while len - i >= 8 {
648 check!();
649 check!();
650 check!();
651 check!();
652 check!();
653 check!();
654 check!();
655 check!();
656 }
657
658 if !found {
659 while let Some(c) = slice.get(i) {
660 if !f(c.clone()) {
661 break;
662 }
663 i += 1;
664 }
665 }
666
667 let (result, remaining) = slice.split_at(i);
668 *slice = remaining;
669 result
670 }
671
672 impl<'a, T> RangeStreamOnce for &'a [T]
673 where
674 T: Clone + PartialEq,
675 {
676 #[inline]
uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>>677 fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
678 if size <= self.len() {
679 let (result, remaining) = self.split_at(size);
680 *self = remaining;
681 Ok(result)
682 } else {
683 Err(UnexpectedParse::Eoi)
684 }
685 }
686
687 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,688 fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
689 where
690 F: FnMut(Self::Token) -> bool,
691 {
692 Ok(slice_uncons_while(self, UnconsStart::Zero, f))
693 }
694
695 #[inline]
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,696 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
697 where
698 F: FnMut(Self::Token) -> bool,
699 {
700 match self.first() {
701 Some(c) => {
702 if !f(c.clone()) {
703 return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
704 }
705 }
706 None => {
707 return PeekErr(Tracked::from(UnexpectedParse::Eoi));
708 }
709 }
710
711 CommitOk(slice_uncons_while(self, UnconsStart::One, f))
712 }
713
714 #[inline]
distance(&self, end: &Self) -> usize715 fn distance(&self, end: &Self) -> usize {
716 end.len() - self.len()
717 }
718
range(&self) -> Self::Range719 fn range(&self) -> Self::Range {
720 self
721 }
722 }
723
/// The position of a slice stream is built from the address of the slice's start.
impl<'a, T> Positioned for &'a [T]
where
    T: Clone + PartialEq,
{
    #[inline]
    fn position(&self) -> Self::Position {
        // The raw pointer value is stored as a plain `usize`.
        PointerOffset::new(self.as_ptr() as usize)
    }
}
733
734 impl<'a, T> StreamOnce for &'a [T]
735 where
736 T: Clone + PartialEq,
737 {
738 type Token = T;
739 type Range = &'a [T];
740 type Position = PointerOffset<[T]>;
741 type Error = UnexpectedParse;
742
743 #[inline]
uncons(&mut self) -> Result<T, StreamErrorFor<Self>>744 fn uncons(&mut self) -> Result<T, StreamErrorFor<Self>> {
745 match self.split_first() {
746 Some((first, rest)) => {
747 *self = rest;
748 Ok(first.clone())
749 }
750 None => Err(UnexpectedParse::Eoi),
751 }
752 }
753 }
754
/// Stream type which indicates that the stream is partial if end of input is reached
///
/// It wraps another stream (field `0`) and reports `is_partial() == true`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct PartialStream<S>(pub S);
758
/// Wraps any value in a `PartialStream`.
impl<S> From<S> for PartialStream<S> {
    fn from(t: S) -> Self {
        PartialStream(t)
    }
}
764
/// Reports the position of the wrapped stream.
impl<S> Positioned for PartialStream<S>
where
    S: Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        self.0.position()
    }
}
774
/// Checkpoints are those of the wrapped stream; both operations are forwarded to it.
impl<S> ResetStream for PartialStream<S>
where
    S: ResetStream,
{
    type Checkpoint = S::Checkpoint;

    #[inline]
    fn checkpoint(&self) -> Self::Checkpoint {
        self.0.checkpoint()
    }

    #[inline]
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
        self.0.reset(checkpoint)
    }
}
791
/// Behaves like the wrapped stream except that it always reports itself as partial.
impl<S> StreamOnce for PartialStream<S>
where
    S: StreamOnce,
{
    type Token = S::Token;
    type Range = S::Range;
    type Position = S::Position;
    type Error = S::Error;

    #[inline]
    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
        self.0.uncons()
    }

    // This is the one behaviour the wrapper changes: the stream is always partial.
    fn is_partial(&self) -> bool {
        true
    }
}
810
/// Forwards every range operation to the wrapped stream.
impl<S> RangeStreamOnce for PartialStream<S>
where
    S: RangeStreamOnce,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        self.0.uncons_range(size)
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while(f)
    }

    // Forwarded explicitly so the wrapped stream's own `uncons_while1` is used rather than
    // the trait's default implementation.
    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while1(f)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        self.0.distance(end)
    }

    #[inline]
    fn range(&self) -> Self::Range {
        self.0.range()
    }
}
845
/// Stream type which indicates that the stream is complete if end of input is reached
///
/// For most streams this is already the default but this wrapper can be used to override a nested
/// `PartialStream`
///
/// The type is `#[repr(transparent)]` over the wrapped stream (field `0`).
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
#[repr(transparent)]
pub struct CompleteStream<S>(pub S);
853
/// Wraps any value in a `CompleteStream`.
impl<S> From<S> for CompleteStream<S> {
    fn from(t: S) -> Self {
        CompleteStream(t)
    }
}
859
/// Views a mutable reference to `S` as a mutable reference to `CompleteStream<S>` without
/// moving or copying the stream.
impl<'s, S> From<&'s mut S> for &'s mut CompleteStream<S> {
    fn from(t: &'s mut S) -> Self {
        // SAFETY: `CompleteStream<S>` is `#[repr(transparent)]` with `S` as its only field,
        // so the pointer cast refers to the same object. The returned reference keeps the
        // lifetime `'s` of the input reference.
        unsafe { &mut *(t as *mut S as *mut CompleteStream<S>) }
    }
}
866
/// Reports the position of the wrapped stream.
impl<S> Positioned for CompleteStream<S>
where
    S: Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        self.0.position()
    }
}
876
/// Checkpoints are those of the wrapped stream; both operations are forwarded to it.
impl<S> ResetStream for CompleteStream<S>
where
    S: ResetStream,
{
    type Checkpoint = S::Checkpoint;

    #[inline]
    fn checkpoint(&self) -> Self::Checkpoint {
        self.0.checkpoint()
    }

    #[inline]
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
        self.0.reset(checkpoint)
    }
}
893
/// Behaves like the wrapped stream except that it never reports itself as partial.
impl<S> StreamOnce for CompleteStream<S>
where
    S: StreamOnce,
{
    type Token = S::Token;
    type Range = S::Range;
    type Position = S::Position;
    type Error = S::Error;

    #[inline]
    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
        self.0.uncons()
    }

    // Always `false`, regardless of what the wrapped stream reports.
    fn is_partial(&self) -> bool {
        false
    }
}
912
/// Forwards every range operation to the wrapped stream.
impl<S> RangeStreamOnce for CompleteStream<S>
where
    S: RangeStreamOnce,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        self.0.uncons_range(size)
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while(f)
    }

    // Forwarded explicitly so the wrapped stream's own `uncons_while1` is used rather than
    // the trait's default implementation.
    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while1(f)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        self.0.distance(end)
    }

    #[inline]
    fn range(&self) -> Self::Range {
        self.0.range()
    }
}
947
/// Stream wrapper whose partial-ness is chosen at runtime.
///
/// Field `0` is the wrapped stream; field `1` is the value returned by `is_partial`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct MaybePartialStream<S>(pub S, pub bool);
950
/// Reports the position of the wrapped stream.
impl<S> Positioned for MaybePartialStream<S>
where
    S: Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        self.0.position()
    }
}
960
/// Checkpoints are those of the wrapped stream; the partial flag is not part of a checkpoint.
impl<S> ResetStream for MaybePartialStream<S>
where
    S: ResetStream,
{
    type Checkpoint = S::Checkpoint;

    #[inline]
    fn checkpoint(&self) -> Self::Checkpoint {
        self.0.checkpoint()
    }

    #[inline]
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
        self.0.reset(checkpoint)
    }
}
977
/// Behaves like the wrapped stream except that `is_partial` returns the stored flag.
impl<S> StreamOnce for MaybePartialStream<S>
where
    S: StreamOnce,
{
    type Token = S::Token;
    type Range = S::Range;
    type Position = S::Position;
    type Error = S::Error;

    #[inline]
    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
        self.0.uncons()
    }

    // Returns the flag stored in field `1` rather than asking the wrapped stream.
    fn is_partial(&self) -> bool {
        self.1
    }
}
996
/// Forwards every range operation to the wrapped stream.
impl<S> RangeStreamOnce for MaybePartialStream<S>
where
    S: RangeStreamOnce,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        self.0.uncons_range(size)
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while(f)
    }

    // Forwarded explicitly so the wrapped stream's own `uncons_while1` is used rather than
    // the trait's default implementation.
    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while1(f)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        self.0.distance(end)
    }

    #[inline]
    fn range(&self) -> Self::Range {
        self.0.range()
    }
}
1031
/// Newtype for constructing a stream from a slice where the items in the slice are not copyable.
///
/// Tokens of this stream are references into the wrapped slice (field `0`), so elements are
/// never cloned.
#[derive(Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct SliceStream<'a, T>(pub &'a [T]);
1035
// Written by hand (instead of derived) so that cloning places no bound on `T`: only the
// slice reference is copied.
impl<'a, T> Clone for SliceStream<'a, T> {
    fn clone(&self) -> SliceStream<'a, T> {
        SliceStream(self.0)
    }
}
1041
1042 impl<'a, T> Positioned for SliceStream<'a, T>
1043 where
1044 T: PartialEq + 'a,
1045 {
1046 #[inline]
position(&self) -> Self::Position1047 fn position(&self) -> Self::Position {
1048 PointerOffset::new(self.0.as_ptr() as usize)
1049 }
1050 }
1051
1052 impl<'a, T> StreamOnce for SliceStream<'a, T>
1053 where
1054 T: PartialEq + 'a,
1055 {
1056 type Token = &'a T;
1057 type Range = &'a [T];
1058 type Position = PointerOffset<[T]>;
1059 type Error = UnexpectedParse;
1060
1061 #[inline]
uncons(&mut self) -> Result<&'a T, StreamErrorFor<Self>>1062 fn uncons(&mut self) -> Result<&'a T, StreamErrorFor<Self>> {
1063 match self.0.split_first() {
1064 Some((first, rest)) => {
1065 self.0 = rest;
1066 Ok(first)
1067 }
1068 None => Err(UnexpectedParse::Eoi),
1069 }
1070 }
1071 }
1072
/// Splits off and returns the leading part of `*slice`, advancing `*slice` past it.
///
/// Scanning starts at index `start as usize` and calls `f` on each element in order until `f`
/// returns `false` or the slice is exhausted. The returned prefix holds everything before the
/// stopping index, including any elements before `start`, which are never passed to `f`.
/// `*slice` is left pointing at the remainder.
fn slice_uncons_while_ref<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
where
    F: FnMut(&'a T) -> bool,
{
    let mut i = start as usize;
    let len = slice.len();
    // SAFETY: We only call this function with `One` if the slice has length >= 1
    // NOTE(review): this precondition is only checked in debug builds; the `len - i` below
    // relies on callers upholding `len >= i`.
    debug_assert!(len >= i, "");
    // Set when `f` rejects an element inside the unrolled loop, so the tail loop is skipped.
    let mut found = false;

    // Tests the element at `i`: on rejection it records that and breaks out of the enclosing
    // loop, otherwise it advances `i`. Only expanded where `i < len` is already established.
    macro_rules! check {
        () => {
            if !f(unsafe { slice.get_unchecked(i) }) {
                found = true;
                break;
            }
            i += 1;
        };
    }

    // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
    // Each pass performs at most eight checks, one per remaining element guaranteed by the
    // loop condition.
    while len - i >= 8 {
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
    }

    // Fewer than eight elements remain: finish with bounds-checked accesses.
    if !found {
        while let Some(c) = slice.get(i) {
            if !f(c) {
                break;
            }
            i += 1;
        }
    }

    // `i` is the index of the first rejected element, or `len` if none was rejected.
    let (result, remaining) = slice.split_at(i);
    *slice = remaining;
    result
}
1118
1119 impl<'a, T> RangeStreamOnce for SliceStream<'a, T>
1120 where
1121 T: PartialEq + 'a,
1122 {
1123 #[inline]
uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>>1124 fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
1125 if size <= self.0.len() {
1126 let (range, rest) = self.0.split_at(size);
1127 self.0 = rest;
1128 Ok(range)
1129 } else {
1130 Err(UnexpectedParse::Eoi)
1131 }
1132 }
1133
1134 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,1135 fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
1136 where
1137 F: FnMut(Self::Token) -> bool,
1138 {
1139 Ok(slice_uncons_while_ref(&mut self.0, UnconsStart::Zero, f))
1140 }
1141
1142 #[inline]
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,1143 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
1144 where
1145 F: FnMut(Self::Token) -> bool,
1146 {
1147 match self.0.first() {
1148 Some(c) => {
1149 if !f(c) {
1150 return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
1151 }
1152 }
1153 None => return PeekErr(Tracked::from(UnexpectedParse::Eoi)),
1154 }
1155
1156 CommitOk(slice_uncons_while_ref(&mut self.0, UnconsStart::One, f))
1157 }
1158
1159 #[inline]
distance(&self, end: &Self) -> usize1160 fn distance(&self, end: &Self) -> usize {
1161 end.0.len() - self.0.len()
1162 }
1163
range(&self) -> Self::Range1164 fn range(&self) -> Self::Range {
1165 self.0
1166 }
1167 }
1168
/// Wrapper around iterators which allows them to be treated as a stream.
/// Returned by [`IteratorStream::new`].
///
/// The wrapped iterator is kept in a private field; items are obtained by calling its `next`.
#[derive(Copy, Clone, Debug)]
pub struct IteratorStream<Input>(Input);
1173
1174 impl<Input> IteratorStream<Input>
1175 where
1176 Input: Iterator,
1177 {
1178 /// Converts an `Iterator` into a stream.
1179 ///
1180 /// NOTE: This type do not implement `Positioned` and `Clone` and must be wrapped with types
1181 /// such as `BufferedStreamRef` and `State` to become a `Stream` which can be parsed
new<T>(iter: T) -> IteratorStream<Input> where T: IntoIterator<IntoIter = Input, Item = Input::Item>,1182 pub fn new<T>(iter: T) -> IteratorStream<Input>
1183 where
1184 T: IntoIterator<IntoIter = Input, Item = Input::Item>,
1185 {
1186 IteratorStream(iter.into_iter())
1187 }
1188 }
1189
1190 impl<Input> Iterator for IteratorStream<Input>
1191 where
1192 Input: Iterator,
1193 {
1194 type Item = Input::Item;
next(&mut self) -> Option<Input::Item>1195 fn next(&mut self) -> Option<Input::Item> {
1196 self.0.next()
1197 }
1198 }
1199
1200 impl<Input: Iterator> StreamOnce for IteratorStream<Input>
1201 where
1202 Input::Item: Clone + PartialEq,
1203 {
1204 type Token = Input::Item;
1205 type Range = Input::Item;
1206 type Position = ();
1207 type Error = UnexpectedParse;
1208
1209 #[inline]
uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>1210 fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
1211 match self.next() {
1212 Some(x) => Ok(x),
1213 None => Err(UnexpectedParse::Eoi),
1214 }
1215 }
1216 }
1217
/// Newtype around a pointer offset into a slice stream (`&[T]`/`&str`).
///
/// Field `0` holds the raw address as a `usize`. The `PhantomData<T>` field stores no data; it
/// only ties the offset to the stream type `T`.
pub struct PointerOffset<T: ?Sized>(pub usize, PhantomData<T>);
1220
1221 impl<T: ?Sized> Clone for PointerOffset<T> {
clone(&self) -> Self1222 fn clone(&self) -> Self {
1223 PointerOffset::new(self.0)
1224 }
1225 }
1226
1227 impl<T: ?Sized> Copy for PointerOffset<T> {}
1228
1229 impl<T: ?Sized> Default for PointerOffset<T> {
default() -> Self1230 fn default() -> Self {
1231 PointerOffset::new(0)
1232 }
1233 }
1234
1235 impl<T: ?Sized> PartialEq for PointerOffset<T> {
eq(&self, other: &Self) -> bool1236 fn eq(&self, other: &Self) -> bool {
1237 self.0 == other.0
1238 }
1239 }
1240
1241 impl<T: ?Sized> Eq for PointerOffset<T> {}
1242
1243 impl<T: ?Sized> PartialOrd for PointerOffset<T> {
partial_cmp(&self, other: &Self) -> Option<Ordering>1244 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1245 self.0.partial_cmp(&other.0)
1246 }
1247 }
1248
1249 impl<T: ?Sized> Ord for PointerOffset<T> {
cmp(&self, other: &Self) -> Ordering1250 fn cmp(&self, other: &Self) -> Ordering {
1251 self.0.cmp(&other.0)
1252 }
1253 }
1254
1255 impl<T> fmt::Debug for PointerOffset<T>
1256 where
1257 T: ?Sized,
1258 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1260 write!(f, "{}", self)
1261 }
1262 }
1263
1264 impl<T> fmt::Display for PointerOffset<T>
1265 where
1266 T: ?Sized,
1267 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1269 write!(f, "PointerOffset({:?})", self.0 as *const ())
1270 }
1271 }
1272
1273 impl<T> PointerOffset<T>
1274 where
1275 T: ?Sized,
1276 {
new(offset: usize) -> Self1277 pub fn new(offset: usize) -> Self {
1278 PointerOffset(offset, PhantomData)
1279 }
1280
1281 /// Converts the pointer-based position into an indexed position.
1282 ///
1283 /// ```rust
1284 /// # extern crate combine;
1285 /// # use combine::*;
1286 /// # fn main() {
1287 /// let text = "b";
1288 /// let err = token('a').easy_parse(text).unwrap_err();
1289 /// assert_eq!(err.position.0, text.as_ptr() as usize);
1290 /// assert_eq!(err.map_position(|p| p.translate_position(text)).position, 0);
1291 /// # }
1292 /// ```
translate_position(mut self, initial_slice: &T) -> usize1293 pub fn translate_position(mut self, initial_slice: &T) -> usize {
1294 self.0 -= initial_slice as *const T as *const () as usize;
1295 self.0
1296 }
1297 }
1298
1299 /// Decodes `input` using `parser`.
1300 ///
1301 /// Return `Ok(Some(token), committed_data)` if there was enough data to finish parsing using
1302 /// `parser`.
1303 /// Returns `Ok(None, committed_data)` if `input` did not contain enough data to finish parsing
1304 /// using `parser`.
1305 ///
1306 /// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
decode<Input, P>( mut parser: P, input: &mut Input, partial_state: &mut P::PartialState, ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error> where P: Parser<Input>, Input: RangeStream,1307 pub fn decode<Input, P>(
1308 mut parser: P,
1309 input: &mut Input,
1310 partial_state: &mut P::PartialState,
1311 ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
1312 where
1313 P: Parser<Input>,
1314 Input: RangeStream,
1315 {
1316 let start = input.checkpoint();
1317 match parser.parse_with_state(input, partial_state) {
1318 Ok(message) => Ok((Some(message), input.distance(&start))),
1319 Err(err) => {
1320 if err.is_unexpected_end_of_input() {
1321 if input.is_partial() {
1322 // The parser expected more input to parse and input is partial, return `None`
1323 // as we did not finish and also return how much may be removed from the stream
1324 Ok((None, input.distance(&start)))
1325 } else {
1326 Err(err)
1327 }
1328 } else {
1329 Err(err)
1330 }
1331 }
1332 }
1333 }
1334
1335 /// Decodes `input` using `parser`. Like `decode` but works directly in both
1336 /// `tokio_util::Decoder::decode` and `tokio_util::Decoder::decode_eof`
1337 ///
1338 /// Return `Ok(Some(token), committed_data)` if there was enough data to finish parsing using
1339 /// `parser`.
1340 /// Returns `Ok(None, committed_data)` if `input` did not contain enough data to finish parsing
1341 /// using `parser`.
1342 /// Returns `Ok(None, 0)` if `input` did not contain enough data to finish parsing
1343 /// using `parser`.
1344 ///
1345 /// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
decode_tokio<Input, P>( mut parser: P, input: &mut Input, partial_state: &mut P::PartialState, ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error> where P: Parser<Input>, Input: RangeStream,1346 pub fn decode_tokio<Input, P>(
1347 mut parser: P,
1348 input: &mut Input,
1349 partial_state: &mut P::PartialState,
1350 ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
1351 where
1352 P: Parser<Input>,
1353 Input: RangeStream,
1354 {
1355 let start = input.checkpoint();
1356 match parser.parse_with_state(input, partial_state) {
1357 Ok(message) => Ok((Some(message), input.distance(&start))),
1358 Err(err) => {
1359 if err.is_unexpected_end_of_input() {
1360 if input.is_partial() {
1361 // The parser expected more input to parse and input is partial, return `None`
1362 // as we did not finish and also return how much may be removed from the stream
1363 Ok((None, input.distance(&start)))
1364 } else if input_at_eof(input) && input.distance(&start) == 0 {
1365 // We are at eof and the input is empty, return None to indicate that we are
1366 // done
1367 Ok((None, 0))
1368 } else {
1369 Err(err)
1370 }
1371 } else {
1372 Err(err)
1373 }
1374 }
1375 }
1376 }
1377
1378 /// Parses an instance of `std::io::Read` as a `&[u8]` without reading the entire file into
1379 /// memory.
1380 ///
1381 /// This is defined as a macro to work around the lack of Higher Ranked Types. See the
1382 /// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode!` call is unlikely to work).
1384 ///
1385 /// ```
1386 /// use std::{
1387 /// fs::File,
1388 /// };
1389 /// use combine::{decode, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
1390 ///
1391 /// let mut read = File::open("README.md").unwrap();
1392 /// let mut decoder = Decoder::new();
1393 /// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
1394 /// assert_eq!(
1395 /// decode!(
1396 /// decoder,
1397 /// read,
1398 /// {
1399 /// let word = many1(satisfy(|b| !is_whitespace(b)));
1400 /// sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
1401 /// },
1402 /// |input, _position| combine::easy::Stream::from(input),
1403 /// ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
1404 /// Ok(824),
1405 /// );
1406 /// ```
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[macro_export]
macro_rules! decode {
    // Parser only: the buffer stream is passed to the parser unchanged and nothing is done
    // after decoding.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, |input, _position| input, |x| x)
    };

    // Custom stream adapter, no post-decode hook (identity closure).
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$input_stream` is called with (buffer stream, position) to build the stream
    // the parser runs on; `$post_decode` is called with that stream after each decode attempt.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // The single-arm matches bind the caller's expressions to `decoder` and `read`.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, read);

                        // The stream is flagged as partial until end of input has been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream ended up before handing it to the hook.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    // `removed` is the amount `decode` reported as safe to drop from the input.
                    decoder.advance(&mut *read, removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: presumably this pulls more data from `read` before the next
                    // attempt (defined on the decoder, not visible here); I/O failures end the
                    // loop with the decoder's current position attached.
                    match decoder.__before_parse(&mut *read) {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1464
1465 /// Parses an instance of `futures::io::AsyncRead` as a `&[u8]` without reading the entire file into
1466 /// memory.
1467 ///
1468 /// This is defined as a macro to work around the lack of Higher Ranked Types. See the
1469 /// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_futures_03!` call is unlikely to work).
1471 ///
1472 /// ```
1473 /// # use futures_03_dep as futures;
1474 /// use futures::pin_mut;
1475 /// use async_std::{
1476 /// fs::File,
1477 /// task,
1478 /// };
1479 ///
1480 /// use combine::{decode_futures_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
1481 ///
1482 /// fn main() {
1483 /// task::block_on(main_());
1484 /// }
1485 ///
1486 /// async fn main_() {
1487 /// let mut read = File::open("README.md").await.unwrap();
1488 /// let mut decoder = Decoder::new();
1489 /// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
1490 /// assert_eq!(
1491 /// decode_futures_03!(
1492 /// decoder,
1493 /// read,
1494 /// {
1495 /// let word = many1(satisfy(|b| !is_whitespace(b)));
1496 /// sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
1497 /// },
1498 /// |input, _position| combine::easy::Stream::from(input),
1499 /// ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
1500 /// Ok(824),
1501 /// );
1502 /// }
1503 /// ```
#[cfg(feature = "futures-io-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-io-03")))]
#[macro_export]
macro_rules! decode_futures_03 {
    // Parser only: the buffer stream is passed to the parser unchanged. The adapter is invoked
    // with two arguments (stream, position), so the default must be a two-parameter closure.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream adapter, no post-decode hook (identity closure).
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$input_stream` is called with (buffer stream, position) to build the stream
    // the parser runs on; `$post_decode` is called with that stream after each decode attempt.
    // Must be expanded inside an `async` context because it awaits the read.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // The single-arm matches bind the caller's expressions to `decoder` and `read`.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);

                        // The stream is flagged as partial until end of input has been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream ended up before handing it to the hook.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    // `removed` is the amount `decode` reported as safe to drop from the input.
                    decoder.advance_pin(std::pin::Pin::new(&mut *read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: await the decoder's pre-parse step before retrying; I/O
                    // failures end the loop with the decoder's current position attached.
                    match decoder
                        .__before_parse_async(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(_) => (),
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1561
1562 /// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
1563 /// memory.
1564 ///
1565 /// This is defined as a macro to work around the lack of Higher Ranked Types. See the
1566 /// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio_02!` call is unlikely to work).
1568 ///
1569 /// ```
1570 /// # use tokio_02_dep as tokio;
1571 /// # use futures_03_dep as futures;
1572 /// use futures::pin_mut;
1573 /// use tokio::{
1574 /// fs::File,
1575 /// };
1576 ///
1577 /// use combine::{decode_tokio_02, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
1578 ///
1579 /// #[tokio::main]
1580 /// async fn main() {
1581 /// let mut read = BufReader::new(File::open("README.md").await.unwrap());
1582 /// let mut decoder = Decoder::new_bufferless();
1583 /// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
1584 /// assert_eq!(
1585 /// decode_tokio_02!(
1586 /// decoder,
1587 /// read,
1588 /// {
1589 /// let word = many1(satisfy(|b| !is_whitespace(b)));
1590 /// sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
1591 /// },
1592 /// |input, _position| combine::easy::Stream::from(input),
1593 /// ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
1594 /// Ok(824),
1595 /// );
1596 /// }
1597 /// ```
#[cfg(feature = "tokio-02")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))]
#[macro_export]
macro_rules! decode_tokio_02 {
    // Parser only: the buffer stream is passed to the parser unchanged.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream adapter, no post-decode hook (identity closure).
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$input_stream` is called with (buffer stream, position) to build the stream
    // the parser runs on; `$post_decode` is called with that stream after each decode attempt.
    // Contains an `.await`, so it must be expanded inside an `async` context.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // The single-arm matches bind the caller's expressions to `decoder` and `read`.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is flagged as partial until end of input has been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream ended up before handing it to the hook.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    // `removed` is the amount `decode` reported as safe to drop from the input.
                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: await the decoder's pre-parse step before retrying; I/O
                    // failures end the loop with the decoder's current position attached.
                    match decoder
                        .__before_parse_tokio_02(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1657
1658 /// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
1659 /// memory.
1660 ///
1661 /// This is defined as a macro to work around the lack of Higher Ranked Types. See the
1662 /// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio_03!` call is unlikely to work).
1664 ///
1665 /// ```
1666 /// # use tokio_03_dep as tokio;
1667 /// # use futures_03_dep as futures;
1668 /// use futures::pin_mut;
1669 /// use tokio::{
1670 /// fs::File,
1671 /// };
1672 ///
1673 /// use combine::{decode_tokio_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
1674 ///
1675 /// #[tokio::main]
1676 /// async fn main() {
1677 /// let mut read = BufReader::new(File::open("README.md").await.unwrap());
1678 /// let mut decoder = Decoder::new_bufferless();
1679 /// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
1680 /// assert_eq!(
1681 /// decode_tokio_03!(
1682 /// decoder,
1683 /// read,
1684 /// {
1685 /// let word = many1(satisfy(|b| !is_whitespace(b)));
1686 /// sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
1687 /// },
1688 /// |input, _position| combine::easy::Stream::from(input),
1689 /// ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
1690 /// Ok(824),
1691 /// );
1692 /// }
1693 /// ```
#[cfg(feature = "tokio-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))]
#[macro_export]
macro_rules! decode_tokio_03 {
    // Parser only: the buffer stream is passed to the parser unchanged.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream adapter, no post-decode hook (identity closure).
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$input_stream` is called with (buffer stream, position) to build the stream
    // the parser runs on; `$post_decode` is called with that stream after each decode attempt.
    // Contains an `.await`, so it must be expanded inside an `async` context.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // The single-arm matches bind the caller's expressions to `decoder` and `read`.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is flagged as partial until end of input has been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream ended up before handing it to the hook.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    // `removed` is the amount `decode` reported as safe to drop from the input.
                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: await the decoder's pre-parse step before retrying; I/O
                    // failures end the loop with the decoder's current position attached.
                    match decoder
                        .__before_parse_tokio_03(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1753
1754 /// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
1755 /// memory.
1756 ///
1757 /// This is defined as a macro to work around the lack of Higher Ranked Types. See the
1758 /// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio!` call is unlikely to work).
1760 ///
1761 /// ```
1762 /// # use tokio_dep as tokio;
1763 /// # use futures_03_dep as futures;
1764 /// use futures::pin_mut;
1765 /// use tokio::{
1766 /// fs::File,
1767 /// };
1768 ///
1769 /// use combine::{decode_tokio, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
1770 ///
1771 /// #[tokio::main]
1772 /// async fn main() {
1773 /// let mut read = BufReader::new(File::open("README.md").await.unwrap());
1774 /// let mut decoder = Decoder::new_bufferless();
1775 /// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
1776 /// assert_eq!(
1777 /// decode_tokio!(
1778 /// decoder,
1779 /// read,
1780 /// {
1781 /// let word = many1(satisfy(|b| !is_whitespace(b)));
1782 /// sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
1783 /// },
1784 /// |input, _position| combine::easy::Stream::from(input),
1785 /// ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
1786 /// Ok(824),
1787 /// );
1788 /// }
1789 /// ```
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[macro_export]
macro_rules! decode_tokio {
    // Parser only: the buffer stream is passed to the parser unchanged.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream adapter, no post-decode hook (identity closure).
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$input_stream` is called with (buffer stream, position) to build the stream
    // the parser runs on; `$post_decode` is called with that stream after each decode attempt.
    // Contains an `.await`, so it must be expanded inside an `async` context.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // The single-arm matches bind the caller's expressions to `decoder` and `read`.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is flagged as partial until end of input has been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream ended up before handing it to the hook.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    // `removed` is the amount `decode` reported as safe to drop from the input.
                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: await the decoder's pre-parse step before retrying; I/O
                    // failures end the loop with the decoder's current position attached.
                    match decoder
                        .__before_parse_tokio(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1849
// Applies the two-argument callable `f` to `a` and `b` and returns its result. Used by the
// `decode*!` macros to invoke the caller-supplied stream adapter.
#[doc(hidden)]
pub fn call_with2<F: FnOnce(A, B) -> R, A, B, R>(a: A, b: B, f: F) -> R {
    f(a, b)
}
1857
// Applies the one-argument callable `f` to `a` and returns its result. Used by the `decode*!`
// macros to invoke the caller-supplied post-decode hook.
#[doc(hidden)]
pub fn call_with<F: FnOnce(A) -> R, A, R>(a: A, f: F) -> R {
    f(a)
}
1865
#[cfg(test)]
mod tests {

    use super::*;

    /// Taking exactly the remaining input must succeed, including for empty inputs.
    #[test]
    fn uncons_range_at_end() {
        assert_eq!("".uncons_range(0), Ok(""));
        assert_eq!("123".uncons_range(3), Ok("123"));
        assert_eq!((&[1][..]).uncons_range(1), Ok(&[1][..]));
        let s: &[u8] = &[];
        assert_eq!(SliceStream(s).uncons_range(0), Ok(&[][..]));
    }

    /// `distance` counts elements, not bytes, even when each element is wider than one byte.
    #[test]
    fn larger_than_1_byte_items_return_correct_distance() {
        let mut input = &[123i32, 0i32][..];

        let before = input.checkpoint();
        assert_eq!(input.distance(&before), 0);

        input.uncons().unwrap();
        assert_eq!(input.distance(&before), 1);

        input.uncons().unwrap();
        assert_eq!(input.distance(&before), 2);

        // Resetting to the checkpoint brings the distance back to zero.
        input.reset(before).unwrap();
        assert_eq!(input.distance(&before), 0);
    }
}
1898