1 #![cfg(not(loom))] 2 3 //! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. 4 //! 5 //! 6 //! # Overview 7 //! 8 //! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured 9 //! to handle an arbitrary sequence of read and write operations. This is useful 10 //! for writing unit tests for networking services as using an actual network 11 //! type is fairly non deterministic. 12 //! 13 //! # Usage 14 //! 15 //! Attempting to write data that the mock isn't expecting will result in a 16 //! panic. 17 //! 18 //! [`AsyncRead`]: tokio::io::AsyncRead 19 //! [`AsyncWrite`]: tokio::io::AsyncWrite 20 21 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; 22 use tokio::sync::mpsc; 23 use tokio::time::{self, Duration, Instant, Sleep}; 24 use tokio_stream::wrappers::UnboundedReceiverStream; 25 26 use futures_core::{ready, Stream}; 27 use std::collections::VecDeque; 28 use std::fmt; 29 use std::future::Future; 30 use std::pin::Pin; 31 use std::sync::Arc; 32 use std::task::{self, Poll, Waker}; 33 use std::{cmp, io}; 34 35 /// An I/O object that follows a predefined script. 36 /// 37 /// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It 38 /// follows the scenario described by the builder and panics otherwise. 39 #[derive(Debug)] 40 pub struct Mock { 41 inner: Inner, 42 } 43 44 /// A handle to send additional actions to the related `Mock`. 45 #[derive(Debug)] 46 pub struct Handle { 47 tx: mpsc::UnboundedSender<Action>, 48 } 49 50 /// Builds `Mock` instances. 51 #[derive(Debug, Clone, Default)] 52 pub struct Builder { 53 // Sequence of actions for the Mock to take 54 actions: VecDeque<Action>, 55 } 56 57 #[derive(Debug, Clone)] 58 enum Action { 59 Read(Vec<u8>), 60 Write(Vec<u8>), 61 Wait(Duration), 62 // Wrapped in Arc so that Builder can be cloned and Send. 63 // Mock is not cloned as does not need to check Rc for ref counts. 64 ReadError(Option<Arc<io::Error>>), 65 WriteError(Option<Arc<io::Error>>), 66 } 67 68 struct Inner { 69 actions: VecDeque<Action>, 70 waiting: Option<Instant>, 71 sleep: Option<Pin<Box<Sleep>>>, 72 read_wait: Option<Waker>, 73 rx: UnboundedReceiverStream<Action>, 74 } 75 76 impl Builder { 77 /// Return a new, empty `Builder. new() -> Self78 pub fn new() -> Self { 79 Self::default() 80 } 81 82 /// Sequence a `read` operation. 83 /// 84 /// The next operation in the mock's script will be to expect a `read` call 85 /// and return `buf`. read(&mut self, buf: &[u8]) -> &mut Self86 pub fn read(&mut self, buf: &[u8]) -> &mut Self { 87 self.actions.push_back(Action::Read(buf.into())); 88 self 89 } 90 91 /// Sequence a `read` operation that produces an error. 92 /// 93 /// The next operation in the mock's script will be to expect a `read` call 94 /// and return `error`. read_error(&mut self, error: io::Error) -> &mut Self95 pub fn read_error(&mut self, error: io::Error) -> &mut Self { 96 let error = Some(error.into()); 97 self.actions.push_back(Action::ReadError(error)); 98 self 99 } 100 101 /// Sequence a `write` operation. 102 /// 103 /// The next operation in the mock's script will be to expect a `write` 104 /// call. write(&mut self, buf: &[u8]) -> &mut Self105 pub fn write(&mut self, buf: &[u8]) -> &mut Self { 106 self.actions.push_back(Action::Write(buf.into())); 107 self 108 } 109 110 /// Sequence a `write` operation that produces an error. 111 /// 112 /// The next operation in the mock's script will be to expect a `write` 113 /// call that provides `error`. write_error(&mut self, error: io::Error) -> &mut Self114 pub fn write_error(&mut self, error: io::Error) -> &mut Self { 115 let error = Some(error.into()); 116 self.actions.push_back(Action::WriteError(error)); 117 self 118 } 119 120 /// Sequence a wait. 121 /// 122 /// The next operation in the mock's script will be to wait without doing so 123 /// for `duration` amount of time. wait(&mut self, duration: Duration) -> &mut Self124 pub fn wait(&mut self, duration: Duration) -> &mut Self { 125 let duration = cmp::max(duration, Duration::from_millis(1)); 126 self.actions.push_back(Action::Wait(duration)); 127 self 128 } 129 130 /// Build a `Mock` value according to the defined script. build(&mut self) -> Mock131 pub fn build(&mut self) -> Mock { 132 let (mock, _) = self.build_with_handle(); 133 mock 134 } 135 136 /// Build a `Mock` value paired with a handle build_with_handle(&mut self) -> (Mock, Handle)137 pub fn build_with_handle(&mut self) -> (Mock, Handle) { 138 let (inner, handle) = Inner::new(self.actions.clone()); 139 140 let mock = Mock { inner }; 141 142 (mock, handle) 143 } 144 } 145 146 impl Handle { 147 /// Sequence a `read` operation. 148 /// 149 /// The next operation in the mock's script will be to expect a `read` call 150 /// and return `buf`. read(&mut self, buf: &[u8]) -> &mut Self151 pub fn read(&mut self, buf: &[u8]) -> &mut Self { 152 self.tx.send(Action::Read(buf.into())).unwrap(); 153 self 154 } 155 156 /// Sequence a `read` operation error. 157 /// 158 /// The next operation in the mock's script will be to expect a `read` call 159 /// and return `error`. read_error(&mut self, error: io::Error) -> &mut Self160 pub fn read_error(&mut self, error: io::Error) -> &mut Self { 161 let error = Some(error.into()); 162 self.tx.send(Action::ReadError(error)).unwrap(); 163 self 164 } 165 166 /// Sequence a `write` operation. 167 /// 168 /// The next operation in the mock's script will be to expect a `write` 169 /// call. write(&mut self, buf: &[u8]) -> &mut Self170 pub fn write(&mut self, buf: &[u8]) -> &mut Self { 171 self.tx.send(Action::Write(buf.into())).unwrap(); 172 self 173 } 174 175 /// Sequence a `write` operation error. 176 /// 177 /// The next operation in the mock's script will be to expect a `write` 178 /// call error. write_error(&mut self, error: io::Error) -> &mut Self179 pub fn write_error(&mut self, error: io::Error) -> &mut Self { 180 let error = Some(error.into()); 181 self.tx.send(Action::WriteError(error)).unwrap(); 182 self 183 } 184 } 185 186 impl Inner { new(actions: VecDeque<Action>) -> (Inner, Handle)187 fn new(actions: VecDeque<Action>) -> (Inner, Handle) { 188 let (tx, rx) = mpsc::unbounded_channel(); 189 190 let rx = UnboundedReceiverStream::new(rx); 191 192 let inner = Inner { 193 actions, 194 sleep: None, 195 read_wait: None, 196 rx, 197 waiting: None, 198 }; 199 200 let handle = Handle { tx }; 201 202 (inner, handle) 203 } 204 poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>>205 fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> { 206 Pin::new(&mut self.rx).poll_next(cx) 207 } 208 read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()>209 fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> { 210 match self.action() { 211 Some(&mut Action::Read(ref mut data)) => { 212 // Figure out how much to copy 213 let n = cmp::min(dst.remaining(), data.len()); 214 215 // Copy the data into the `dst` slice 216 dst.put_slice(&data[..n]); 217 218 // Drain the data from the source 219 data.drain(..n); 220 221 Ok(()) 222 } 223 Some(&mut Action::ReadError(ref mut err)) => { 224 // As the 225 let err = err.take().expect("Should have been removed from actions."); 226 let err = Arc::try_unwrap(err).expect("There are no other references."); 227 Err(err) 228 } 229 Some(_) => { 230 // Either waiting or expecting a write 231 Err(io::ErrorKind::WouldBlock.into()) 232 } 233 None => Ok(()), 234 } 235 } 236 write(&mut self, mut src: &[u8]) -> io::Result<usize>237 fn write(&mut self, mut src: &[u8]) -> io::Result<usize> { 238 let mut ret = 0; 239 240 if self.actions.is_empty() { 241 return Err(io::ErrorKind::BrokenPipe.into()); 242 } 243 244 if let Some(&mut Action::Wait(..)) = self.action() { 245 return Err(io::ErrorKind::WouldBlock.into()); 246 } 247 248 if let Some(&mut Action::WriteError(ref mut err)) = self.action() { 249 let err = err.take().expect("Should have been removed from actions."); 250 let err = Arc::try_unwrap(err).expect("There are no other references."); 251 return Err(err); 252 } 253 254 for i in 0..self.actions.len() { 255 match self.actions[i] { 256 Action::Write(ref mut expect) => { 257 let n = cmp::min(src.len(), expect.len()); 258 259 assert_eq!(&src[..n], &expect[..n]); 260 261 // Drop data that was matched 262 expect.drain(..n); 263 src = &src[n..]; 264 265 ret += n; 266 267 if src.is_empty() { 268 return Ok(ret); 269 } 270 } 271 Action::Wait(..) | Action::WriteError(..) => { 272 break; 273 } 274 _ => {} 275 } 276 277 // TODO: remove write 278 } 279 280 Ok(ret) 281 } 282 remaining_wait(&mut self) -> Option<Duration>283 fn remaining_wait(&mut self) -> Option<Duration> { 284 match self.action() { 285 Some(&mut Action::Wait(dur)) => Some(dur), 286 _ => None, 287 } 288 } 289 action(&mut self) -> Option<&mut Action>290 fn action(&mut self) -> Option<&mut Action> { 291 loop { 292 if self.actions.is_empty() { 293 return None; 294 } 295 296 match self.actions[0] { 297 Action::Read(ref mut data) => { 298 if !data.is_empty() { 299 break; 300 } 301 } 302 Action::Write(ref mut data) => { 303 if !data.is_empty() { 304 break; 305 } 306 } 307 Action::Wait(ref mut dur) => { 308 if let Some(until) = self.waiting { 309 let now = Instant::now(); 310 311 if now < until { 312 break; 313 } 314 } else { 315 self.waiting = Some(Instant::now() + *dur); 316 break; 317 } 318 } 319 Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { 320 if error.is_some() { 321 break; 322 } 323 } 324 } 325 326 let _action = self.actions.pop_front(); 327 } 328 329 self.actions.front_mut() 330 } 331 } 332 333 // ===== impl Inner ===== 334 335 impl Mock { maybe_wakeup_reader(&mut self)336 fn maybe_wakeup_reader(&mut self) { 337 match self.inner.action() { 338 Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => { 339 if let Some(waker) = self.inner.read_wait.take() { 340 waker.wake(); 341 } 342 } 343 _ => {} 344 } 345 } 346 } 347 348 impl AsyncRead for Mock { poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>349 fn poll_read( 350 mut self: Pin<&mut Self>, 351 cx: &mut task::Context<'_>, 352 buf: &mut ReadBuf<'_>, 353 ) -> Poll<io::Result<()>> { 354 loop { 355 if let Some(ref mut sleep) = self.inner.sleep { 356 ready!(Pin::new(sleep).poll(cx)); 357 } 358 359 // If a sleep is set, it has already fired 360 self.inner.sleep = None; 361 362 // Capture 'filled' to monitor if it changed 363 let filled = buf.filled().len(); 364 365 match self.inner.read(buf) { 366 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 367 if let Some(rem) = self.inner.remaining_wait() { 368 let until = Instant::now() + rem; 369 self.inner.sleep = Some(Box::pin(time::sleep_until(until))); 370 } else { 371 self.inner.read_wait = Some(cx.waker().clone()); 372 return Poll::Pending; 373 } 374 } 375 Ok(()) => { 376 if buf.filled().len() == filled { 377 match ready!(self.inner.poll_action(cx)) { 378 Some(action) => { 379 self.inner.actions.push_back(action); 380 continue; 381 } 382 None => { 383 return Poll::Ready(Ok(())); 384 } 385 } 386 } else { 387 return Poll::Ready(Ok(())); 388 } 389 } 390 Err(e) => return Poll::Ready(Err(e)), 391 } 392 } 393 } 394 } 395 396 impl AsyncWrite for Mock { poll_write( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>397 fn poll_write( 398 mut self: Pin<&mut Self>, 399 cx: &mut task::Context<'_>, 400 buf: &[u8], 401 ) -> Poll<io::Result<usize>> { 402 loop { 403 if let Some(ref mut sleep) = self.inner.sleep { 404 ready!(Pin::new(sleep).poll(cx)); 405 } 406 407 // If a sleep is set, it has already fired 408 self.inner.sleep = None; 409 410 match self.inner.write(buf) { 411 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 412 if let Some(rem) = self.inner.remaining_wait() { 413 let until = Instant::now() + rem; 414 self.inner.sleep = Some(Box::pin(time::sleep_until(until))); 415 } else { 416 panic!("unexpected WouldBlock"); 417 } 418 } 419 Ok(0) => { 420 // TODO: Is this correct? 421 if !self.inner.actions.is_empty() { 422 return Poll::Pending; 423 } 424 425 // TODO: Extract 426 match ready!(self.inner.poll_action(cx)) { 427 Some(action) => { 428 self.inner.actions.push_back(action); 429 continue; 430 } 431 None => { 432 panic!("unexpected write"); 433 } 434 } 435 } 436 ret => { 437 self.maybe_wakeup_reader(); 438 return Poll::Ready(ret); 439 } 440 } 441 } 442 } 443 poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>>444 fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 445 Poll::Ready(Ok(())) 446 } 447 poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>>448 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { 449 Poll::Ready(Ok(())) 450 } 451 } 452 453 /// Ensures that Mock isn't dropped with data "inside". 454 impl Drop for Mock { drop(&mut self)455 fn drop(&mut self) { 456 // Avoid double panicking, since makes debugging much harder. 457 if std::thread::panicking() { 458 return; 459 } 460 461 self.inner.actions.iter().for_each(|a| match a { 462 Action::Read(data) => assert!(data.is_empty(), "There is still data left to read."), 463 Action::Write(data) => assert!(data.is_empty(), "There is still data left to write."), 464 _ => (), 465 }) 466 } 467 } 468 /* 469 /// Returns `true` if called from the context of a futures-rs Task 470 fn is_task_ctx() -> bool { 471 use std::panic; 472 473 // Save the existing panic hook 474 let h = panic::take_hook(); 475 476 // Install a new one that does nothing 477 panic::set_hook(Box::new(|_| {})); 478 479 // Attempt to call the fn 480 let r = panic::catch_unwind(|| task::current()).is_ok(); 481 482 // Re-install the old one 483 panic::set_hook(h); 484 485 // Return the result 486 r 487 } 488 */ 489 490 impl fmt::Debug for Inner { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result491 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 492 write!(f, "Inner {{...}}") 493 } 494 } 495