• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(not(loom))]
2 
3 //! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
4 //!
5 //!
6 //! # Overview
7 //!
//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] and can be configured
//! to handle an arbitrary sequence of read and write operations. This is useful
//! for writing unit tests for networking services, as using an actual network
//! type is fairly non-deterministic.
12 //!
13 //! # Usage
14 //!
15 //! Attempting to write data that the mock isn't expecting will result in a
16 //! panic.
17 //!
18 //! [`AsyncRead`]: tokio::io::AsyncRead
19 //! [`AsyncWrite`]: tokio::io::AsyncWrite
20 
21 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22 use tokio::sync::mpsc;
23 use tokio::time::{self, Duration, Instant, Sleep};
24 use tokio_stream::wrappers::UnboundedReceiverStream;
25 
26 use futures_core::{ready, Stream};
27 use std::collections::VecDeque;
28 use std::fmt;
29 use std::future::Future;
30 use std::pin::Pin;
31 use std::sync::Arc;
32 use std::task::{self, Poll, Waker};
33 use std::{cmp, io};
34 
/// An I/O object that follows a predefined script.
///
/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
/// follows the scenario described by the builder and panics otherwise.
///
/// Dropping a `Mock` while a scripted `read` or `write` still holds
/// unconsumed data also panics, unless the thread is already panicking.
#[derive(Debug)]
pub struct Mock {
    // Script state: the queued actions plus the timer and waker bookkeeping.
    inner: Inner,
}
43 
/// A handle to send additional actions to the related `Mock`.
///
/// Actions sent through a handle are queued on an unbounded channel and
/// appended to the mock's script when the mock polls that channel.
#[derive(Debug)]
pub struct Handle {
    // Sending half of the channel; the receiving half lives in the mock's `Inner`.
    tx: mpsc::UnboundedSender<Action>,
}
49 
/// Builds `Mock` instances.
///
/// Each `build` call hands the mock a clone of the script, so a builder can
/// be used to create several mocks.
#[derive(Debug, Clone, Default)]
pub struct Builder {
    // Sequence of actions for the Mock to take
    actions: VecDeque<Action>,
}
56 
/// One step of a mock's script.
#[derive(Debug, Clone)]
enum Action {
    /// Bytes handed out to `read` calls; drained from the front as they are consumed.
    Read(Vec<u8>),
    /// Bytes that `write` calls are expected to supply; drained as they are matched.
    Write(Vec<u8>),
    /// A period during which the mock reports that it would block.
    Wait(Duration),
    // Wrapped in Arc so that Builder can be cloned and Send.
    // Mock is not cloned as does not need to check Rc for ref counts.
    // The `Option` is emptied with `take()` when the error is returned to the
    // caller; an emptied entry is then popped from the script.
    /// Error to return from the next `read` call.
    ReadError(Option<Arc<io::Error>>),
    /// Error to return from the next `write` call.
    WriteError(Option<Arc<io::Error>>),
}
67 
/// Mutable state behind a `Mock`: the remaining script plus the timer and
/// waker bookkeeping used while following it.
struct Inner {
    // Remaining script, front first.
    actions: VecDeque<Action>,
    // Deadline recorded when a `Wait` action reaches the front of the script.
    waiting: Option<Instant>,
    // Timer that `poll_read` / `poll_write` await while a `Wait` is in progress.
    sleep: Option<Pin<Box<Sleep>>>,
    // Waker of a reader that returned `Pending` because the script is not at a read.
    read_wait: Option<Waker>,
    // Actions sent through the `Handle`, pulled in when the script runs dry.
    rx: UnboundedReceiverStream<Action>,
}
75 
76 impl Builder {
77     /// Return a new, empty `Builder.
new() -> Self78     pub fn new() -> Self {
79         Self::default()
80     }
81 
82     /// Sequence a `read` operation.
83     ///
84     /// The next operation in the mock's script will be to expect a `read` call
85     /// and return `buf`.
read(&mut self, buf: &[u8]) -> &mut Self86     pub fn read(&mut self, buf: &[u8]) -> &mut Self {
87         self.actions.push_back(Action::Read(buf.into()));
88         self
89     }
90 
91     /// Sequence a `read` operation that produces an error.
92     ///
93     /// The next operation in the mock's script will be to expect a `read` call
94     /// and return `error`.
read_error(&mut self, error: io::Error) -> &mut Self95     pub fn read_error(&mut self, error: io::Error) -> &mut Self {
96         let error = Some(error.into());
97         self.actions.push_back(Action::ReadError(error));
98         self
99     }
100 
101     /// Sequence a `write` operation.
102     ///
103     /// The next operation in the mock's script will be to expect a `write`
104     /// call.
write(&mut self, buf: &[u8]) -> &mut Self105     pub fn write(&mut self, buf: &[u8]) -> &mut Self {
106         self.actions.push_back(Action::Write(buf.into()));
107         self
108     }
109 
110     /// Sequence a `write` operation that produces an error.
111     ///
112     /// The next operation in the mock's script will be to expect a `write`
113     /// call that provides `error`.
write_error(&mut self, error: io::Error) -> &mut Self114     pub fn write_error(&mut self, error: io::Error) -> &mut Self {
115         let error = Some(error.into());
116         self.actions.push_back(Action::WriteError(error));
117         self
118     }
119 
120     /// Sequence a wait.
121     ///
122     /// The next operation in the mock's script will be to wait without doing so
123     /// for `duration` amount of time.
wait(&mut self, duration: Duration) -> &mut Self124     pub fn wait(&mut self, duration: Duration) -> &mut Self {
125         let duration = cmp::max(duration, Duration::from_millis(1));
126         self.actions.push_back(Action::Wait(duration));
127         self
128     }
129 
130     /// Build a `Mock` value according to the defined script.
build(&mut self) -> Mock131     pub fn build(&mut self) -> Mock {
132         let (mock, _) = self.build_with_handle();
133         mock
134     }
135 
136     /// Build a `Mock` value paired with a handle
build_with_handle(&mut self) -> (Mock, Handle)137     pub fn build_with_handle(&mut self) -> (Mock, Handle) {
138         let (inner, handle) = Inner::new(self.actions.clone());
139 
140         let mock = Mock { inner };
141 
142         (mock, handle)
143     }
144 }
145 
146 impl Handle {
147     /// Sequence a `read` operation.
148     ///
149     /// The next operation in the mock's script will be to expect a `read` call
150     /// and return `buf`.
read(&mut self, buf: &[u8]) -> &mut Self151     pub fn read(&mut self, buf: &[u8]) -> &mut Self {
152         self.tx.send(Action::Read(buf.into())).unwrap();
153         self
154     }
155 
156     /// Sequence a `read` operation error.
157     ///
158     /// The next operation in the mock's script will be to expect a `read` call
159     /// and return `error`.
read_error(&mut self, error: io::Error) -> &mut Self160     pub fn read_error(&mut self, error: io::Error) -> &mut Self {
161         let error = Some(error.into());
162         self.tx.send(Action::ReadError(error)).unwrap();
163         self
164     }
165 
166     /// Sequence a `write` operation.
167     ///
168     /// The next operation in the mock's script will be to expect a `write`
169     /// call.
write(&mut self, buf: &[u8]) -> &mut Self170     pub fn write(&mut self, buf: &[u8]) -> &mut Self {
171         self.tx.send(Action::Write(buf.into())).unwrap();
172         self
173     }
174 
175     /// Sequence a `write` operation error.
176     ///
177     /// The next operation in the mock's script will be to expect a `write`
178     /// call error.
write_error(&mut self, error: io::Error) -> &mut Self179     pub fn write_error(&mut self, error: io::Error) -> &mut Self {
180         let error = Some(error.into());
181         self.tx.send(Action::WriteError(error)).unwrap();
182         self
183     }
184 }
185 
186 impl Inner {
new(actions: VecDeque<Action>) -> (Inner, Handle)187     fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
188         let (tx, rx) = mpsc::unbounded_channel();
189 
190         let rx = UnboundedReceiverStream::new(rx);
191 
192         let inner = Inner {
193             actions,
194             sleep: None,
195             read_wait: None,
196             rx,
197             waiting: None,
198         };
199 
200         let handle = Handle { tx };
201 
202         (inner, handle)
203     }
204 
poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>>205     fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
206         Pin::new(&mut self.rx).poll_next(cx)
207     }
208 
read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()>209     fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
210         match self.action() {
211             Some(&mut Action::Read(ref mut data)) => {
212                 // Figure out how much to copy
213                 let n = cmp::min(dst.remaining(), data.len());
214 
215                 // Copy the data into the `dst` slice
216                 dst.put_slice(&data[..n]);
217 
218                 // Drain the data from the source
219                 data.drain(..n);
220 
221                 Ok(())
222             }
223             Some(&mut Action::ReadError(ref mut err)) => {
224                 // As the
225                 let err = err.take().expect("Should have been removed from actions.");
226                 let err = Arc::try_unwrap(err).expect("There are no other references.");
227                 Err(err)
228             }
229             Some(_) => {
230                 // Either waiting or expecting a write
231                 Err(io::ErrorKind::WouldBlock.into())
232             }
233             None => Ok(()),
234         }
235     }
236 
write(&mut self, mut src: &[u8]) -> io::Result<usize>237     fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
238         let mut ret = 0;
239 
240         if self.actions.is_empty() {
241             return Err(io::ErrorKind::BrokenPipe.into());
242         }
243 
244         if let Some(&mut Action::Wait(..)) = self.action() {
245             return Err(io::ErrorKind::WouldBlock.into());
246         }
247 
248         if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
249             let err = err.take().expect("Should have been removed from actions.");
250             let err = Arc::try_unwrap(err).expect("There are no other references.");
251             return Err(err);
252         }
253 
254         for i in 0..self.actions.len() {
255             match self.actions[i] {
256                 Action::Write(ref mut expect) => {
257                     let n = cmp::min(src.len(), expect.len());
258 
259                     assert_eq!(&src[..n], &expect[..n]);
260 
261                     // Drop data that was matched
262                     expect.drain(..n);
263                     src = &src[n..];
264 
265                     ret += n;
266 
267                     if src.is_empty() {
268                         return Ok(ret);
269                     }
270                 }
271                 Action::Wait(..) | Action::WriteError(..) => {
272                     break;
273                 }
274                 _ => {}
275             }
276 
277             // TODO: remove write
278         }
279 
280         Ok(ret)
281     }
282 
remaining_wait(&mut self) -> Option<Duration>283     fn remaining_wait(&mut self) -> Option<Duration> {
284         match self.action() {
285             Some(&mut Action::Wait(dur)) => Some(dur),
286             _ => None,
287         }
288     }
289 
action(&mut self) -> Option<&mut Action>290     fn action(&mut self) -> Option<&mut Action> {
291         loop {
292             if self.actions.is_empty() {
293                 return None;
294             }
295 
296             match self.actions[0] {
297                 Action::Read(ref mut data) => {
298                     if !data.is_empty() {
299                         break;
300                     }
301                 }
302                 Action::Write(ref mut data) => {
303                     if !data.is_empty() {
304                         break;
305                     }
306                 }
307                 Action::Wait(ref mut dur) => {
308                     if let Some(until) = self.waiting {
309                         let now = Instant::now();
310 
311                         if now < until {
312                             break;
313                         }
314                     } else {
315                         self.waiting = Some(Instant::now() + *dur);
316                         break;
317                     }
318                 }
319                 Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
320                     if error.is_some() {
321                         break;
322                     }
323                 }
324             }
325 
326             let _action = self.actions.pop_front();
327         }
328 
329         self.actions.front_mut()
330     }
331 }
332 
// ===== impl Mock =====
334 
335 impl Mock {
maybe_wakeup_reader(&mut self)336     fn maybe_wakeup_reader(&mut self) {
337         match self.inner.action() {
338             Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
339                 if let Some(waker) = self.inner.read_wait.take() {
340                     waker.wake();
341                 }
342             }
343             _ => {}
344         }
345     }
346 }
347 
348 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>349     fn poll_read(
350         mut self: Pin<&mut Self>,
351         cx: &mut task::Context<'_>,
352         buf: &mut ReadBuf<'_>,
353     ) -> Poll<io::Result<()>> {
354         loop {
355             if let Some(ref mut sleep) = self.inner.sleep {
356                 ready!(Pin::new(sleep).poll(cx));
357             }
358 
359             // If a sleep is set, it has already fired
360             self.inner.sleep = None;
361 
362             // Capture 'filled' to monitor if it changed
363             let filled = buf.filled().len();
364 
365             match self.inner.read(buf) {
366                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
367                     if let Some(rem) = self.inner.remaining_wait() {
368                         let until = Instant::now() + rem;
369                         self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
370                     } else {
371                         self.inner.read_wait = Some(cx.waker().clone());
372                         return Poll::Pending;
373                     }
374                 }
375                 Ok(()) => {
376                     if buf.filled().len() == filled {
377                         match ready!(self.inner.poll_action(cx)) {
378                             Some(action) => {
379                                 self.inner.actions.push_back(action);
380                                 continue;
381                             }
382                             None => {
383                                 return Poll::Ready(Ok(()));
384                             }
385                         }
386                     } else {
387                         return Poll::Ready(Ok(()));
388                     }
389                 }
390                 Err(e) => return Poll::Ready(Err(e)),
391             }
392         }
393     }
394 }
395 
396 impl AsyncWrite for Mock {
poll_write( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>397     fn poll_write(
398         mut self: Pin<&mut Self>,
399         cx: &mut task::Context<'_>,
400         buf: &[u8],
401     ) -> Poll<io::Result<usize>> {
402         loop {
403             if let Some(ref mut sleep) = self.inner.sleep {
404                 ready!(Pin::new(sleep).poll(cx));
405             }
406 
407             // If a sleep is set, it has already fired
408             self.inner.sleep = None;
409 
410             match self.inner.write(buf) {
411                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
412                     if let Some(rem) = self.inner.remaining_wait() {
413                         let until = Instant::now() + rem;
414                         self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
415                     } else {
416                         panic!("unexpected WouldBlock");
417                     }
418                 }
419                 Ok(0) => {
420                     // TODO: Is this correct?
421                     if !self.inner.actions.is_empty() {
422                         return Poll::Pending;
423                     }
424 
425                     // TODO: Extract
426                     match ready!(self.inner.poll_action(cx)) {
427                         Some(action) => {
428                             self.inner.actions.push_back(action);
429                             continue;
430                         }
431                         None => {
432                             panic!("unexpected write");
433                         }
434                     }
435                 }
436                 ret => {
437                     self.maybe_wakeup_reader();
438                     return Poll::Ready(ret);
439                 }
440             }
441         }
442     }
443 
poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>>444     fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
445         Poll::Ready(Ok(()))
446     }
447 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>>448     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
449         Poll::Ready(Ok(()))
450     }
451 }
452 
453 /// Ensures that Mock isn't dropped with data "inside".
454 impl Drop for Mock {
drop(&mut self)455     fn drop(&mut self) {
456         // Avoid double panicking, since makes debugging much harder.
457         if std::thread::panicking() {
458             return;
459         }
460 
461         self.inner.actions.iter().for_each(|a| match a {
462             Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."),
463             Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."),
464             _ => (),
465         })
466     }
467 }
468 /*
469 /// Returns `true` if called from the context of a futures-rs Task
470 fn is_task_ctx() -> bool {
471     use std::panic;
472 
473     // Save the existing panic hook
474     let h = panic::take_hook();
475 
476     // Install a new one that does nothing
477     panic::set_hook(Box::new(|_| {}));
478 
479     // Attempt to call the fn
480     let r = panic::catch_unwind(|| task::current()).is_ok();
481 
482     // Re-install the old one
483     panic::set_hook(h);
484 
485     // Return the result
486     r
487 }
488 */
489 
490 impl fmt::Debug for Inner {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result491     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492         write!(f, "Inner {{...}}")
493     }
494 }
495