1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::{ 6 convert::TryInto, 7 io, 8 ops::{Deref, DerefMut}, 9 os::unix::io::AsRawFd, 10 sync::Arc, 11 }; 12 13 use async_trait::async_trait; 14 15 use super::{ 16 mem::{BackingMemory, MemRegion, VecIoWrapper}, 17 uring_executor::{Error, RegisteredSource, Result, URingExecutor}, 18 AsyncError, AsyncResult, 19 }; 20 21 /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around 22 /// registering an IO source with the uring that provides an `IoSource` implementation. 23 /// Most useful functions are provided by 'IoSourceExt'. 24 pub struct UringSource<F: AsRawFd> { 25 registered_source: RegisteredSource, 26 source: F, 27 } 28 29 impl<F: AsRawFd> UringSource<F> { 30 /// Creates a new `UringSource` that wraps the given `io_source` object. new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>>31 pub fn new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>> { 32 let r = ex.register_source(&io_source)?; 33 Ok(UringSource { 34 registered_source: r, 35 source: io_source, 36 }) 37 } 38 39 /// Consume `self` and return the object used to create it. into_source(self) -> F40 pub fn into_source(self) -> F { 41 self.source 42 } 43 } 44 45 #[async_trait(?Send)] 46 impl<F: AsRawFd> super::ReadAsync for UringSource<F> { 47 /// Reads from the iosource at `file_offset` and fill the given `vec`. read_to_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>48 async fn read_to_vec<'a>( 49 &'a self, 50 file_offset: Option<u64>, 51 vec: Vec<u8>, 52 ) -> AsyncResult<(usize, Vec<u8>)> { 53 let buf = Arc::new(VecIoWrapper::from(vec)); 54 let op = self.registered_source.start_read_to_mem( 55 file_offset.unwrap_or(0), 56 buf.clone(), 57 &[MemRegion { 58 offset: 0, 59 len: buf.len(), 60 }], 61 )?; 62 let len = op.await?; 63 let bytes = if let Ok(v) = Arc::try_unwrap(buf) { 64 v.into() 65 } else { 66 panic!("too many refs on buf"); 67 }; 68 69 Ok((len as usize, bytes)) 70 } 71 72 /// Wait for the FD of `self` to be readable. wait_readable(&self) -> AsyncResult<()>73 async fn wait_readable(&self) -> AsyncResult<()> { 74 let op = self.registered_source.poll_fd_readable()?; 75 op.await?; 76 Ok(()) 77 } 78 79 /// Reads a single u64 (e.g. from an eventfd). read_u64(&self) -> AsyncResult<u64>80 async fn read_u64(&self) -> AsyncResult<u64> { 81 // This doesn't just forward to read_to_vec to avoid an unnecessary extra allocation from 82 // async-trait. 83 let buf = Arc::new(VecIoWrapper::from(0u64.to_ne_bytes().to_vec())); 84 let op = self.registered_source.start_read_to_mem( 85 0, 86 buf.clone(), 87 &[MemRegion { 88 offset: 0, 89 len: buf.len(), 90 }], 91 )?; 92 let len = op.await?; 93 if len != buf.len() as u32 { 94 Err(AsyncError::Uring(Error::Io(io::Error::new( 95 io::ErrorKind::Other, 96 format!("expected to read {} bytes, but read {}", buf.len(), len), 97 )))) 98 } else { 99 let bytes: Vec<u8> = if let Ok(v) = Arc::try_unwrap(buf) { 100 v.into() 101 } else { 102 panic!("too many refs on buf"); 103 }; 104 105 // Will never panic because bytes is of the appropriate size. 106 Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap())) 107 } 108 } 109 110 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. read_to_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>111 async fn read_to_mem<'a>( 112 &'a self, 113 file_offset: Option<u64>, 114 mem: Arc<dyn BackingMemory + Send + Sync>, 115 mem_offsets: &'a [MemRegion], 116 ) -> AsyncResult<usize> { 117 let op = 118 self.registered_source 119 .start_read_to_mem(file_offset.unwrap_or(0), mem, mem_offsets)?; 120 let len = op.await?; 121 Ok(len as usize) 122 } 123 } 124 125 #[async_trait(?Send)] 126 impl<F: AsRawFd> super::WriteAsync for UringSource<F> { 127 /// Writes from the given `vec` to the file starting at `file_offset`. write_from_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>128 async fn write_from_vec<'a>( 129 &'a self, 130 file_offset: Option<u64>, 131 vec: Vec<u8>, 132 ) -> AsyncResult<(usize, Vec<u8>)> { 133 let buf = Arc::new(VecIoWrapper::from(vec)); 134 let op = self.registered_source.start_write_from_mem( 135 file_offset.unwrap_or(0), 136 buf.clone(), 137 &[MemRegion { 138 offset: 0, 139 len: buf.len(), 140 }], 141 )?; 142 let len = op.await?; 143 let bytes = if let Ok(v) = Arc::try_unwrap(buf) { 144 v.into() 145 } else { 146 panic!("too many refs on buf"); 147 }; 148 149 Ok((len as usize, bytes)) 150 } 151 152 /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. write_from_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>153 async fn write_from_mem<'a>( 154 &'a self, 155 file_offset: Option<u64>, 156 mem: Arc<dyn BackingMemory + Send + Sync>, 157 mem_offsets: &'a [MemRegion], 158 ) -> AsyncResult<usize> { 159 let op = self.registered_source.start_write_from_mem( 160 file_offset.unwrap_or(0), 161 mem, 162 mem_offsets, 163 )?; 164 let len = op.await?; 165 Ok(len as usize) 166 } 167 168 /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()>169 async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> { 170 let op = self 171 .registered_source 172 .start_fallocate(file_offset, len, mode)?; 173 let _ = op.await?; 174 Ok(()) 175 } 176 177 /// Sync all completed write operations to the backing storage. fsync(&self) -> AsyncResult<()>178 async fn fsync(&self) -> AsyncResult<()> { 179 let op = self.registered_source.start_fsync()?; 180 let _ = op.await?; 181 Ok(()) 182 } 183 } 184 185 #[async_trait(?Send)] 186 impl<F: AsRawFd> super::IoSourceExt<F> for UringSource<F> { 187 /// Yields the underlying IO source. into_source(self: Box<Self>) -> F188 fn into_source(self: Box<Self>) -> F { 189 self.source 190 } 191 192 /// Provides a mutable ref to the underlying IO source. as_source(&self) -> &F193 fn as_source(&self) -> &F { 194 &self.source 195 } 196 197 /// Provides a ref to the underlying IO source. as_source_mut(&mut self) -> &mut F198 fn as_source_mut(&mut self) -> &mut F { 199 &mut self.source 200 } 201 } 202 203 impl<F: AsRawFd> Deref for UringSource<F> { 204 type Target = F; 205 deref(&self) -> &Self::Target206 fn deref(&self) -> &Self::Target { 207 &self.source 208 } 209 } 210 211 impl<F: AsRawFd> DerefMut for UringSource<F> { deref_mut(&mut self) -> &mut Self::Target212 fn deref_mut(&mut self) -> &mut Self::Target { 213 &mut self.source 214 } 215 } 216 217 #[cfg(test)] 218 mod tests { 219 use std::{ 220 fs::{File, OpenOptions}, 221 os::unix::io::AsRawFd, 222 path::PathBuf, 223 }; 224 225 use super::super::{ 226 io_ext::{ReadAsync, WriteAsync}, 227 uring_executor::use_uring, 228 UringSource, 229 }; 230 231 use super::*; 232 233 #[test] read_to_mem()234 fn read_to_mem() { 235 if !use_uring() { 236 return; 237 } 238 239 use super::super::mem::VecIoWrapper; 240 use std::io::Write; 241 use tempfile::tempfile; 242 243 let ex = URingExecutor::new().unwrap(); 244 // Use guest memory as a test file, it implements AsRawFd. 245 let mut source = tempfile().unwrap(); 246 let data = vec![0x55; 8192]; 247 source.write_all(&data).unwrap(); 248 249 let io_obj = UringSource::new(source, &ex).unwrap(); 250 251 // Start with memory filled with 0x44s. 252 let buf: Arc<VecIoWrapper> = Arc::new(VecIoWrapper::from(vec![0x44; 8192])); 253 254 let fut = io_obj.read_to_mem( 255 None, 256 Arc::<VecIoWrapper>::clone(&buf), 257 &[MemRegion { 258 offset: 0, 259 len: 8192, 260 }], 261 ); 262 assert_eq!(8192, ex.run_until(fut).unwrap().unwrap()); 263 let vec: Vec<u8> = match Arc::try_unwrap(buf) { 264 Ok(v) => v.into(), 265 Err(_) => panic!("Too many vec refs"), 266 }; 267 assert!(vec.iter().all(|&b| b == 0x55)); 268 } 269 270 #[test] readvec()271 fn readvec() { 272 if !use_uring() { 273 return; 274 } 275 276 async fn go(ex: &URingExecutor) { 277 let f = File::open("/dev/zero").unwrap(); 278 let source = UringSource::new(f, ex).unwrap(); 279 let v = vec![0x55u8; 32]; 280 let v_ptr = v.as_ptr(); 281 let ret = source.read_to_vec(None, v).await.unwrap(); 282 assert_eq!(ret.0, 32); 283 let ret_v = ret.1; 284 assert_eq!(v_ptr, ret_v.as_ptr()); 285 assert!(ret_v.iter().all(|&b| b == 0)); 286 } 287 288 let ex = URingExecutor::new().unwrap(); 289 ex.run_until(go(&ex)).unwrap(); 290 } 291 292 #[test] readmulti()293 fn readmulti() { 294 if !use_uring() { 295 return; 296 } 297 298 async fn go(ex: &URingExecutor) { 299 let f = File::open("/dev/zero").unwrap(); 300 let source = UringSource::new(f, ex).unwrap(); 301 let v = vec![0x55u8; 32]; 302 let v2 = vec![0x55u8; 32]; 303 let (ret, ret2) = futures::future::join( 304 source.read_to_vec(None, v), 305 source.read_to_vec(Some(32), v2), 306 ) 307 .await; 308 309 assert!(ret.unwrap().1.iter().all(|&b| b == 0)); 310 assert!(ret2.unwrap().1.iter().all(|&b| b == 0)); 311 } 312 313 let ex = URingExecutor::new().unwrap(); 314 ex.run_until(go(&ex)).unwrap(); 315 } 316 read_u64<T: AsRawFd>(source: &UringSource<T>) -> u64317 async fn read_u64<T: AsRawFd>(source: &UringSource<T>) -> u64 { 318 // Init a vec that translates to u64::max; 319 let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()]; 320 let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap(); 321 assert_eq!(ret as usize, std::mem::size_of::<u64>()); 322 let mut val = 0u64.to_ne_bytes(); 323 val.copy_from_slice(&u64_mem); 324 u64::from_ne_bytes(val) 325 } 326 327 #[test] u64_from_file()328 fn u64_from_file() { 329 if !use_uring() { 330 return; 331 } 332 333 let f = File::open("/dev/zero").unwrap(); 334 let ex = URingExecutor::new().unwrap(); 335 let source = UringSource::new(f, &ex).unwrap(); 336 337 assert_eq!(0u64, ex.run_until(read_u64(&source)).unwrap()); 338 } 339 340 #[test] event()341 fn event() { 342 if !use_uring() { 343 return; 344 } 345 346 use base::EventFd; 347 348 async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) { 349 let wait = UringSource::new(wait, ex).unwrap(); 350 ev.write(55).unwrap(); 351 read_u64(&wait).await; 352 ev.write(66).unwrap(); 353 read_u64(&wait).await; 354 ev.write(77).unwrap(); 355 read_u64(&wait).await; 356 } 357 358 async fn read_events(ev: EventFd, signal: EventFd, ex: &URingExecutor) { 359 let source = UringSource::new(ev, ex).unwrap(); 360 assert_eq!(read_u64(&source).await, 55); 361 signal.write(1).unwrap(); 362 assert_eq!(read_u64(&source).await, 66); 363 signal.write(1).unwrap(); 364 assert_eq!(read_u64(&source).await, 77); 365 signal.write(1).unwrap(); 366 } 367 368 let event = EventFd::new().unwrap(); 369 let signal_wait = EventFd::new().unwrap(); 370 let ex = URingExecutor::new().unwrap(); 371 let write_task = write_event( 372 event.try_clone().unwrap(), 373 signal_wait.try_clone().unwrap(), 374 &ex, 375 ); 376 let read_task = read_events(event, signal_wait, &ex); 377 ex.run_until(futures::future::join(read_task, write_task)) 378 .unwrap(); 379 } 380 381 #[test] pend_on_pipe()382 fn pend_on_pipe() { 383 if !use_uring() { 384 return; 385 } 386 387 use std::io::Write; 388 389 use futures::future::Either; 390 391 async fn do_test(ex: &URingExecutor) { 392 let (read_source, mut w) = base::pipe(true).unwrap(); 393 let source = UringSource::new(read_source, ex).unwrap(); 394 let done = Box::pin(async { 5usize }); 395 let pending = Box::pin(read_u64(&source)); 396 match futures::future::select(pending, done).await { 397 Either::Right((5, pending)) => { 398 // Write to the pipe so that the kernel will release the memory associated with 399 // the uring read operation. 400 w.write_all(&[0]).expect("failed to write to pipe"); 401 ::std::mem::drop(pending); 402 } 403 _ => panic!("unexpected select result"), 404 }; 405 } 406 407 let ex = URingExecutor::new().unwrap(); 408 ex.run_until(do_test(&ex)).unwrap(); 409 } 410 411 #[test] readmem()412 fn readmem() { 413 if !use_uring() { 414 return; 415 } 416 417 async fn go(ex: &URingExecutor) { 418 let f = File::open("/dev/zero").unwrap(); 419 let source = UringSource::new(f, ex).unwrap(); 420 let v = vec![0x55u8; 64]; 421 let vw = Arc::new(VecIoWrapper::from(v)); 422 let ret = source 423 .read_to_mem( 424 None, 425 Arc::<VecIoWrapper>::clone(&vw), 426 &[MemRegion { offset: 0, len: 32 }], 427 ) 428 .await 429 .unwrap(); 430 assert_eq!(32, ret); 431 let vec: Vec<u8> = match Arc::try_unwrap(vw) { 432 Ok(v) => v.into(), 433 Err(_) => panic!("Too many vec refs"), 434 }; 435 assert!(vec.iter().take(32).all(|&b| b == 0)); 436 assert!(vec.iter().skip(32).all(|&b| b == 0x55)); 437 438 // test second half of memory too. 439 let v = vec![0x55u8; 64]; 440 let vw = Arc::new(VecIoWrapper::from(v)); 441 let ret = source 442 .read_to_mem( 443 None, 444 Arc::<VecIoWrapper>::clone(&vw), 445 &[MemRegion { 446 offset: 32, 447 len: 32, 448 }], 449 ) 450 .await 451 .unwrap(); 452 assert_eq!(32, ret); 453 let v: Vec<u8> = match Arc::try_unwrap(vw) { 454 Ok(v) => v.into(), 455 Err(_) => panic!("Too many vec refs"), 456 }; 457 assert!(v.iter().take(32).all(|&b| b == 0x55)); 458 assert!(v.iter().skip(32).all(|&b| b == 0)); 459 } 460 461 let ex = URingExecutor::new().unwrap(); 462 ex.run_until(go(&ex)).unwrap(); 463 } 464 465 #[test] range_error()466 fn range_error() { 467 if !use_uring() { 468 return; 469 } 470 471 async fn go(ex: &URingExecutor) { 472 let f = File::open("/dev/zero").unwrap(); 473 let source = UringSource::new(f, ex).unwrap(); 474 let v = vec![0x55u8; 64]; 475 let vw = Arc::new(VecIoWrapper::from(v)); 476 let ret = source 477 .read_to_mem( 478 None, 479 Arc::<VecIoWrapper>::clone(&vw), 480 &[MemRegion { 481 offset: 32, 482 len: 33, 483 }], 484 ) 485 .await; 486 assert!(ret.is_err()); 487 } 488 489 let ex = URingExecutor::new().unwrap(); 490 ex.run_until(go(&ex)).unwrap(); 491 } 492 493 #[test] fallocate()494 fn fallocate() { 495 if !use_uring() { 496 return; 497 } 498 499 async fn go(ex: &URingExecutor) { 500 let dir = tempfile::TempDir::new().unwrap(); 501 let mut file_path = PathBuf::from(dir.path()); 502 file_path.push("test"); 503 504 let f = OpenOptions::new() 505 .create(true) 506 .write(true) 507 .open(&file_path) 508 .unwrap(); 509 let source = UringSource::new(f, ex).unwrap(); 510 if let Err(e) = source.fallocate(0, 4096, 0).await { 511 match e { 512 super::super::io_ext::Error::Uring( 513 super::super::uring_executor::Error::Io(io_err), 514 ) => { 515 if io_err.kind() == std::io::ErrorKind::InvalidInput { 516 // Skip the test on kernels before fallocate support. 517 return; 518 } 519 } 520 _ => panic!("Unexpected uring error on fallocate: {}", e), 521 } 522 } 523 524 let meta_data = std::fs::metadata(&file_path).unwrap(); 525 assert_eq!(meta_data.len(), 4096); 526 } 527 528 let ex = URingExecutor::new().unwrap(); 529 ex.run_until(go(&ex)).unwrap(); 530 } 531 532 #[test] fsync()533 fn fsync() { 534 if !use_uring() { 535 return; 536 } 537 538 async fn go(ex: &URingExecutor) { 539 let f = tempfile::tempfile().unwrap(); 540 let source = UringSource::new(f, ex).unwrap(); 541 source.fsync().await.unwrap(); 542 } 543 544 let ex = URingExecutor::new().unwrap(); 545 ex.run_until(go(&ex)).unwrap(); 546 } 547 548 #[test] wait_read()549 fn wait_read() { 550 if !use_uring() { 551 return; 552 } 553 554 async fn go(ex: &URingExecutor) { 555 let f = File::open("/dev/zero").unwrap(); 556 let source = UringSource::new(f, ex).unwrap(); 557 source.wait_readable().await.unwrap(); 558 } 559 560 let ex = URingExecutor::new().unwrap(); 561 ex.run_until(go(&ex)).unwrap(); 562 } 563 564 #[test] writemem()565 fn writemem() { 566 if !use_uring() { 567 return; 568 } 569 570 async fn go(ex: &URingExecutor) { 571 let f = OpenOptions::new() 572 .create(true) 573 .write(true) 574 .open("/tmp/write_from_vec") 575 .unwrap(); 576 let source = UringSource::new(f, ex).unwrap(); 577 let v = vec![0x55u8; 64]; 578 let vw = Arc::new(super::super::mem::VecIoWrapper::from(v)); 579 let ret = source 580 .write_from_mem(None, vw, &[MemRegion { offset: 0, len: 32 }]) 581 .await 582 .unwrap(); 583 assert_eq!(32, ret); 584 } 585 586 let ex = URingExecutor::new().unwrap(); 587 ex.run_until(go(&ex)).unwrap(); 588 } 589 590 #[test] writevec()591 fn writevec() { 592 if !use_uring() { 593 return; 594 } 595 596 async fn go(ex: &URingExecutor) { 597 let f = OpenOptions::new() 598 .create(true) 599 .truncate(true) 600 .write(true) 601 .open("/tmp/write_from_vec") 602 .unwrap(); 603 let source = UringSource::new(f, ex).unwrap(); 604 let v = vec![0x55u8; 32]; 605 let v_ptr = v.as_ptr(); 606 let (ret, ret_v) = source.write_from_vec(None, v).await.unwrap(); 607 assert_eq!(32, ret); 608 assert_eq!(v_ptr, ret_v.as_ptr()); 609 } 610 611 let ex = URingExecutor::new().unwrap(); 612 ex.run_until(go(&ex)).unwrap(); 613 } 614 615 #[test] writemulti()616 fn writemulti() { 617 if !use_uring() { 618 return; 619 } 620 621 async fn go(ex: &URingExecutor) { 622 let f = OpenOptions::new() 623 .create(true) 624 .truncate(true) 625 .write(true) 626 .open("/tmp/write_from_vec") 627 .unwrap(); 628 let source = UringSource::new(f, ex).unwrap(); 629 let v = vec![0x55u8; 32]; 630 let v2 = vec![0x55u8; 32]; 631 let (r, r2) = futures::future::join( 632 source.write_from_vec(None, v), 633 source.write_from_vec(Some(32), v2), 634 ) 635 .await; 636 assert_eq!(32, r.unwrap().0); 637 assert_eq!(32, r2.unwrap().0); 638 } 639 640 let ex = URingExecutor::new().unwrap(); 641 ex.run_until(go(&ex)).unwrap(); 642 } 643 } 644