1 // Copyright 2020 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::convert::TryInto; 6 use std::io; 7 use std::ops::Deref; 8 use std::ops::DerefMut; 9 use std::sync::Arc; 10 11 use base::AsRawDescriptor; 12 13 use super::uring_executor::Error; 14 use super::uring_executor::RegisteredSource; 15 use super::uring_executor::Result; 16 use super::uring_executor::URingExecutor; 17 use crate::mem::BackingMemory; 18 use crate::mem::MemRegion; 19 use crate::mem::VecIoWrapper; 20 use crate::AllocateMode; 21 use crate::AsyncError; 22 use crate::AsyncResult; 23 24 /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around 25 /// registering an IO source with the uring that provides an `IoSource` implementation. 26 pub struct UringSource<F: AsRawDescriptor> { 27 registered_source: RegisteredSource, 28 source: F, 29 } 30 31 impl<F: AsRawDescriptor> UringSource<F> { 32 /// Creates a new `UringSource` that wraps the given `io_source` object. new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>>33 pub fn new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>> { 34 let r = ex.register_source(&io_source)?; 35 Ok(UringSource { 36 registered_source: r, 37 source: io_source, 38 }) 39 } 40 41 /// Reads from the iosource at `file_offset` and fill the given `vec`. 
read_to_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>42 pub async fn read_to_vec( 43 &self, 44 file_offset: Option<u64>, 45 vec: Vec<u8>, 46 ) -> AsyncResult<(usize, Vec<u8>)> { 47 let buf = Arc::new(VecIoWrapper::from(vec)); 48 let op = self.registered_source.start_read_to_mem( 49 file_offset, 50 buf.clone(), 51 &[MemRegion { 52 offset: 0, 53 len: buf.len(), 54 }], 55 )?; 56 let len = op.await?; 57 let bytes = if let Ok(v) = Arc::try_unwrap(buf) { 58 v.into() 59 } else { 60 panic!("too many refs on buf"); 61 }; 62 63 Ok((len as usize, bytes)) 64 } 65 66 /// Wait for the FD of `self` to be readable. wait_readable(&self) -> AsyncResult<()>67 pub async fn wait_readable(&self) -> AsyncResult<()> { 68 let op = self.registered_source.poll_fd_readable()?; 69 op.await?; 70 Ok(()) 71 } 72 73 /// Reads a single u64 (e.g. from an eventfd). read_u64(&self) -> AsyncResult<u64>74 pub async fn read_u64(&self) -> AsyncResult<u64> { 75 // This doesn't just forward to read_to_vec to avoid an unnecessary extra allocation from 76 // async-trait. 77 let buf = Arc::new(VecIoWrapper::from(0u64.to_ne_bytes().to_vec())); 78 let op = self.registered_source.start_read_to_mem( 79 None, 80 buf.clone(), 81 &[MemRegion { 82 offset: 0, 83 len: buf.len(), 84 }], 85 )?; 86 let len = op.await?; 87 if len != buf.len() as u32 { 88 Err(AsyncError::Uring(Error::Io(io::Error::new( 89 io::ErrorKind::Other, 90 format!("expected to read {} bytes, but read {}", buf.len(), len), 91 )))) 92 } else { 93 let bytes: Vec<u8> = if let Ok(v) = Arc::try_unwrap(buf) { 94 v.into() 95 } else { 96 panic!("too many refs on buf"); 97 }; 98 99 // Will never panic because bytes is of the appropriate size. 100 Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap())) 101 } 102 } 103 104 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. 
read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &[MemRegion], ) -> AsyncResult<usize>105 pub async fn read_to_mem( 106 &self, 107 file_offset: Option<u64>, 108 mem: Arc<dyn BackingMemory + Send + Sync>, 109 mem_offsets: &[MemRegion], 110 ) -> AsyncResult<usize> { 111 let op = self 112 .registered_source 113 .start_read_to_mem(file_offset, mem, mem_offsets)?; 114 let len = op.await?; 115 Ok(len as usize) 116 } 117 118 /// Writes from the given `vec` to the file starting at `file_offset`. write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>119 pub async fn write_from_vec( 120 &self, 121 file_offset: Option<u64>, 122 vec: Vec<u8>, 123 ) -> AsyncResult<(usize, Vec<u8>)> { 124 let buf = Arc::new(VecIoWrapper::from(vec)); 125 let op = self.registered_source.start_write_from_mem( 126 file_offset, 127 buf.clone(), 128 &[MemRegion { 129 offset: 0, 130 len: buf.len(), 131 }], 132 )?; 133 let len = op.await?; 134 let bytes = if let Ok(v) = Arc::try_unwrap(buf) { 135 v.into() 136 } else { 137 panic!("too many refs on buf"); 138 }; 139 140 Ok((len as usize, bytes)) 141 } 142 143 /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &[MemRegion], ) -> AsyncResult<usize>144 pub async fn write_from_mem( 145 &self, 146 file_offset: Option<u64>, 147 mem: Arc<dyn BackingMemory + Send + Sync>, 148 mem_offsets: &[MemRegion], 149 ) -> AsyncResult<usize> { 150 let op = self 151 .registered_source 152 .start_write_from_mem(file_offset, mem, mem_offsets)?; 153 let len = op.await?; 154 Ok(len as usize) 155 } 156 157 /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. 
fallocate( &self, file_offset: u64, len: u64, mode: AllocateMode, ) -> AsyncResult<()>158 pub async fn fallocate( 159 &self, 160 file_offset: u64, 161 len: u64, 162 mode: AllocateMode, 163 ) -> AsyncResult<()> { 164 let op = self 165 .registered_source 166 .start_fallocate(file_offset, len, mode.into())?; 167 let _ = op.await?; 168 Ok(()) 169 } 170 171 /// Sync all completed write operations to the backing storage. fsync(&self) -> AsyncResult<()>172 pub async fn fsync(&self) -> AsyncResult<()> { 173 let op = self.registered_source.start_fsync()?; 174 let _ = op.await?; 175 Ok(()) 176 } 177 178 /// Yields the underlying IO source. into_source(self) -> F179 pub fn into_source(self) -> F { 180 self.source 181 } 182 183 /// Provides a mutable ref to the underlying IO source. as_source(&self) -> &F184 pub fn as_source(&self) -> &F { 185 &self.source 186 } 187 188 /// Provides a ref to the underlying IO source. as_source_mut(&mut self) -> &mut F189 pub fn as_source_mut(&mut self) -> &mut F { 190 &mut self.source 191 } 192 wait_for_handle(&self) -> AsyncResult<u64>193 pub async fn wait_for_handle(&self) -> AsyncResult<u64> { 194 self.read_u64().await 195 } 196 } 197 198 impl<F: AsRawDescriptor> Deref for UringSource<F> { 199 type Target = F; 200 deref(&self) -> &Self::Target201 fn deref(&self) -> &Self::Target { 202 &self.source 203 } 204 } 205 206 impl<F: AsRawDescriptor> DerefMut for UringSource<F> { deref_mut(&mut self) -> &mut Self::Target207 fn deref_mut(&mut self) -> &mut Self::Target { 208 &mut self.source 209 } 210 } 211 212 // NOTE: Prefer adding tests to io_source.rs if not backend specific. 
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::fs::OpenOptions;
    use std::future::Future;
    use std::path::PathBuf;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;

    use sync::Mutex;
    use tempfile::tempfile;

    use super::super::uring_executor::is_uring_stable;
    use super::super::UringSource;
    use super::*;
    use crate::Executor;
    use crate::ExecutorKind;
    use crate::IoSource;

    // Two concurrent reads into separate buffers complete and both observe zeros
    // from /dev/zero.
    #[test]
    fn readmulti() {
        // Every test here bails early on kernels where io_uring is not considered
        // stable, since the backend under test cannot be exercised there.
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &URingExecutor) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 32];
            let v2 = vec![0x55u8; 32];
            let (ret, ret2) = futures::future::join(
                source.read_to_vec(None, v),
                source.read_to_vec(Some(32), v2),
            )
            .await;

            assert!(ret.unwrap().1.iter().all(|&b| b == 0));
            assert!(ret2.unwrap().1.iter().all(|&b| b == 0));
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // Test helper: reads exactly 8 bytes from `source` and reassembles them into a
    // native-endian u64. Panics (via assert) on a short read.
    async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
        // Init a vec that translates to u64::max;
        let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
        let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
        assert_eq!(ret as usize, std::mem::size_of::<u64>());
        let mut val = 0u64.to_ne_bytes();
        val.copy_from_slice(&u64_mem);
        u64::from_ne_bytes(val)
    }

    // Writer and reader tasks ping-pong over a pair of eventfd-like Events: the
    // writer posts counts 55/66/77 and waits for the reader to consume each one;
    // the reader asserts it sees the counts in order.
    #[test]
    fn event() {
        if !is_uring_stable() {
            return;
        }

        use base::Event;
        use base::EventExt;

        async fn write_event(ev: Event, wait: Event, ex: &URingExecutor) {
            let wait = UringSource::new(wait, ex).unwrap();
            ev.write_count(55).unwrap();
            // Reading `wait` blocks until read_events signals, keeping the two
            // tasks in lockstep so counts are not coalesced by the eventfd.
            read_u64(&wait).await;
            ev.write_count(66).unwrap();
            read_u64(&wait).await;
            ev.write_count(77).unwrap();
            read_u64(&wait).await;
        }

        async fn read_events(ev: Event, signal: Event, ex: &URingExecutor) {
            let source = UringSource::new(ev, ex).unwrap();
            assert_eq!(read_u64(&source).await, 55);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 66);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 77);
            signal.signal().unwrap();
        }

        let event = Event::new().unwrap();
        let signal_wait = Event::new().unwrap();
        let ex = URingExecutor::new().unwrap();
        let write_task = write_event(
            event.try_clone().unwrap(),
            signal_wait.try_clone().unwrap(),
            &ex,
        );
        let read_task = read_events(event, signal_wait, &ex);
        ex.run_until(futures::future::join(read_task, write_task))
            .unwrap();
    }

    // A read op on an empty pipe must stay pending (lose the select race against an
    // immediately-ready future) and then be safely cancellable/droppable.
    #[test]
    fn pend_on_pipe() {
        if !is_uring_stable() {
            return;
        }

        use std::io::Write;

        use futures::future::Either;

        async fn do_test(ex: &URingExecutor) {
            let (read_source, mut w) = base::pipe(true).unwrap();
            let source = UringSource::new(read_source, ex).unwrap();
            let done = Box::pin(async { 5usize });
            let pending = Box::pin(read_u64(&source));
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => {
                    // Write to the pipe so that the kernel will release the memory associated with
                    // the uring read operation.
                    w.write_all(&[0]).expect("failed to write to pipe");
                    ::std::mem::drop(pending);
                }
                _ => panic!("unexpected select result"),
            };
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();
    }

    // A MemRegion extending past the end of the backing buffer (offset 32 + len 33
    // into a 64-byte vec) must be rejected rather than read out of bounds.
    #[test]
    fn range_error() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &URingExecutor) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 64];
            let vw = Arc::new(VecIoWrapper::from(v));
            let ret = source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&vw),
                    &[MemRegion {
                        offset: 32,
                        len: 33,
                    }],
                )
                .await;
            assert!(ret.is_err());
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // fallocate of 4096 bytes on a fresh file should extend it to exactly 4096
    // bytes; skipped on kernels whose uring lacks fallocate support.
    #[test]
    fn fallocate() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &URingExecutor) {
            let dir = tempfile::TempDir::new().unwrap();
            let mut file_path = PathBuf::from(dir.path());
            file_path.push("test");

            let f = OpenOptions::new()
                .create(true)
                .write(true)
                .open(&file_path)
                .unwrap();
            let source = UringSource::new(f, ex).unwrap();
            if let Err(e) = source.fallocate(0, 4096, AllocateMode::Allocate).await {
                match e {
                    crate::io_ext::Error::Uring(super::super::uring_executor::Error::Io(
                        io_err,
                    )) => {
                        if io_err.kind() == std::io::ErrorKind::InvalidInput {
                            // Skip the test on kernels before fallocate support.
                            return;
                        }
                    }
                    _ => panic!("Unexpected uring error on fallocate: {}", e),
                }
            }

            let meta_data = std::fs::metadata(&file_path).unwrap();
            assert_eq!(meta_data.len(), 4096);
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // Smoke test: fsync on a tempfile completes without error.
    #[test]
    fn fsync() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &URingExecutor) {
            let f = tempfile::tempfile().unwrap();
            let source = UringSource::new(f, ex).unwrap();
            source.fsync().await.unwrap();
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // wait_readable resolves immediately for /dev/zero, which is always readable.
    #[test]
    fn wait_read() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &URingExecutor) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            source.wait_readable().await.unwrap();
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // Two concurrent 32-byte writes (one at the current position, one at offset
    // 32) both report writing their full buffers.
    #[test]
    fn writemulti() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &URingExecutor) {
            let f = tempfile().unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 32];
            let v2 = vec![0x55u8; 32];
            let (r, r2) = futures::future::join(
                source.write_from_vec(None, v),
                source.write_from_vec(Some(32), v2),
            )
            .await;
            assert_eq!(32, r.unwrap().0);
            assert_eq!(32, r2.unwrap().0);
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // `file_offset: None` ops must advance the kernel file position: two
    // sequential writes land back to back, and after seeking to 0 two sequential
    // reads return the two written patterns in order.
    #[test]
    fn readwrite_current_file_position() {
        if !is_uring_stable() {
            return;
        }

        let ex = URingExecutor::new().unwrap();
        ex.run_until(async {
            let mut f = tempfile().unwrap();
            let source = UringSource::new(f.try_clone().unwrap(), &ex).unwrap();
            assert_eq!(
                32,
                source
                    .write_from_vec(None, vec![0x55u8; 32])
                    .await
                    .unwrap()
                    .0
            );
            assert_eq!(
                32,
                source
                    .write_from_vec(None, vec![0xffu8; 32])
                    .await
                    .unwrap()
                    .0
            );
            use std::io::Seek;
            // Rewind via the cloned fd; the clone shares the file position with
            // the fd registered in `source`.
            f.seek(std::io::SeekFrom::Start(0)).unwrap();
            assert_eq!(
                (32, vec![0x55u8; 32]),
                source.read_to_vec(None, vec![0u8; 32]).await.unwrap()
            );
            assert_eq!(
                (32, vec![0xffu8; 32]),
                source.read_to_vec(None, vec![0u8; 32]).await.unwrap()
            );
        })
        .unwrap();
    }

    // Shared flag used to shut down the executor thread in the cross-executor
    // tests below.
    struct State {
        // Set to true when the Quit future should resolve.
        should_quit: bool,
        // Waker stored by Quit::poll so wake() can re-schedule it.
        waker: Option<Waker>,
    }

    impl State {
        // Marks the state as finished and wakes the pending Quit future, if any.
        fn wake(&mut self) {
            self.should_quit = true;
            let waker = self.waker.take();

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    // Future that stays pending until its shared State is woken; used to keep a
    // background executor running until the test is done with it.
    struct Quit {
        state: Arc<Mutex<State>>,
    }

    impl Future for Quit {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
            let mut state = self.state.lock();
            if state.should_quit {
                return Poll::Ready(());
            }

            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    // A future created on a uring executor can be awaited to completion from an
    // FdExecutor; the uring executor runs on a background thread meanwhile.
    #[cfg(unix)]
    #[test]
    fn await_uring_from_poll() {
        if !is_uring_stable() {
            return;
        }
        // Start a uring operation and then await the result from an FdExecutor.
        async fn go(source: IoSource<File>) {
            let v = vec![0xa4u8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let uring_ex = Executor::with_executor_kind(ExecutorKind::Uring).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = uring_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || uring_ex.run_until(quit));

        let poll_ex = Executor::with_executor_kind(ExecutorKind::Fd).unwrap();
        poll_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    // Mirror of await_uring_from_poll: a poll-backed future is awaited from a
    // URingExecutor while the FdExecutor runs on a background thread.
    #[cfg(unix)]
    #[test]
    fn await_poll_from_uring() {
        if !is_uring_stable() {
            return;
        }
        // Start a poll operation and then await the result from a URingExecutor.
        async fn go(source: IoSource<File>) {
            let v = vec![0x2cu8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let poll_ex = Executor::with_executor_kind(ExecutorKind::Fd).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = poll_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || poll_ex.run_until(quit));

        let uring_ex = Executor::with_executor_kind(ExecutorKind::Uring).unwrap();
        uring_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }
}