• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(feature = "std")]
2 #![cfg(feature = "tokio")]
3 
4 use std::{cell::Cell, io::Cursor, rc::Rc, str};
5 
6 use {futures_03_dep as futures, tokio_dep as tokio};
7 
8 use {
9     bytes::{Buf, BytesMut},
10     combine::{
11         error::{ParseError, StreamError},
12         parser::{
13             byte::digit,
14             combinator::{any_partial_state, AnyPartialState},
15             range::{range, recognize, take},
16         },
17         skip_many, skip_many1,
18         stream::{easy, PartialStream, RangeStream, StreamErrorFor},
19         Parser,
20     },
21     futures::prelude::*,
22     partial_io::PartialOp,
23     tokio_util::codec::{Decoder, FramedRead},
24 };
25 
26 // Workaround partial_io not working with tokio-0.2
27 #[path = "../tests/support/mod.rs"]
28 mod support;
29 use support::*;
30 
/// Decoder for messages framed by a `Content-Length` header.
///
/// Holds the parser state that is handed to `combine::stream::decode` on every call to
/// `decode`, together with a counter used purely for demonstration purposes.
pub struct LanguageServerDecoder {
    // Type-erased partial-parse state; passed as `&mut self.state` to
    // `combine::stream::decode` each time `decode` runs.
    state: AnyPartialState,
    // Incremented each time the `Content-Length` header parser completes. It only exists to
    // demonstrate that the header is parsed once per message.
    content_length_parses: Rc<Cell<i32>>,
}
35 
36 impl Default for LanguageServerDecoder {
default() -> Self37     fn default() -> Self {
38         LanguageServerDecoder {
39             state: Default::default(),
40             content_length_parses: Rc::new(Cell::new(0)),
41         }
42     }
43 }
44 
45 /// Parses blocks of data with length headers
46 ///
47 /// ```
48 /// Content-Length: 18
49 ///
50 /// { "some": "data" }
51 /// ```
52 // The `content_length_parses` parameter only exists to demonstrate that `content_length` only
53 // gets parsed once per message
decode_parser<'a, Input>( content_length_parses: Rc<Cell<i32>>, ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a where Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a, Input::Error: ParseError<Input::Token, Input::Range, Input::Position>,54 fn decode_parser<'a, Input>(
55     content_length_parses: Rc<Cell<i32>>,
56 ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a
57 where
58     Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,
59     // Necessary due to rust-lang/rust#24159
60     Input::Error: ParseError<Input::Token, Input::Range, Input::Position>,
61 {
62     let content_length = range(&b"Content-Length: "[..])
63         .with(recognize(skip_many1(digit())).and_then(|digits: &[u8]| {
64             str::from_utf8(digits)
65                 .unwrap()
66                 .parse::<usize>()
67                 // Convert the error from `.parse` into an error combine understands
68                 .map_err(StreamErrorFor::<Input>::other)
69         }))
70         .map(move |x| {
71             content_length_parses.set(content_length_parses.get() + 1);
72             x
73         });
74 
75     // `any_partial_state` boxes the state which hides the type and lets us store it in
76     // `self`
77     any_partial_state(
78         (
79             skip_many(range(&b"\r\n"[..])),
80             content_length,
81             range(&b"\r\n\r\n"[..]).map(|_| ()),
82         )
83             .then_partial(|&mut (_, message_length, _)| {
84                 take(message_length).map(|bytes: &[u8]| bytes.to_owned())
85             }),
86     )
87 }
88 
89 impl Decoder for LanguageServerDecoder {
90     type Item = String;
91     type Error = Box<dyn std::error::Error + Send + Sync>;
92 
decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>93     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
94         println!("Decoding `{:?}`", str::from_utf8(src).unwrap_or("NOT UTF8"));
95 
96         let (opt, removed_len) = combine::stream::decode(
97             decode_parser(self.content_length_parses.clone()),
98             // easy::Stream gives us nice error messages
99             // (the same error messages that combine has had since its inception)
100             // PartialStream lets the parser know that more input should be
101             // expected if end of input is unexpectedly reached
102             &mut easy::Stream(PartialStream(&src[..])),
103             &mut self.state,
104         )
105         .map_err(|err| {
106             // Since err contains references into `src` we must replace these before
107             // we can return an error or call `advance` to remove the input we
108             // just committed
109             let err = err
110                 .map_range(|r| {
111                     str::from_utf8(r)
112                         .ok()
113                         .map_or_else(|| format!("{:?}", r), |s| s.to_string())
114                 })
115                 .map_position(|p| p.translate_position(&src[..]));
116             format!("{}\nIn input: `{}`", err, str::from_utf8(src).unwrap())
117         })?;
118 
119         println!(
120             "Accepted {} bytes: `{:?}`",
121             removed_len,
122             str::from_utf8(&src[..removed_len]).unwrap_or("NOT UTF8")
123         );
124 
125         // Remove the input we just committed.
126         // Ideally this would be done automatically by the call to
127         // `stream::decode` but it does unfortunately not work due
128         // to lifetime issues (Non lexical lifetimes might fix it!)
129         src.advance(removed_len);
130 
131         match opt {
132             // `None` means we did not have enough input and we require that the
133             // caller of `decode` supply more before calling us again
134             None => {
135                 println!("Requesting more input!");
136                 Ok(None)
137             }
138 
139             // `Some` means that a message was successfully decoded
140             // (and that we are ready to start decoding the next message)
141             Some(output) => {
142                 let value = String::from_utf8(output)?;
143                 println!("Decoded `{}`", value);
144                 Ok(Some(value))
145             }
146         }
147     }
148 }
149 
150 #[tokio::main]
main()151 async fn main() {
152     let input = "Content-Length: 6\r\n\
153                  \r\n\
154                  123456\r\n\
155                  Content-Length: 4\r\n\
156                  \r\n\
157                  true";
158 
159     let seq = vec![
160         PartialOp::Limited(20),
161         PartialOp::Limited(1),
162         PartialOp::Limited(2),
163         PartialOp::Limited(3),
164     ];
165     let reader = &mut Cursor::new(input.as_bytes());
166     // Using the `partial_io` crate we emulate the partial reads that would happen when reading
167     // asynchronously from an io device.
168     let partial_reader = PartialAsyncRead::new(reader, seq);
169 
170     let decoder = LanguageServerDecoder::default();
171     let content_length_parses = decoder.content_length_parses.clone();
172 
173     let result = FramedRead::new(partial_reader, decoder).try_collect().await;
174 
175     assert!(result.as_ref().is_ok(), "{}", result.unwrap_err());
176     let values: Vec<_> = result.unwrap();
177 
178     let expected_values = ["123456", "true"];
179     assert_eq!(values, expected_values);
180 
181     assert_eq!(content_length_parses.get(), expected_values.len() as i32);
182 
183     println!("Successfully parsed: `{}`", input);
184     println!(
185         "Found {} items and never repeated a completed parse!",
186         values.len(),
187     );
188     println!("Result: {:?}", values);
189 }
190