1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! # `IoSourceExt` 6 //! 7 //! User functions to asynchronously access files. 8 //! Using `IoSource` directly is inconvenient and requires dealing with state 9 //! machines for the backing uring, future libraries, etc. `IoSourceExt` instead 10 //! provides users with a future that can be `await`ed from async context. 11 //! 12 //! Each member of `IoSourceExt` returns a future for the supported operation. One or more 13 //! operation can be pending at a time. 14 //! 15 //! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the 16 //! `URingExecutor` documentation for an explaination of why. 17 18 use std::{ 19 fs::File, 20 io, 21 ops::{Deref, DerefMut}, 22 os::unix::io::{AsRawFd, RawFd}, 23 sync::Arc, 24 }; 25 26 use async_trait::async_trait; 27 use remain::sorted; 28 use sys_util::net::UnixSeqpacket; 29 use thiserror::Error as ThisError; 30 31 use super::{BackingMemory, MemRegion}; 32 33 #[sorted] 34 #[derive(ThisError, Debug)] 35 pub enum Error { 36 /// An error with a polled(FD) source. 37 #[error("An error with a poll source: {0}")] 38 Poll(#[from] super::poll_source::Error), 39 /// An error with a uring source. 40 #[error("An error with a uring source: {0}")] 41 Uring(#[from] super::uring_executor::Error), 42 } 43 pub type Result<T> = std::result::Result<T, Error>; 44 45 impl From<Error> for io::Error { from(e: Error) -> Self46 fn from(e: Error) -> Self { 47 use Error::*; 48 match e { 49 Poll(e) => e.into(), 50 Uring(e) => e.into(), 51 } 52 } 53 } 54 55 /// Ergonomic methods for async reads. 56 #[async_trait(?Send)] 57 pub trait ReadAsync { 58 /// Reads from the iosource at `file_offset` and fill the given `vec`. read_to_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> Result<(usize, Vec<u8>)>59 async fn read_to_vec<'a>( 60 &'a self, 61 file_offset: Option<u64>, 62 vec: Vec<u8>, 63 ) -> Result<(usize, Vec<u8>)>; 64 65 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. read_to_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>66 async fn read_to_mem<'a>( 67 &'a self, 68 file_offset: Option<u64>, 69 mem: Arc<dyn BackingMemory + Send + Sync>, 70 mem_offsets: &'a [MemRegion], 71 ) -> Result<usize>; 72 73 /// Wait for the FD of `self` to be readable. wait_readable(&self) -> Result<()>74 async fn wait_readable(&self) -> Result<()>; 75 76 /// Reads a single u64 from the current offset. read_u64(&self) -> Result<u64>77 async fn read_u64(&self) -> Result<u64>; 78 } 79 80 /// Ergonomic methods for async writes. 81 #[async_trait(?Send)] 82 pub trait WriteAsync { 83 /// Writes from the given `vec` to the file starting at `file_offset`. write_from_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> Result<(usize, Vec<u8>)>84 async fn write_from_vec<'a>( 85 &'a self, 86 file_offset: Option<u64>, 87 vec: Vec<u8>, 88 ) -> Result<(usize, Vec<u8>)>; 89 90 /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. write_from_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>91 async fn write_from_mem<'a>( 92 &'a self, 93 file_offset: Option<u64>, 94 mem: Arc<dyn BackingMemory + Send + Sync>, 95 mem_offsets: &'a [MemRegion], 96 ) -> Result<usize>; 97 98 /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>99 async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>; 100 101 /// Sync all completed write operations to the backing storage. fsync(&self) -> Result<()>102 async fn fsync(&self) -> Result<()>; 103 } 104 105 /// Subtrait for general async IO. 106 #[async_trait(?Send)] 107 pub trait IoSourceExt<F>: ReadAsync + WriteAsync { 108 /// Yields the underlying IO source. into_source(self: Box<Self>) -> F109 fn into_source(self: Box<Self>) -> F; 110 111 /// Provides a mutable ref to the underlying IO source. as_source_mut(&mut self) -> &mut F112 fn as_source_mut(&mut self) -> &mut F; 113 114 /// Provides a ref to the underlying IO source. as_source(&self) -> &F115 fn as_source(&self) -> &F; 116 } 117 118 /// Marker trait signifying that the implementor is suitable for use with 119 /// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket. 120 /// 121 /// (Note: it'd be really nice to implement a TryFrom for any implementors, and 122 /// remove our factory functions. Unfortunately 123 /// <https://github.com/rust-lang/rust/issues/50133> makes that too painful.) 124 pub trait IntoAsync: AsRawFd {} 125 126 impl IntoAsync for File {} 127 impl IntoAsync for UnixSeqpacket {} 128 impl IntoAsync for &UnixSeqpacket {} 129 130 /// Simple wrapper struct to implement IntoAsync on foreign types. 131 pub struct AsyncWrapper<T>(T); 132 133 impl<T> AsyncWrapper<T> { 134 /// Create a new `AsyncWrapper` that wraps `val`. new(val: T) -> Self135 pub fn new(val: T) -> Self { 136 AsyncWrapper(val) 137 } 138 139 /// Consumes the `AsyncWrapper`, returning the inner struct. into_inner(self) -> T140 pub fn into_inner(self) -> T { 141 self.0 142 } 143 } 144 145 impl<T> Deref for AsyncWrapper<T> { 146 type Target = T; 147 deref(&self) -> &T148 fn deref(&self) -> &T { 149 &self.0 150 } 151 } 152 153 impl<T> DerefMut for AsyncWrapper<T> { deref_mut(&mut self) -> &mut T154 fn deref_mut(&mut self) -> &mut T { 155 &mut self.0 156 } 157 } 158 159 impl<T: AsRawFd> AsRawFd for AsyncWrapper<T> { as_raw_fd(&self) -> RawFd160 fn as_raw_fd(&self) -> RawFd { 161 self.0.as_raw_fd() 162 } 163 } 164 165 impl<T: AsRawFd> IntoAsync for AsyncWrapper<T> {} 166 167 #[cfg(test)] 168 mod tests { 169 use std::{ 170 fs::{File, OpenOptions}, 171 future::Future, 172 os::unix::io::AsRawFd, 173 pin::Pin, 174 sync::Arc, 175 task::{Context, Poll, Waker}, 176 thread, 177 }; 178 179 use sync::Mutex; 180 181 use super::{ 182 super::{ 183 executor::{async_poll_from, async_uring_from}, 184 mem::VecIoWrapper, 185 uring_executor::use_uring, 186 Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource, 187 }, 188 *, 189 }; 190 191 struct State { 192 should_quit: bool, 193 waker: Option<Waker>, 194 } 195 196 impl State { wake(&mut self)197 fn wake(&mut self) { 198 self.should_quit = true; 199 let waker = self.waker.take(); 200 201 if let Some(waker) = waker { 202 waker.wake(); 203 } 204 } 205 } 206 207 struct Quit { 208 state: Arc<Mutex<State>>, 209 } 210 211 impl Future for Quit { 212 type Output = (); 213 poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>214 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { 215 let mut state = self.state.lock(); 216 if state.should_quit { 217 return Poll::Ready(()); 218 } 219 220 state.waker = Some(cx.waker().clone()); 221 Poll::Pending 222 } 223 } 224 225 #[test] await_uring_from_poll()226 fn await_uring_from_poll() { 227 if !use_uring() { 228 return; 229 } 230 // Start a uring operation and then await the result from an FdExecutor. 231 async fn go(source: UringSource<File>) { 232 let v = vec![0xa4u8; 16]; 233 let (len, vec) = source.read_to_vec(None, v).await.unwrap(); 234 assert_eq!(len, 16); 235 assert!(vec.iter().all(|&b| b == 0)); 236 } 237 238 let state = Arc::new(Mutex::new(State { 239 should_quit: false, 240 waker: None, 241 })); 242 243 let uring_ex = URingExecutor::new().unwrap(); 244 let f = File::open("/dev/zero").unwrap(); 245 let source = UringSource::new(f, &uring_ex).unwrap(); 246 247 let quit = Quit { 248 state: state.clone(), 249 }; 250 let handle = thread::spawn(move || uring_ex.run_until(quit)); 251 252 let poll_ex = FdExecutor::new().unwrap(); 253 poll_ex.run_until(go(source)).unwrap(); 254 255 state.lock().wake(); 256 handle.join().unwrap().unwrap(); 257 } 258 259 #[test] await_poll_from_uring()260 fn await_poll_from_uring() { 261 if !use_uring() { 262 return; 263 } 264 // Start a poll operation and then await the result from a URingExecutor. 265 async fn go(source: PollSource<File>) { 266 let v = vec![0x2cu8; 16]; 267 let (len, vec) = source.read_to_vec(None, v).await.unwrap(); 268 assert_eq!(len, 16); 269 assert!(vec.iter().all(|&b| b == 0)); 270 } 271 272 let state = Arc::new(Mutex::new(State { 273 should_quit: false, 274 waker: None, 275 })); 276 277 let poll_ex = FdExecutor::new().unwrap(); 278 let f = File::open("/dev/zero").unwrap(); 279 let source = PollSource::new(f, &poll_ex).unwrap(); 280 281 let quit = Quit { 282 state: state.clone(), 283 }; 284 let handle = thread::spawn(move || poll_ex.run_until(quit)); 285 286 let uring_ex = URingExecutor::new().unwrap(); 287 uring_ex.run_until(go(source)).unwrap(); 288 289 state.lock().wake(); 290 handle.join().unwrap().unwrap(); 291 } 292 293 #[test] readvec()294 fn readvec() { 295 if !use_uring() { 296 return; 297 } 298 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 299 let v = vec![0x55u8; 32]; 300 let v_ptr = v.as_ptr(); 301 let ret = async_source.read_to_vec(None, v).await.unwrap(); 302 assert_eq!(ret.0, 32); 303 let ret_v = ret.1; 304 assert_eq!(v_ptr, ret_v.as_ptr()); 305 assert!(ret_v.iter().all(|&b| b == 0)); 306 } 307 308 let f = File::open("/dev/zero").unwrap(); 309 let uring_ex = URingExecutor::new().unwrap(); 310 let uring_source = async_uring_from(f, &uring_ex).unwrap(); 311 uring_ex.run_until(go(uring_source)).unwrap(); 312 313 let f = File::open("/dev/zero").unwrap(); 314 let poll_ex = FdExecutor::new().unwrap(); 315 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 316 poll_ex.run_until(go(poll_source)).unwrap(); 317 } 318 319 #[test] writevec()320 fn writevec() { 321 if !use_uring() { 322 return; 323 } 324 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 325 let v = vec![0x55u8; 32]; 326 let v_ptr = v.as_ptr(); 327 let ret = async_source.write_from_vec(None, v).await.unwrap(); 328 assert_eq!(ret.0, 32); 329 let ret_v = ret.1; 330 assert_eq!(v_ptr, ret_v.as_ptr()); 331 } 332 333 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 334 let ex = URingExecutor::new().unwrap(); 335 let uring_source = async_uring_from(f, &ex).unwrap(); 336 ex.run_until(go(uring_source)).unwrap(); 337 338 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 339 let poll_ex = FdExecutor::new().unwrap(); 340 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 341 poll_ex.run_until(go(poll_source)).unwrap(); 342 } 343 344 #[test] readmem()345 fn readmem() { 346 if !use_uring() { 347 return; 348 } 349 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 350 let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192])); 351 let ret = async_source 352 .read_to_mem( 353 None, 354 Arc::<VecIoWrapper>::clone(&mem), 355 &[ 356 MemRegion { offset: 0, len: 32 }, 357 MemRegion { 358 offset: 200, 359 len: 56, 360 }, 361 ], 362 ) 363 .await 364 .unwrap(); 365 assert_eq!(ret, 32 + 56); 366 let vec: Vec<u8> = match Arc::try_unwrap(mem) { 367 Ok(v) => v.into(), 368 Err(_) => panic!("Too many vec refs"), 369 }; 370 assert!(vec.iter().take(32).all(|&b| b == 0)); 371 assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55)); 372 assert!(vec.iter().skip(200).take(56).all(|&b| b == 0)); 373 assert!(vec.iter().skip(256).all(|&b| b == 0x55)); 374 } 375 376 let f = File::open("/dev/zero").unwrap(); 377 let ex = URingExecutor::new().unwrap(); 378 let uring_source = async_uring_from(f, &ex).unwrap(); 379 ex.run_until(go(uring_source)).unwrap(); 380 381 let f = File::open("/dev/zero").unwrap(); 382 let poll_ex = FdExecutor::new().unwrap(); 383 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 384 poll_ex.run_until(go(poll_source)).unwrap(); 385 } 386 387 #[test] writemem()388 fn writemem() { 389 if !use_uring() { 390 return; 391 } 392 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 393 let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192])); 394 let ret = async_source 395 .write_from_mem( 396 None, 397 Arc::<VecIoWrapper>::clone(&mem), 398 &[MemRegion { offset: 0, len: 32 }], 399 ) 400 .await 401 .unwrap(); 402 assert_eq!(ret, 32); 403 } 404 405 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 406 let ex = URingExecutor::new().unwrap(); 407 let uring_source = async_uring_from(f, &ex).unwrap(); 408 ex.run_until(go(uring_source)).unwrap(); 409 410 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 411 let poll_ex = FdExecutor::new().unwrap(); 412 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 413 poll_ex.run_until(go(poll_source)).unwrap(); 414 } 415 416 #[test] read_u64s()417 fn read_u64s() { 418 if !use_uring() { 419 return; 420 } 421 async fn go(async_source: File, ex: URingExecutor) -> u64 { 422 let source = async_uring_from(async_source, &ex).unwrap(); 423 source.read_u64().await.unwrap() 424 } 425 426 let f = File::open("/dev/zero").unwrap(); 427 let ex = URingExecutor::new().unwrap(); 428 let val = ex.run_until(go(f, ex.clone())).unwrap(); 429 assert_eq!(val, 0); 430 } 431 432 #[test] read_eventfds()433 fn read_eventfds() { 434 if !use_uring() { 435 return; 436 } 437 use sys_util::EventFd; 438 439 async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 { 440 source.read_u64().await.unwrap() 441 } 442 443 let eventfd = EventFd::new().unwrap(); 444 eventfd.write(0x55).unwrap(); 445 let ex = URingExecutor::new().unwrap(); 446 let uring_source = async_uring_from(eventfd, &ex).unwrap(); 447 let val = ex.run_until(go(uring_source)).unwrap(); 448 assert_eq!(val, 0x55); 449 450 let eventfd = EventFd::new().unwrap(); 451 eventfd.write(0xaa).unwrap(); 452 let poll_ex = FdExecutor::new().unwrap(); 453 let poll_source = async_poll_from(eventfd, &poll_ex).unwrap(); 454 let val = poll_ex.run_until(go(poll_source)).unwrap(); 455 assert_eq!(val, 0xaa); 456 } 457 458 #[test] fsync()459 fn fsync() { 460 if !use_uring() { 461 return; 462 } 463 async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) { 464 let v = vec![0x55u8; 32]; 465 let v_ptr = v.as_ptr(); 466 let ret = source.write_from_vec(None, v).await.unwrap(); 467 assert_eq!(ret.0, 32); 468 let ret_v = ret.1; 469 assert_eq!(v_ptr, ret_v.as_ptr()); 470 source.fsync().await.unwrap(); 471 } 472 473 let f = tempfile::tempfile().unwrap(); 474 let ex = Executor::new().unwrap(); 475 let source = ex.async_from(f).unwrap(); 476 477 ex.run_until(go(source)).unwrap(); 478 } 479 } 480