1 #![cfg(feature = "std")]
2 #![cfg(feature = "tokio")]
3
4 use std::{cell::Cell, io::Cursor, rc::Rc, str};
5
6 use {futures_03_dep as futures, tokio_dep as tokio};
7
8 use {
9 bytes::{Buf, BytesMut},
10 combine::{
11 error::{ParseError, StreamError},
12 parser::{
13 byte::digit,
14 combinator::{any_partial_state, AnyPartialState},
15 range::{range, recognize, take},
16 },
17 skip_many, skip_many1,
18 stream::{easy, PartialStream, RangeStream, StreamErrorFor},
19 Parser,
20 },
21 futures::prelude::*,
22 partial_io::PartialOp,
23 tokio_util::codec::{Decoder, FramedRead},
24 };
25
// Workaround for `partial_io` not working with tokio 0.2
27 #[path = "../tests/support/mod.rs"]
28 mod support;
29 use support::*;
30
31 pub struct LanguageServerDecoder {
32 state: AnyPartialState,
33 content_length_parses: Rc<Cell<i32>>,
34 }
35
36 impl Default for LanguageServerDecoder {
default() -> Self37 fn default() -> Self {
38 LanguageServerDecoder {
39 state: Default::default(),
40 content_length_parses: Rc::new(Cell::new(0)),
41 }
42 }
43 }
44
45 /// Parses blocks of data with length headers
46 ///
47 /// ```
48 /// Content-Length: 18
49 ///
50 /// { "some": "data" }
51 /// ```
52 // The `content_length_parses` parameter only exists to demonstrate that `content_length` only
53 // gets parsed once per message
decode_parser<'a, Input>( content_length_parses: Rc<Cell<i32>>, ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a where Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a, Input::Error: ParseError<Input::Token, Input::Range, Input::Position>,54 fn decode_parser<'a, Input>(
55 content_length_parses: Rc<Cell<i32>>,
56 ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a
57 where
58 Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,
59 // Necessary due to rust-lang/rust#24159
60 Input::Error: ParseError<Input::Token, Input::Range, Input::Position>,
61 {
62 let content_length = range(&b"Content-Length: "[..])
63 .with(recognize(skip_many1(digit())).and_then(|digits: &[u8]| {
64 str::from_utf8(digits)
65 .unwrap()
66 .parse::<usize>()
67 // Convert the error from `.parse` into an error combine understands
68 .map_err(StreamErrorFor::<Input>::other)
69 }))
70 .map(move |x| {
71 content_length_parses.set(content_length_parses.get() + 1);
72 x
73 });
74
75 // `any_partial_state` boxes the state which hides the type and lets us store it in
76 // `self`
77 any_partial_state(
78 (
79 skip_many(range(&b"\r\n"[..])),
80 content_length,
81 range(&b"\r\n\r\n"[..]).map(|_| ()),
82 )
83 .then_partial(|&mut (_, message_length, _)| {
84 take(message_length).map(|bytes: &[u8]| bytes.to_owned())
85 }),
86 )
87 }
88
89 impl Decoder for LanguageServerDecoder {
90 type Item = String;
91 type Error = Box<dyn std::error::Error + Send + Sync>;
92
decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>93 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
94 println!("Decoding `{:?}`", str::from_utf8(src).unwrap_or("NOT UTF8"));
95
96 let (opt, removed_len) = combine::stream::decode(
97 decode_parser(self.content_length_parses.clone()),
98 // easy::Stream gives us nice error messages
99 // (the same error messages that combine has had since its inception)
100 // PartialStream lets the parser know that more input should be
101 // expected if end of input is unexpectedly reached
102 &mut easy::Stream(PartialStream(&src[..])),
103 &mut self.state,
104 )
105 .map_err(|err| {
106 // Since err contains references into `src` we must replace these before
107 // we can return an error or call `advance` to remove the input we
108 // just committed
109 let err = err
110 .map_range(|r| {
111 str::from_utf8(r)
112 .ok()
113 .map_or_else(|| format!("{:?}", r), |s| s.to_string())
114 })
115 .map_position(|p| p.translate_position(&src[..]));
116 format!("{}\nIn input: `{}`", err, str::from_utf8(src).unwrap())
117 })?;
118
119 println!(
120 "Accepted {} bytes: `{:?}`",
121 removed_len,
122 str::from_utf8(&src[..removed_len]).unwrap_or("NOT UTF8")
123 );
124
125 // Remove the input we just committed.
126 // Ideally this would be done automatically by the call to
127 // `stream::decode` but it does unfortunately not work due
128 // to lifetime issues (Non lexical lifetimes might fix it!)
129 src.advance(removed_len);
130
131 match opt {
132 // `None` means we did not have enough input and we require that the
133 // caller of `decode` supply more before calling us again
134 None => {
135 println!("Requesting more input!");
136 Ok(None)
137 }
138
139 // `Some` means that a message was successfully decoded
140 // (and that we are ready to start decoding the next message)
141 Some(output) => {
142 let value = String::from_utf8(output)?;
143 println!("Decoded `{}`", value);
144 Ok(Some(value))
145 }
146 }
147 }
148 }
149
150 #[tokio::main]
main()151 async fn main() {
152 let input = "Content-Length: 6\r\n\
153 \r\n\
154 123456\r\n\
155 Content-Length: 4\r\n\
156 \r\n\
157 true";
158
159 let seq = vec![
160 PartialOp::Limited(20),
161 PartialOp::Limited(1),
162 PartialOp::Limited(2),
163 PartialOp::Limited(3),
164 ];
165 let reader = &mut Cursor::new(input.as_bytes());
166 // Using the `partial_io` crate we emulate the partial reads that would happen when reading
167 // asynchronously from an io device.
168 let partial_reader = PartialAsyncRead::new(reader, seq);
169
170 let decoder = LanguageServerDecoder::default();
171 let content_length_parses = decoder.content_length_parses.clone();
172
173 let result = FramedRead::new(partial_reader, decoder).try_collect().await;
174
175 assert!(result.as_ref().is_ok(), "{}", result.unwrap_err());
176 let values: Vec<_> = result.unwrap();
177
178 let expected_values = ["123456", "true"];
179 assert_eq!(values, expected_values);
180
181 assert_eq!(content_length_parses.get(), expected_values.len() as i32);
182
183 println!("Successfully parsed: `{}`", input);
184 println!(
185 "Found {} items and never repeated a completed parse!",
186 values.len(),
187 );
188 println!("Result: {:?}", values);
189 }
190