• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 use std::{
6     convert::TryInto,
7     io,
8     ops::{Deref, DerefMut},
9     os::unix::io::AsRawFd,
10     sync::Arc,
11 };
12 
13 use async_trait::async_trait;
14 
15 use super::{
16     mem::{BackingMemory, MemRegion, VecIoWrapper},
17     uring_executor::{Error, RegisteredSource, Result, URingExecutor},
18     AsyncError, AsyncResult,
19 };
20 
21 /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
22 /// registering an IO source with the uring that provides an `IoSource` implementation.
23 /// Most useful functions are provided by 'IoSourceExt'.
24 pub struct UringSource<F: AsRawFd> {
25     registered_source: RegisteredSource,
26     source: F,
27 }
28 
29 impl<F: AsRawFd> UringSource<F> {
30     /// Creates a new `UringSource` that wraps the given `io_source` object.
new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>>31     pub fn new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>> {
32         let r = ex.register_source(&io_source)?;
33         Ok(UringSource {
34             registered_source: r,
35             source: io_source,
36         })
37     }
38 
39     /// Consume `self` and return the object used to create it.
into_source(self) -> F40     pub fn into_source(self) -> F {
41         self.source
42     }
43 }
44 
45 #[async_trait(?Send)]
46 impl<F: AsRawFd> super::ReadAsync for UringSource<F> {
47     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>48     async fn read_to_vec<'a>(
49         &'a self,
50         file_offset: Option<u64>,
51         vec: Vec<u8>,
52     ) -> AsyncResult<(usize, Vec<u8>)> {
53         let buf = Arc::new(VecIoWrapper::from(vec));
54         let op = self.registered_source.start_read_to_mem(
55             file_offset.unwrap_or(0),
56             buf.clone(),
57             &[MemRegion {
58                 offset: 0,
59                 len: buf.len(),
60             }],
61         )?;
62         let len = op.await?;
63         let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
64             v.into()
65         } else {
66             panic!("too many refs on buf");
67         };
68 
69         Ok((len as usize, bytes))
70     }
71 
72     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> AsyncResult<()>73     async fn wait_readable(&self) -> AsyncResult<()> {
74         let op = self.registered_source.poll_fd_readable()?;
75         op.await?;
76         Ok(())
77     }
78 
79     /// Reads a single u64 (e.g. from an eventfd).
read_u64(&self) -> AsyncResult<u64>80     async fn read_u64(&self) -> AsyncResult<u64> {
81         // This doesn't just forward to read_to_vec to avoid an unnecessary extra allocation from
82         // async-trait.
83         let buf = Arc::new(VecIoWrapper::from(0u64.to_ne_bytes().to_vec()));
84         let op = self.registered_source.start_read_to_mem(
85             0,
86             buf.clone(),
87             &[MemRegion {
88                 offset: 0,
89                 len: buf.len(),
90             }],
91         )?;
92         let len = op.await?;
93         if len != buf.len() as u32 {
94             Err(AsyncError::Uring(Error::Io(io::Error::new(
95                 io::ErrorKind::Other,
96                 format!("expected to read {} bytes, but read {}", buf.len(), len),
97             ))))
98         } else {
99             let bytes: Vec<u8> = if let Ok(v) = Arc::try_unwrap(buf) {
100                 v.into()
101             } else {
102                 panic!("too many refs on buf");
103             };
104 
105             // Will never panic because bytes is of the appropriate size.
106             Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap()))
107         }
108     }
109 
110     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>111     async fn read_to_mem<'a>(
112         &'a self,
113         file_offset: Option<u64>,
114         mem: Arc<dyn BackingMemory + Send + Sync>,
115         mem_offsets: &'a [MemRegion],
116     ) -> AsyncResult<usize> {
117         let op =
118             self.registered_source
119                 .start_read_to_mem(file_offset.unwrap_or(0), mem, mem_offsets)?;
120         let len = op.await?;
121         Ok(len as usize)
122     }
123 }
124 
125 #[async_trait(?Send)]
126 impl<F: AsRawFd> super::WriteAsync for UringSource<F> {
127     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>128     async fn write_from_vec<'a>(
129         &'a self,
130         file_offset: Option<u64>,
131         vec: Vec<u8>,
132     ) -> AsyncResult<(usize, Vec<u8>)> {
133         let buf = Arc::new(VecIoWrapper::from(vec));
134         let op = self.registered_source.start_write_from_mem(
135             file_offset.unwrap_or(0),
136             buf.clone(),
137             &[MemRegion {
138                 offset: 0,
139                 len: buf.len(),
140             }],
141         )?;
142         let len = op.await?;
143         let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
144             v.into()
145         } else {
146             panic!("too many refs on buf");
147         };
148 
149         Ok((len as usize, bytes))
150     }
151 
152     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>153     async fn write_from_mem<'a>(
154         &'a self,
155         file_offset: Option<u64>,
156         mem: Arc<dyn BackingMemory + Send + Sync>,
157         mem_offsets: &'a [MemRegion],
158     ) -> AsyncResult<usize> {
159         let op = self.registered_source.start_write_from_mem(
160             file_offset.unwrap_or(0),
161             mem,
162             mem_offsets,
163         )?;
164         let len = op.await?;
165         Ok(len as usize)
166     }
167 
168     /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()>169     async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
170         let op = self
171             .registered_source
172             .start_fallocate(file_offset, len, mode)?;
173         let _ = op.await?;
174         Ok(())
175     }
176 
177     /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>178     async fn fsync(&self) -> AsyncResult<()> {
179         let op = self.registered_source.start_fsync()?;
180         let _ = op.await?;
181         Ok(())
182     }
183 }
184 
185 #[async_trait(?Send)]
186 impl<F: AsRawFd> super::IoSourceExt<F> for UringSource<F> {
187     /// Yields the underlying IO source.
into_source(self: Box<Self>) -> F188     fn into_source(self: Box<Self>) -> F {
189         self.source
190     }
191 
192     /// Provides a mutable ref to the underlying IO source.
as_source(&self) -> &F193     fn as_source(&self) -> &F {
194         &self.source
195     }
196 
197     /// Provides a ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F198     fn as_source_mut(&mut self) -> &mut F {
199         &mut self.source
200     }
201 }
202 
203 impl<F: AsRawFd> Deref for UringSource<F> {
204     type Target = F;
205 
deref(&self) -> &Self::Target206     fn deref(&self) -> &Self::Target {
207         &self.source
208     }
209 }
210 
211 impl<F: AsRawFd> DerefMut for UringSource<F> {
deref_mut(&mut self) -> &mut Self::Target212     fn deref_mut(&mut self) -> &mut Self::Target {
213         &mut self.source
214     }
215 }
216 
217 #[cfg(test)]
218 mod tests {
219     use std::{
220         fs::{File, OpenOptions},
221         os::unix::io::AsRawFd,
222         path::PathBuf,
223     };
224 
225     use super::super::{
226         io_ext::{ReadAsync, WriteAsync},
227         uring_executor::use_uring,
228         UringSource,
229     };
230 
231     use super::*;
232 
233     #[test]
read_to_mem()234     fn read_to_mem() {
235         if !use_uring() {
236             return;
237         }
238 
239         use super::super::mem::VecIoWrapper;
240         use std::io::Write;
241         use tempfile::tempfile;
242 
243         let ex = URingExecutor::new().unwrap();
244         // Use guest memory as a test file, it implements AsRawFd.
245         let mut source = tempfile().unwrap();
246         let data = vec![0x55; 8192];
247         source.write_all(&data).unwrap();
248 
249         let io_obj = UringSource::new(source, &ex).unwrap();
250 
251         // Start with memory filled with 0x44s.
252         let buf: Arc<VecIoWrapper> = Arc::new(VecIoWrapper::from(vec![0x44; 8192]));
253 
254         let fut = io_obj.read_to_mem(
255             None,
256             Arc::<VecIoWrapper>::clone(&buf),
257             &[MemRegion {
258                 offset: 0,
259                 len: 8192,
260             }],
261         );
262         assert_eq!(8192, ex.run_until(fut).unwrap().unwrap());
263         let vec: Vec<u8> = match Arc::try_unwrap(buf) {
264             Ok(v) => v.into(),
265             Err(_) => panic!("Too many vec refs"),
266         };
267         assert!(vec.iter().all(|&b| b == 0x55));
268     }
269 
270     #[test]
readvec()271     fn readvec() {
272         if !use_uring() {
273             return;
274         }
275 
276         async fn go(ex: &URingExecutor) {
277             let f = File::open("/dev/zero").unwrap();
278             let source = UringSource::new(f, ex).unwrap();
279             let v = vec![0x55u8; 32];
280             let v_ptr = v.as_ptr();
281             let ret = source.read_to_vec(None, v).await.unwrap();
282             assert_eq!(ret.0, 32);
283             let ret_v = ret.1;
284             assert_eq!(v_ptr, ret_v.as_ptr());
285             assert!(ret_v.iter().all(|&b| b == 0));
286         }
287 
288         let ex = URingExecutor::new().unwrap();
289         ex.run_until(go(&ex)).unwrap();
290     }
291 
292     #[test]
readmulti()293     fn readmulti() {
294         if !use_uring() {
295             return;
296         }
297 
298         async fn go(ex: &URingExecutor) {
299             let f = File::open("/dev/zero").unwrap();
300             let source = UringSource::new(f, ex).unwrap();
301             let v = vec![0x55u8; 32];
302             let v2 = vec![0x55u8; 32];
303             let (ret, ret2) = futures::future::join(
304                 source.read_to_vec(None, v),
305                 source.read_to_vec(Some(32), v2),
306             )
307             .await;
308 
309             assert!(ret.unwrap().1.iter().all(|&b| b == 0));
310             assert!(ret2.unwrap().1.iter().all(|&b| b == 0));
311         }
312 
313         let ex = URingExecutor::new().unwrap();
314         ex.run_until(go(&ex)).unwrap();
315     }
316 
read_u64<T: AsRawFd>(source: &UringSource<T>) -> u64317     async fn read_u64<T: AsRawFd>(source: &UringSource<T>) -> u64 {
318         // Init a vec that translates to u64::max;
319         let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
320         let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
321         assert_eq!(ret as usize, std::mem::size_of::<u64>());
322         let mut val = 0u64.to_ne_bytes();
323         val.copy_from_slice(&u64_mem);
324         u64::from_ne_bytes(val)
325     }
326 
327     #[test]
u64_from_file()328     fn u64_from_file() {
329         if !use_uring() {
330             return;
331         }
332 
333         let f = File::open("/dev/zero").unwrap();
334         let ex = URingExecutor::new().unwrap();
335         let source = UringSource::new(f, &ex).unwrap();
336 
337         assert_eq!(0u64, ex.run_until(read_u64(&source)).unwrap());
338     }
339 
340     #[test]
event()341     fn event() {
342         if !use_uring() {
343             return;
344         }
345 
346         use base::EventFd;
347 
348         async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) {
349             let wait = UringSource::new(wait, ex).unwrap();
350             ev.write(55).unwrap();
351             read_u64(&wait).await;
352             ev.write(66).unwrap();
353             read_u64(&wait).await;
354             ev.write(77).unwrap();
355             read_u64(&wait).await;
356         }
357 
358         async fn read_events(ev: EventFd, signal: EventFd, ex: &URingExecutor) {
359             let source = UringSource::new(ev, ex).unwrap();
360             assert_eq!(read_u64(&source).await, 55);
361             signal.write(1).unwrap();
362             assert_eq!(read_u64(&source).await, 66);
363             signal.write(1).unwrap();
364             assert_eq!(read_u64(&source).await, 77);
365             signal.write(1).unwrap();
366         }
367 
368         let event = EventFd::new().unwrap();
369         let signal_wait = EventFd::new().unwrap();
370         let ex = URingExecutor::new().unwrap();
371         let write_task = write_event(
372             event.try_clone().unwrap(),
373             signal_wait.try_clone().unwrap(),
374             &ex,
375         );
376         let read_task = read_events(event, signal_wait, &ex);
377         ex.run_until(futures::future::join(read_task, write_task))
378             .unwrap();
379     }
380 
381     #[test]
pend_on_pipe()382     fn pend_on_pipe() {
383         if !use_uring() {
384             return;
385         }
386 
387         use std::io::Write;
388 
389         use futures::future::Either;
390 
391         async fn do_test(ex: &URingExecutor) {
392             let (read_source, mut w) = base::pipe(true).unwrap();
393             let source = UringSource::new(read_source, ex).unwrap();
394             let done = Box::pin(async { 5usize });
395             let pending = Box::pin(read_u64(&source));
396             match futures::future::select(pending, done).await {
397                 Either::Right((5, pending)) => {
398                     // Write to the pipe so that the kernel will release the memory associated with
399                     // the uring read operation.
400                     w.write_all(&[0]).expect("failed to write to pipe");
401                     ::std::mem::drop(pending);
402                 }
403                 _ => panic!("unexpected select result"),
404             };
405         }
406 
407         let ex = URingExecutor::new().unwrap();
408         ex.run_until(do_test(&ex)).unwrap();
409     }
410 
411     #[test]
readmem()412     fn readmem() {
413         if !use_uring() {
414             return;
415         }
416 
417         async fn go(ex: &URingExecutor) {
418             let f = File::open("/dev/zero").unwrap();
419             let source = UringSource::new(f, ex).unwrap();
420             let v = vec![0x55u8; 64];
421             let vw = Arc::new(VecIoWrapper::from(v));
422             let ret = source
423                 .read_to_mem(
424                     None,
425                     Arc::<VecIoWrapper>::clone(&vw),
426                     &[MemRegion { offset: 0, len: 32 }],
427                 )
428                 .await
429                 .unwrap();
430             assert_eq!(32, ret);
431             let vec: Vec<u8> = match Arc::try_unwrap(vw) {
432                 Ok(v) => v.into(),
433                 Err(_) => panic!("Too many vec refs"),
434             };
435             assert!(vec.iter().take(32).all(|&b| b == 0));
436             assert!(vec.iter().skip(32).all(|&b| b == 0x55));
437 
438             // test second half of memory too.
439             let v = vec![0x55u8; 64];
440             let vw = Arc::new(VecIoWrapper::from(v));
441             let ret = source
442                 .read_to_mem(
443                     None,
444                     Arc::<VecIoWrapper>::clone(&vw),
445                     &[MemRegion {
446                         offset: 32,
447                         len: 32,
448                     }],
449                 )
450                 .await
451                 .unwrap();
452             assert_eq!(32, ret);
453             let v: Vec<u8> = match Arc::try_unwrap(vw) {
454                 Ok(v) => v.into(),
455                 Err(_) => panic!("Too many vec refs"),
456             };
457             assert!(v.iter().take(32).all(|&b| b == 0x55));
458             assert!(v.iter().skip(32).all(|&b| b == 0));
459         }
460 
461         let ex = URingExecutor::new().unwrap();
462         ex.run_until(go(&ex)).unwrap();
463     }
464 
465     #[test]
range_error()466     fn range_error() {
467         if !use_uring() {
468             return;
469         }
470 
471         async fn go(ex: &URingExecutor) {
472             let f = File::open("/dev/zero").unwrap();
473             let source = UringSource::new(f, ex).unwrap();
474             let v = vec![0x55u8; 64];
475             let vw = Arc::new(VecIoWrapper::from(v));
476             let ret = source
477                 .read_to_mem(
478                     None,
479                     Arc::<VecIoWrapper>::clone(&vw),
480                     &[MemRegion {
481                         offset: 32,
482                         len: 33,
483                     }],
484                 )
485                 .await;
486             assert!(ret.is_err());
487         }
488 
489         let ex = URingExecutor::new().unwrap();
490         ex.run_until(go(&ex)).unwrap();
491     }
492 
493     #[test]
fallocate()494     fn fallocate() {
495         if !use_uring() {
496             return;
497         }
498 
499         async fn go(ex: &URingExecutor) {
500             let dir = tempfile::TempDir::new().unwrap();
501             let mut file_path = PathBuf::from(dir.path());
502             file_path.push("test");
503 
504             let f = OpenOptions::new()
505                 .create(true)
506                 .write(true)
507                 .open(&file_path)
508                 .unwrap();
509             let source = UringSource::new(f, ex).unwrap();
510             if let Err(e) = source.fallocate(0, 4096, 0).await {
511                 match e {
512                     super::super::io_ext::Error::Uring(
513                         super::super::uring_executor::Error::Io(io_err),
514                     ) => {
515                         if io_err.kind() == std::io::ErrorKind::InvalidInput {
516                             // Skip the test on kernels before fallocate support.
517                             return;
518                         }
519                     }
520                     _ => panic!("Unexpected uring error on fallocate: {}", e),
521                 }
522             }
523 
524             let meta_data = std::fs::metadata(&file_path).unwrap();
525             assert_eq!(meta_data.len(), 4096);
526         }
527 
528         let ex = URingExecutor::new().unwrap();
529         ex.run_until(go(&ex)).unwrap();
530     }
531 
532     #[test]
fsync()533     fn fsync() {
534         if !use_uring() {
535             return;
536         }
537 
538         async fn go(ex: &URingExecutor) {
539             let f = tempfile::tempfile().unwrap();
540             let source = UringSource::new(f, ex).unwrap();
541             source.fsync().await.unwrap();
542         }
543 
544         let ex = URingExecutor::new().unwrap();
545         ex.run_until(go(&ex)).unwrap();
546     }
547 
548     #[test]
wait_read()549     fn wait_read() {
550         if !use_uring() {
551             return;
552         }
553 
554         async fn go(ex: &URingExecutor) {
555             let f = File::open("/dev/zero").unwrap();
556             let source = UringSource::new(f, ex).unwrap();
557             source.wait_readable().await.unwrap();
558         }
559 
560         let ex = URingExecutor::new().unwrap();
561         ex.run_until(go(&ex)).unwrap();
562     }
563 
564     #[test]
writemem()565     fn writemem() {
566         if !use_uring() {
567             return;
568         }
569 
570         async fn go(ex: &URingExecutor) {
571             let f = OpenOptions::new()
572                 .create(true)
573                 .write(true)
574                 .open("/tmp/write_from_vec")
575                 .unwrap();
576             let source = UringSource::new(f, ex).unwrap();
577             let v = vec![0x55u8; 64];
578             let vw = Arc::new(super::super::mem::VecIoWrapper::from(v));
579             let ret = source
580                 .write_from_mem(None, vw, &[MemRegion { offset: 0, len: 32 }])
581                 .await
582                 .unwrap();
583             assert_eq!(32, ret);
584         }
585 
586         let ex = URingExecutor::new().unwrap();
587         ex.run_until(go(&ex)).unwrap();
588     }
589 
590     #[test]
writevec()591     fn writevec() {
592         if !use_uring() {
593             return;
594         }
595 
596         async fn go(ex: &URingExecutor) {
597             let f = OpenOptions::new()
598                 .create(true)
599                 .truncate(true)
600                 .write(true)
601                 .open("/tmp/write_from_vec")
602                 .unwrap();
603             let source = UringSource::new(f, ex).unwrap();
604             let v = vec![0x55u8; 32];
605             let v_ptr = v.as_ptr();
606             let (ret, ret_v) = source.write_from_vec(None, v).await.unwrap();
607             assert_eq!(32, ret);
608             assert_eq!(v_ptr, ret_v.as_ptr());
609         }
610 
611         let ex = URingExecutor::new().unwrap();
612         ex.run_until(go(&ex)).unwrap();
613     }
614 
615     #[test]
writemulti()616     fn writemulti() {
617         if !use_uring() {
618             return;
619         }
620 
621         async fn go(ex: &URingExecutor) {
622             let f = OpenOptions::new()
623                 .create(true)
624                 .truncate(true)
625                 .write(true)
626                 .open("/tmp/write_from_vec")
627                 .unwrap();
628             let source = UringSource::new(f, ex).unwrap();
629             let v = vec![0x55u8; 32];
630             let v2 = vec![0x55u8; 32];
631             let (r, r2) = futures::future::join(
632                 source.write_from_vec(None, v),
633                 source.write_from_vec(Some(32), v2),
634             )
635             .await;
636             assert_eq!(32, r.unwrap().0);
637             assert_eq!(32, r2.unwrap().0);
638         }
639 
640         let ex = URingExecutor::new().unwrap();
641         ex.run_until(go(&ex)).unwrap();
642     }
643 }
644