1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! # `IoSourceExt` 6 //! 7 //! User functions to asynchronously access files. 8 //! Using `IoSource` directly is inconvenient and requires dealing with state 9 //! machines for the backing uring, future libraries, etc. `IoSourceExt` instead 10 //! provides users with a future that can be `await`ed from async context. 11 //! 12 //! Each member of `IoSourceExt` returns a future for the supported operation. One or more 13 //! operation can be pending at a time. 14 //! 15 //! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the 16 //! `URingExecutor` documentation for an explaination of why. 17 18 use std::fs::File; 19 use std::os::unix::io::AsRawFd; 20 use std::sync::Arc; 21 22 use async_trait::async_trait; 23 use sys_util::net::UnixSeqpacket; 24 use thiserror::Error as ThisError; 25 26 use crate::{BackingMemory, MemRegion}; 27 28 #[derive(ThisError, Debug)] 29 pub enum Error { 30 /// An error with a polled(FD) source. 31 #[error("An error with a poll source: {0}")] 32 Poll(crate::poll_source::Error), 33 /// An error with a uring source. 34 #[error("An error with a uring source: {0}")] 35 Uring(crate::uring_executor::Error), 36 } 37 pub type Result<T> = std::result::Result<T, Error>; 38 39 impl From<crate::uring_executor::Error> for Error { from(err: crate::uring_executor::Error) -> Self40 fn from(err: crate::uring_executor::Error) -> Self { 41 Error::Uring(err) 42 } 43 } 44 45 impl From<crate::poll_source::Error> for Error { from(err: crate::poll_source::Error) -> Self46 fn from(err: crate::poll_source::Error) -> Self { 47 Error::Poll(err) 48 } 49 } 50 51 /// Ergonomic methods for async reads. 52 #[async_trait(?Send)] 53 pub trait ReadAsync { 54 /// Reads from the iosource at `file_offset` and fill the given `vec`. read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec<u8>) -> Result<(usize, Vec<u8>)>55 async fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec<u8>) -> Result<(usize, Vec<u8>)>; 56 57 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. read_to_mem<'a>( &'a self, file_offset: u64, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>58 async fn read_to_mem<'a>( 59 &'a self, 60 file_offset: u64, 61 mem: Arc<dyn BackingMemory + Send + Sync>, 62 mem_offsets: &'a [MemRegion], 63 ) -> Result<usize>; 64 65 /// Wait for the FD of `self` to be readable. wait_readable(&self) -> Result<()>66 async fn wait_readable(&self) -> Result<()>; 67 68 /// Reads a single u64 from the current offset. read_u64(&self) -> Result<u64>69 async fn read_u64(&self) -> Result<u64>; 70 } 71 72 /// Ergonomic methods for async writes. 73 #[async_trait(?Send)] 74 pub trait WriteAsync { 75 /// Writes from the given `vec` to the file starting at `file_offset`. write_from_vec<'a>( &'a self, file_offset: u64, vec: Vec<u8>, ) -> Result<(usize, Vec<u8>)>76 async fn write_from_vec<'a>( 77 &'a self, 78 file_offset: u64, 79 vec: Vec<u8>, 80 ) -> Result<(usize, Vec<u8>)>; 81 82 /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. write_from_mem<'a>( &'a self, file_offset: u64, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>83 async fn write_from_mem<'a>( 84 &'a self, 85 file_offset: u64, 86 mem: Arc<dyn BackingMemory + Send + Sync>, 87 mem_offsets: &'a [MemRegion], 88 ) -> Result<usize>; 89 90 /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>91 async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>; 92 93 /// Sync all completed write operations to the backing storage. fsync(&self) -> Result<()>94 async fn fsync(&self) -> Result<()>; 95 } 96 97 /// Subtrait for general async IO. 98 #[async_trait(?Send)] 99 pub trait IoSourceExt<F>: ReadAsync + WriteAsync { 100 /// Yields the underlying IO source. into_source(self: Box<Self>) -> F101 fn into_source(self: Box<Self>) -> F; 102 103 /// Provides a mutable ref to the underlying IO source. as_source_mut(&mut self) -> &mut F104 fn as_source_mut(&mut self) -> &mut F; 105 106 /// Provides a ref to the underlying IO source. as_source(&self) -> &F107 fn as_source(&self) -> &F; 108 } 109 110 /// Marker trait signifying that the implementor is suitable for use with 111 /// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket. 112 /// 113 /// (Note: it'd be really nice to implement a TryFrom for any implementors, and 114 /// remove our factory functions. Unfortunately 115 /// https://github.com/rust-lang/rust/issues/50133 makes that too painful.) 116 pub trait IntoAsync: AsRawFd {} 117 118 impl IntoAsync for File {} 119 impl IntoAsync for UnixSeqpacket {} 120 impl IntoAsync for &UnixSeqpacket {} 121 122 #[cfg(test)] 123 mod tests { 124 use std::fs::{File, OpenOptions}; 125 use std::future::Future; 126 use std::os::unix::io::AsRawFd; 127 use std::pin::Pin; 128 use std::sync::Arc; 129 use std::task::{Context, Poll, Waker}; 130 use std::thread; 131 132 use sync::Mutex; 133 134 use super::*; 135 use crate::executor::{async_poll_from, async_uring_from}; 136 use crate::mem::VecIoWrapper; 137 use crate::{Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource}; 138 139 struct State { 140 should_quit: bool, 141 waker: Option<Waker>, 142 } 143 144 impl State { wake(&mut self)145 fn wake(&mut self) { 146 self.should_quit = true; 147 let waker = self.waker.take(); 148 149 if let Some(waker) = waker { 150 waker.wake(); 151 } 152 } 153 } 154 155 struct Quit { 156 state: Arc<Mutex<State>>, 157 } 158 159 impl Future for Quit { 160 type Output = (); 161 poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>162 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { 163 let mut state = self.state.lock(); 164 if state.should_quit { 165 return Poll::Ready(()); 166 } 167 168 state.waker = Some(cx.waker().clone()); 169 Poll::Pending 170 } 171 } 172 173 #[test] await_uring_from_poll()174 fn await_uring_from_poll() { 175 // Start a uring operation and then await the result from an FdExecutor. 176 async fn go(source: UringSource<File>) { 177 let v = vec![0xa4u8; 16]; 178 let (len, vec) = source.read_to_vec(0, v).await.unwrap(); 179 assert_eq!(len, 16); 180 assert!(vec.iter().all(|&b| b == 0)); 181 } 182 183 let state = Arc::new(Mutex::new(State { 184 should_quit: false, 185 waker: None, 186 })); 187 188 let uring_ex = URingExecutor::new().unwrap(); 189 let f = File::open("/dev/zero").unwrap(); 190 let source = UringSource::new(f, &uring_ex).unwrap(); 191 192 let quit = Quit { 193 state: state.clone(), 194 }; 195 let handle = thread::spawn(move || uring_ex.run_until(quit)); 196 197 let poll_ex = FdExecutor::new().unwrap(); 198 poll_ex.run_until(go(source)).unwrap(); 199 200 state.lock().wake(); 201 handle.join().unwrap().unwrap(); 202 } 203 204 #[test] await_poll_from_uring()205 fn await_poll_from_uring() { 206 // Start a poll operation and then await the result from a URingExecutor. 207 async fn go(source: PollSource<File>) { 208 let v = vec![0x2cu8; 16]; 209 let (len, vec) = source.read_to_vec(0, v).await.unwrap(); 210 assert_eq!(len, 16); 211 assert!(vec.iter().all(|&b| b == 0)); 212 } 213 214 let state = Arc::new(Mutex::new(State { 215 should_quit: false, 216 waker: None, 217 })); 218 219 let poll_ex = FdExecutor::new().unwrap(); 220 let f = File::open("/dev/zero").unwrap(); 221 let source = PollSource::new(f, &poll_ex).unwrap(); 222 223 let quit = Quit { 224 state: state.clone(), 225 }; 226 let handle = thread::spawn(move || poll_ex.run_until(quit)); 227 228 let uring_ex = URingExecutor::new().unwrap(); 229 uring_ex.run_until(go(source)).unwrap(); 230 231 state.lock().wake(); 232 handle.join().unwrap().unwrap(); 233 } 234 235 #[test] readvec()236 fn readvec() { 237 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 238 let v = vec![0x55u8; 32]; 239 let v_ptr = v.as_ptr(); 240 let ret = async_source.read_to_vec(0, v).await.unwrap(); 241 assert_eq!(ret.0, 32); 242 let ret_v = ret.1; 243 assert_eq!(v_ptr, ret_v.as_ptr()); 244 assert!(ret_v.iter().all(|&b| b == 0)); 245 } 246 247 let f = File::open("/dev/zero").unwrap(); 248 let uring_ex = URingExecutor::new().unwrap(); 249 let uring_source = async_uring_from(f, &uring_ex).unwrap(); 250 uring_ex.run_until(go(uring_source)).unwrap(); 251 252 let f = File::open("/dev/zero").unwrap(); 253 let poll_ex = FdExecutor::new().unwrap(); 254 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 255 poll_ex.run_until(go(poll_source)).unwrap(); 256 } 257 258 #[test] writevec()259 fn writevec() { 260 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 261 let v = vec![0x55u8; 32]; 262 let v_ptr = v.as_ptr(); 263 let ret = async_source.write_from_vec(0, v).await.unwrap(); 264 assert_eq!(ret.0, 32); 265 let ret_v = ret.1; 266 assert_eq!(v_ptr, ret_v.as_ptr()); 267 } 268 269 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 270 let ex = URingExecutor::new().unwrap(); 271 let uring_source = async_uring_from(f, &ex).unwrap(); 272 ex.run_until(go(uring_source)).unwrap(); 273 274 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 275 let poll_ex = FdExecutor::new().unwrap(); 276 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 277 poll_ex.run_until(go(poll_source)).unwrap(); 278 } 279 280 #[test] readmem()281 fn readmem() { 282 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 283 let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192])); 284 let ret = async_source 285 .read_to_mem( 286 0, 287 Arc::<VecIoWrapper>::clone(&mem), 288 &[ 289 MemRegion { offset: 0, len: 32 }, 290 MemRegion { 291 offset: 200, 292 len: 56, 293 }, 294 ], 295 ) 296 .await 297 .unwrap(); 298 assert_eq!(ret, 32 + 56); 299 let vec: Vec<u8> = match Arc::try_unwrap(mem) { 300 Ok(v) => v.into(), 301 Err(_) => panic!("Too many vec refs"), 302 }; 303 assert!(vec.iter().take(32).all(|&b| b == 0)); 304 assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55)); 305 assert!(vec.iter().skip(200).take(56).all(|&b| b == 0)); 306 assert!(vec.iter().skip(256).all(|&b| b == 0x55)); 307 } 308 309 let f = File::open("/dev/zero").unwrap(); 310 let ex = URingExecutor::new().unwrap(); 311 let uring_source = async_uring_from(f, &ex).unwrap(); 312 ex.run_until(go(uring_source)).unwrap(); 313 314 let f = File::open("/dev/zero").unwrap(); 315 let poll_ex = FdExecutor::new().unwrap(); 316 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 317 poll_ex.run_until(go(poll_source)).unwrap(); 318 } 319 320 #[test] writemem()321 fn writemem() { 322 async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) { 323 let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192])); 324 let ret = async_source 325 .write_from_mem( 326 0, 327 Arc::<VecIoWrapper>::clone(&mem), 328 &[MemRegion { offset: 0, len: 32 }], 329 ) 330 .await 331 .unwrap(); 332 assert_eq!(ret, 32); 333 } 334 335 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 336 let ex = URingExecutor::new().unwrap(); 337 let uring_source = async_uring_from(f, &ex).unwrap(); 338 ex.run_until(go(uring_source)).unwrap(); 339 340 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 341 let poll_ex = FdExecutor::new().unwrap(); 342 let poll_source = async_poll_from(f, &poll_ex).unwrap(); 343 poll_ex.run_until(go(poll_source)).unwrap(); 344 } 345 346 #[test] read_u64s()347 fn read_u64s() { 348 async fn go(async_source: File, ex: URingExecutor) -> u64 { 349 let source = async_uring_from(async_source, &ex).unwrap(); 350 source.read_u64().await.unwrap() 351 } 352 353 let f = File::open("/dev/zero").unwrap(); 354 let ex = URingExecutor::new().unwrap(); 355 let val = ex.run_until(go(f, ex.clone())).unwrap(); 356 assert_eq!(val, 0); 357 } 358 359 #[test] read_eventfds()360 fn read_eventfds() { 361 use sys_util::EventFd; 362 363 async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 { 364 source.read_u64().await.unwrap() 365 } 366 367 let eventfd = EventFd::new().unwrap(); 368 eventfd.write(0x55).unwrap(); 369 let ex = URingExecutor::new().unwrap(); 370 let uring_source = async_uring_from(eventfd, &ex).unwrap(); 371 let val = ex.run_until(go(uring_source)).unwrap(); 372 assert_eq!(val, 0x55); 373 374 let eventfd = EventFd::new().unwrap(); 375 eventfd.write(0xaa).unwrap(); 376 let poll_ex = FdExecutor::new().unwrap(); 377 let poll_source = async_poll_from(eventfd, &poll_ex).unwrap(); 378 let val = poll_ex.run_until(go(poll_source)).unwrap(); 379 assert_eq!(val, 0xaa); 380 } 381 382 #[test] fsync()383 fn fsync() { 384 async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) { 385 let v = vec![0x55u8; 32]; 386 let v_ptr = v.as_ptr(); 387 let ret = source.write_from_vec(0, v).await.unwrap(); 388 assert_eq!(ret.0, 32); 389 let ret_v = ret.1; 390 assert_eq!(v_ptr, ret_v.as_ptr()); 391 source.fsync().await.unwrap(); 392 } 393 394 let f = tempfile::tempfile().unwrap(); 395 let ex = Executor::new().unwrap(); 396 let source = ex.async_from(f).unwrap(); 397 398 ex.run_until(go(source)).unwrap(); 399 } 400 } 401