1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! A wrapped IO source that uses FdExecutor to drive asynchronous completion. Used from 6 //! `IoSourceExt::new` when uring isn't available in the kernel. 7 8 use std::{ 9 io, 10 ops::{Deref, DerefMut}, 11 os::unix::io::AsRawFd, 12 sync::Arc, 13 }; 14 15 use async_trait::async_trait; 16 use data_model::VolatileSlice; 17 use remain::sorted; 18 use thiserror::Error as ThisError; 19 20 use super::{ 21 fd_executor::{ 22 FdExecutor, RegisteredSource, {self}, 23 }, 24 mem::{BackingMemory, MemRegion}, 25 AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync, 26 }; 27 28 #[sorted] 29 #[derive(ThisError, Debug)] 30 pub enum Error { 31 /// An error occurred attempting to register a waker with the executor. 32 #[error("An error occurred attempting to register a waker with the executor: {0}.")] 33 AddingWaker(fd_executor::Error), 34 /// An executor error occurred. 35 #[error("An executor error occurred: {0}")] 36 Executor(fd_executor::Error), 37 /// An error occurred when executing fallocate synchronously. 38 #[error("An error occurred when executing fallocate synchronously: {0}")] 39 Fallocate(sys_util::Error), 40 /// An error occurred when executing fsync synchronously. 41 #[error("An error occurred when executing fsync synchronously: {0}")] 42 Fsync(sys_util::Error), 43 /// An error occurred when reading the FD. 44 #[error("An error occurred when reading the FD: {0}.")] 45 Read(sys_util::Error), 46 /// Can't seek file. 47 #[error("An error occurred when seeking the FD: {0}.")] 48 Seeking(sys_util::Error), 49 /// An error occurred when writing the FD. 50 #[error("An error occurred when writing the FD: {0}.")] 51 Write(sys_util::Error), 52 } 53 pub type Result<T> = std::result::Result<T, Error>; 54 55 impl From<Error> for io::Error { from(e: Error) -> Self56 fn from(e: Error) -> Self { 57 use Error::*; 58 match e { 59 AddingWaker(e) => e.into(), 60 Executor(e) => e.into(), 61 Fallocate(e) => e.into(), 62 Fsync(e) => e.into(), 63 Read(e) => e.into(), 64 Seeking(e) => e.into(), 65 Write(e) => e.into(), 66 } 67 } 68 } 69 70 /// Async wrapper for an IO source that uses the FD executor to drive async operations. 71 /// Used by `IoSourceExt::new` when uring isn't available. 72 pub struct PollSource<F>(RegisteredSource<F>); 73 74 impl<F: AsRawFd> PollSource<F> { 75 /// Create a new `PollSource` from the given IO source. new(f: F, ex: &FdExecutor) -> Result<Self>76 pub fn new(f: F, ex: &FdExecutor) -> Result<Self> { 77 ex.register_source(f) 78 .map(PollSource) 79 .map_err(Error::Executor) 80 } 81 82 /// Return the inner source. into_source(self) -> F83 pub fn into_source(self) -> F { 84 self.0.into_source() 85 } 86 } 87 88 impl<F: AsRawFd> Deref for PollSource<F> { 89 type Target = F; 90 deref(&self) -> &Self::Target91 fn deref(&self) -> &Self::Target { 92 self.0.as_ref() 93 } 94 } 95 96 impl<F: AsRawFd> DerefMut for PollSource<F> { deref_mut(&mut self) -> &mut Self::Target97 fn deref_mut(&mut self) -> &mut Self::Target { 98 self.0.as_mut() 99 } 100 } 101 102 #[async_trait(?Send)] 103 impl<F: AsRawFd> ReadAsync for PollSource<F> { 104 /// Reads from the iosource at `file_offset` and fill the given `vec`. read_to_vec<'a>( &'a self, file_offset: Option<u64>, mut vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>105 async fn read_to_vec<'a>( 106 &'a self, 107 file_offset: Option<u64>, 108 mut vec: Vec<u8>, 109 ) -> AsyncResult<(usize, Vec<u8>)> { 110 loop { 111 // Safe because this will only modify `vec` and we check the return value. 112 let res = if let Some(offset) = file_offset { 113 unsafe { 114 libc::pread64( 115 self.as_raw_fd(), 116 vec.as_mut_ptr() as *mut libc::c_void, 117 vec.len(), 118 offset as libc::off64_t, 119 ) 120 } 121 } else { 122 unsafe { 123 libc::read( 124 self.as_raw_fd(), 125 vec.as_mut_ptr() as *mut libc::c_void, 126 vec.len(), 127 ) 128 } 129 }; 130 131 if res >= 0 { 132 return Ok((res as usize, vec)); 133 } 134 135 match sys_util::Error::last() { 136 e if e.errno() == libc::EWOULDBLOCK => { 137 let op = self.0.wait_readable().map_err(Error::AddingWaker)?; 138 op.await.map_err(Error::Executor)?; 139 } 140 e => return Err(Error::Read(e).into()), 141 } 142 } 143 } 144 145 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. read_to_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>146 async fn read_to_mem<'a>( 147 &'a self, 148 file_offset: Option<u64>, 149 mem: Arc<dyn BackingMemory + Send + Sync>, 150 mem_offsets: &'a [MemRegion], 151 ) -> AsyncResult<usize> { 152 let mut iovecs = mem_offsets 153 .iter() 154 .filter_map(|&mem_vec| mem.get_volatile_slice(mem_vec).ok()) 155 .collect::<Vec<VolatileSlice>>(); 156 157 loop { 158 // Safe because we trust the kernel not to write path the length given and the length is 159 // guaranteed to be valid from the pointer by io_slice_mut. 160 let res = if let Some(offset) = file_offset { 161 unsafe { 162 libc::preadv64( 163 self.as_raw_fd(), 164 iovecs.as_mut_ptr() as *mut _, 165 iovecs.len() as i32, 166 offset as libc::off64_t, 167 ) 168 } 169 } else { 170 unsafe { 171 libc::readv( 172 self.as_raw_fd(), 173 iovecs.as_mut_ptr() as *mut _, 174 iovecs.len() as i32, 175 ) 176 } 177 }; 178 179 if res >= 0 { 180 return Ok(res as usize); 181 } 182 183 match sys_util::Error::last() { 184 e if e.errno() == libc::EWOULDBLOCK => { 185 let op = self.0.wait_readable().map_err(Error::AddingWaker)?; 186 op.await.map_err(Error::Executor)?; 187 } 188 e => return Err(Error::Read(e).into()), 189 } 190 } 191 } 192 193 /// Wait for the FD of `self` to be readable. wait_readable(&self) -> AsyncResult<()>194 async fn wait_readable(&self) -> AsyncResult<()> { 195 let op = self.0.wait_readable().map_err(Error::AddingWaker)?; 196 op.await.map_err(Error::Executor)?; 197 Ok(()) 198 } 199 read_u64(&self) -> AsyncResult<u64>200 async fn read_u64(&self) -> AsyncResult<u64> { 201 let mut buf = 0u64.to_ne_bytes(); 202 loop { 203 // Safe because this will only modify `buf` and we check the return value. 204 let res = unsafe { 205 libc::read( 206 self.as_raw_fd(), 207 buf.as_mut_ptr() as *mut libc::c_void, 208 buf.len(), 209 ) 210 }; 211 212 if res >= 0 { 213 return Ok(u64::from_ne_bytes(buf)); 214 } 215 216 match sys_util::Error::last() { 217 e if e.errno() == libc::EWOULDBLOCK => { 218 let op = self.0.wait_readable().map_err(Error::AddingWaker)?; 219 op.await.map_err(Error::Executor)?; 220 } 221 e => return Err(Error::Read(e).into()), 222 } 223 } 224 } 225 } 226 227 #[async_trait(?Send)] 228 impl<F: AsRawFd> WriteAsync for PollSource<F> { 229 /// Writes from the given `vec` to the file starting at `file_offset`. write_from_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>230 async fn write_from_vec<'a>( 231 &'a self, 232 file_offset: Option<u64>, 233 vec: Vec<u8>, 234 ) -> AsyncResult<(usize, Vec<u8>)> { 235 loop { 236 // Safe because this will not modify any memory and we check the return value. 237 let res = if let Some(offset) = file_offset { 238 unsafe { 239 libc::pwrite64( 240 self.as_raw_fd(), 241 vec.as_ptr() as *const libc::c_void, 242 vec.len(), 243 offset as libc::off64_t, 244 ) 245 } 246 } else { 247 unsafe { 248 libc::write( 249 self.as_raw_fd(), 250 vec.as_ptr() as *const libc::c_void, 251 vec.len(), 252 ) 253 } 254 }; 255 256 if res >= 0 { 257 return Ok((res as usize, vec)); 258 } 259 260 match sys_util::Error::last() { 261 e if e.errno() == libc::EWOULDBLOCK => { 262 let op = self.0.wait_writable().map_err(Error::AddingWaker)?; 263 op.await.map_err(Error::Executor)?; 264 } 265 e => return Err(Error::Write(e).into()), 266 } 267 } 268 } 269 270 /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. write_from_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>271 async fn write_from_mem<'a>( 272 &'a self, 273 file_offset: Option<u64>, 274 mem: Arc<dyn BackingMemory + Send + Sync>, 275 mem_offsets: &'a [MemRegion], 276 ) -> AsyncResult<usize> { 277 let iovecs = mem_offsets 278 .iter() 279 .map(|&mem_vec| mem.get_volatile_slice(mem_vec)) 280 .filter_map(|r| r.ok()) 281 .collect::<Vec<VolatileSlice>>(); 282 283 loop { 284 // Safe because we trust the kernel not to write path the length given and the length is 285 // guaranteed to be valid from the pointer by io_slice_mut. 286 let res = if let Some(offset) = file_offset { 287 unsafe { 288 libc::pwritev64( 289 self.as_raw_fd(), 290 iovecs.as_ptr() as *mut _, 291 iovecs.len() as i32, 292 offset as libc::off64_t, 293 ) 294 } 295 } else { 296 unsafe { 297 libc::writev( 298 self.as_raw_fd(), 299 iovecs.as_ptr() as *mut _, 300 iovecs.len() as i32, 301 ) 302 } 303 }; 304 305 if res >= 0 { 306 return Ok(res as usize); 307 } 308 309 match sys_util::Error::last() { 310 e if e.errno() == libc::EWOULDBLOCK => { 311 let op = self.0.wait_writable().map_err(Error::AddingWaker)?; 312 op.await.map_err(Error::Executor)?; 313 } 314 e => return Err(Error::Write(e).into()), 315 } 316 } 317 } 318 319 /// See `fallocate(2)` for details. fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()>320 async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { 321 let ret = unsafe { 322 libc::fallocate64( 323 self.as_raw_fd(), 324 mode as libc::c_int, 325 file_offset as libc::off64_t, 326 len as libc::off64_t, 327 ) 328 }; 329 if ret == 0 { 330 Ok(()) 331 } else { 332 Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last()))) 333 } 334 } 335 336 /// Sync all completed write operations to the backing storage. fsync(&self) -> AsyncResult<()>337 async fn fsync(&self) -> AsyncResult<()> { 338 let ret = unsafe { libc::fsync(self.as_raw_fd()) }; 339 if ret == 0 { 340 Ok(()) 341 } else { 342 Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last()))) 343 } 344 } 345 } 346 347 #[async_trait(?Send)] 348 impl<F: AsRawFd> IoSourceExt<F> for PollSource<F> { 349 /// Yields the underlying IO source. into_source(self: Box<Self>) -> F350 fn into_source(self: Box<Self>) -> F { 351 self.0.into_source() 352 } 353 354 /// Provides a mutable ref to the underlying IO source. as_source_mut(&mut self) -> &mut F355 fn as_source_mut(&mut self) -> &mut F { 356 self 357 } 358 359 /// Provides a ref to the underlying IO source. as_source(&self) -> &F360 fn as_source(&self) -> &F { 361 self 362 } 363 } 364 365 #[cfg(test)] 366 mod tests { 367 use std::{ 368 fs::{File, OpenOptions}, 369 path::PathBuf, 370 }; 371 372 use super::*; 373 374 #[test] readvec()375 fn readvec() { 376 async fn go(ex: &FdExecutor) { 377 let f = File::open("/dev/zero").unwrap(); 378 let async_source = PollSource::new(f, ex).unwrap(); 379 let v = vec![0x55u8; 32]; 380 let v_ptr = v.as_ptr(); 381 let ret = async_source.read_to_vec(None, v).await.unwrap(); 382 assert_eq!(ret.0, 32); 383 let ret_v = ret.1; 384 assert_eq!(v_ptr, ret_v.as_ptr()); 385 assert!(ret_v.iter().all(|&b| b == 0)); 386 } 387 388 let ex = FdExecutor::new().unwrap(); 389 ex.run_until(go(&ex)).unwrap(); 390 } 391 392 #[test] writevec()393 fn writevec() { 394 async fn go(ex: &FdExecutor) { 395 let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); 396 let async_source = PollSource::new(f, ex).unwrap(); 397 let v = vec![0x55u8; 32]; 398 let v_ptr = v.as_ptr(); 399 let ret = async_source.write_from_vec(None, v).await.unwrap(); 400 assert_eq!(ret.0, 32); 401 let ret_v = ret.1; 402 assert_eq!(v_ptr, ret_v.as_ptr()); 403 } 404 405 let ex = FdExecutor::new().unwrap(); 406 ex.run_until(go(&ex)).unwrap(); 407 } 408 409 #[test] fallocate()410 fn fallocate() { 411 async fn go(ex: &FdExecutor) { 412 let dir = tempfile::TempDir::new().unwrap(); 413 let mut file_path = PathBuf::from(dir.path()); 414 file_path.push("test"); 415 416 let f = OpenOptions::new() 417 .create(true) 418 .write(true) 419 .open(&file_path) 420 .unwrap(); 421 let source = PollSource::new(f, ex).unwrap(); 422 source.fallocate(0, 4096, 0).await.unwrap(); 423 424 let meta_data = std::fs::metadata(&file_path).unwrap(); 425 assert_eq!(meta_data.len(), 4096); 426 } 427 428 let ex = FdExecutor::new().unwrap(); 429 ex.run_until(go(&ex)).unwrap(); 430 } 431 432 #[test] memory_leak()433 fn memory_leak() { 434 // This test needs to run under ASAN to detect memory leaks. 435 436 async fn owns_poll_source(source: PollSource<File>) { 437 let _ = source.wait_readable().await; 438 } 439 440 let (rx, _tx) = sys_util::pipe(true).unwrap(); 441 let ex = FdExecutor::new().unwrap(); 442 let source = PollSource::new(rx, &ex).unwrap(); 443 ex.spawn_local(owns_poll_source(source)).detach(); 444 445 // Drop `ex` without running. This would cause a memory leak if PollSource owned a strong 446 // reference to the executor because it owns a reference to the future that owns PollSource 447 // (via its Runnable). The strong reference prevents the drop impl from running, which would 448 // otherwise poll the future and have it return with an error. 449 } 450 } 451