• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::convert::TryInto;
6 use std::io;
7 use std::ops::Deref;
8 use std::ops::DerefMut;
9 use std::sync::Arc;
10 
11 use base::AsRawDescriptor;
12 
13 use super::uring_executor::Error;
14 use super::uring_executor::RegisteredSource;
15 use super::uring_executor::Result;
16 use super::uring_executor::URingExecutor;
17 use crate::mem::BackingMemory;
18 use crate::mem::MemRegion;
19 use crate::mem::VecIoWrapper;
20 use crate::AllocateMode;
21 use crate::AsyncError;
22 use crate::AsyncResult;
23 
24 /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
25 /// registering an IO source with the uring that provides an `IoSource` implementation.
26 pub struct UringSource<F: AsRawDescriptor> {
27     registered_source: RegisteredSource,
28     source: F,
29 }
30 
31 impl<F: AsRawDescriptor> UringSource<F> {
32     /// Creates a new `UringSource` that wraps the given `io_source` object.
new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>>33     pub fn new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>> {
34         let r = ex.register_source(&io_source)?;
35         Ok(UringSource {
36             registered_source: r,
37             source: io_source,
38         })
39     }
40 
41     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>42     pub async fn read_to_vec(
43         &self,
44         file_offset: Option<u64>,
45         vec: Vec<u8>,
46     ) -> AsyncResult<(usize, Vec<u8>)> {
47         let buf = Arc::new(VecIoWrapper::from(vec));
48         let op = self.registered_source.start_read_to_mem(
49             file_offset,
50             buf.clone(),
51             &[MemRegion {
52                 offset: 0,
53                 len: buf.len(),
54             }],
55         )?;
56         let len = op.await?;
57         let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
58             v.into()
59         } else {
60             panic!("too many refs on buf");
61         };
62 
63         Ok((len as usize, bytes))
64     }
65 
66     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> AsyncResult<()>67     pub async fn wait_readable(&self) -> AsyncResult<()> {
68         let op = self.registered_source.poll_fd_readable()?;
69         op.await?;
70         Ok(())
71     }
72 
73     /// Reads a single u64 (e.g. from an eventfd).
read_u64(&self) -> AsyncResult<u64>74     pub async fn read_u64(&self) -> AsyncResult<u64> {
75         // This doesn't just forward to read_to_vec to avoid an unnecessary extra allocation from
76         // async-trait.
77         let buf = Arc::new(VecIoWrapper::from(0u64.to_ne_bytes().to_vec()));
78         let op = self.registered_source.start_read_to_mem(
79             None,
80             buf.clone(),
81             &[MemRegion {
82                 offset: 0,
83                 len: buf.len(),
84             }],
85         )?;
86         let len = op.await?;
87         if len != buf.len() as u32 {
88             Err(AsyncError::Uring(Error::Io(io::Error::new(
89                 io::ErrorKind::Other,
90                 format!("expected to read {} bytes, but read {}", buf.len(), len),
91             ))))
92         } else {
93             let bytes: Vec<u8> = if let Ok(v) = Arc::try_unwrap(buf) {
94                 v.into()
95             } else {
96                 panic!("too many refs on buf");
97             };
98 
99             // Will never panic because bytes is of the appropriate size.
100             Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap()))
101         }
102     }
103 
104     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &[MemRegion], ) -> AsyncResult<usize>105     pub async fn read_to_mem(
106         &self,
107         file_offset: Option<u64>,
108         mem: Arc<dyn BackingMemory + Send + Sync>,
109         mem_offsets: &[MemRegion],
110     ) -> AsyncResult<usize> {
111         let op = self
112             .registered_source
113             .start_read_to_mem(file_offset, mem, mem_offsets)?;
114         let len = op.await?;
115         Ok(len as usize)
116     }
117 
118     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>119     pub async fn write_from_vec(
120         &self,
121         file_offset: Option<u64>,
122         vec: Vec<u8>,
123     ) -> AsyncResult<(usize, Vec<u8>)> {
124         let buf = Arc::new(VecIoWrapper::from(vec));
125         let op = self.registered_source.start_write_from_mem(
126             file_offset,
127             buf.clone(),
128             &[MemRegion {
129                 offset: 0,
130                 len: buf.len(),
131             }],
132         )?;
133         let len = op.await?;
134         let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
135             v.into()
136         } else {
137             panic!("too many refs on buf");
138         };
139 
140         Ok((len as usize, bytes))
141     }
142 
143     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &[MemRegion], ) -> AsyncResult<usize>144     pub async fn write_from_mem(
145         &self,
146         file_offset: Option<u64>,
147         mem: Arc<dyn BackingMemory + Send + Sync>,
148         mem_offsets: &[MemRegion],
149     ) -> AsyncResult<usize> {
150         let op = self
151             .registered_source
152             .start_write_from_mem(file_offset, mem, mem_offsets)?;
153         let len = op.await?;
154         Ok(len as usize)
155     }
156 
157     /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
fallocate( &self, file_offset: u64, len: u64, mode: AllocateMode, ) -> AsyncResult<()>158     pub async fn fallocate(
159         &self,
160         file_offset: u64,
161         len: u64,
162         mode: AllocateMode,
163     ) -> AsyncResult<()> {
164         let op = self
165             .registered_source
166             .start_fallocate(file_offset, len, mode.into())?;
167         let _ = op.await?;
168         Ok(())
169     }
170 
171     /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>172     pub async fn fsync(&self) -> AsyncResult<()> {
173         let op = self.registered_source.start_fsync()?;
174         let _ = op.await?;
175         Ok(())
176     }
177 
178     /// Yields the underlying IO source.
into_source(self) -> F179     pub fn into_source(self) -> F {
180         self.source
181     }
182 
183     /// Provides a mutable ref to the underlying IO source.
as_source(&self) -> &F184     pub fn as_source(&self) -> &F {
185         &self.source
186     }
187 
188     /// Provides a ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F189     pub fn as_source_mut(&mut self) -> &mut F {
190         &mut self.source
191     }
192 
wait_for_handle(&self) -> AsyncResult<u64>193     pub async fn wait_for_handle(&self) -> AsyncResult<u64> {
194         self.read_u64().await
195     }
196 }
197 
198 impl<F: AsRawDescriptor> Deref for UringSource<F> {
199     type Target = F;
200 
deref(&self) -> &Self::Target201     fn deref(&self) -> &Self::Target {
202         &self.source
203     }
204 }
205 
206 impl<F: AsRawDescriptor> DerefMut for UringSource<F> {
deref_mut(&mut self) -> &mut Self::Target207     fn deref_mut(&mut self) -> &mut Self::Target {
208         &mut self.source
209     }
210 }
211 
212 // NOTE: Prefer adding tests to io_source.rs if not backend specific.
213 #[cfg(test)]
214 mod tests {
215     use std::fs::File;
216     use std::fs::OpenOptions;
217     use std::future::Future;
218     use std::path::PathBuf;
219     use std::pin::Pin;
220     use std::task::Context;
221     use std::task::Poll;
222     use std::task::Waker;
223 
224     use sync::Mutex;
225     use tempfile::tempfile;
226 
227     use super::super::uring_executor::is_uring_stable;
228     use super::super::UringSource;
229     use super::*;
230     use crate::Executor;
231     use crate::ExecutorKind;
232     use crate::IoSource;
233 
234     #[test]
readmulti()235     fn readmulti() {
236         if !is_uring_stable() {
237             return;
238         }
239 
240         async fn go(ex: &URingExecutor) {
241             let f = File::open("/dev/zero").unwrap();
242             let source = UringSource::new(f, ex).unwrap();
243             let v = vec![0x55u8; 32];
244             let v2 = vec![0x55u8; 32];
245             let (ret, ret2) = futures::future::join(
246                 source.read_to_vec(None, v),
247                 source.read_to_vec(Some(32), v2),
248             )
249             .await;
250 
251             assert!(ret.unwrap().1.iter().all(|&b| b == 0));
252             assert!(ret2.unwrap().1.iter().all(|&b| b == 0));
253         }
254 
255         let ex = URingExecutor::new().unwrap();
256         ex.run_until(go(&ex)).unwrap();
257     }
258 
read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64259     async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
260         // Init a vec that translates to u64::max;
261         let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
262         let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
263         assert_eq!(ret as usize, std::mem::size_of::<u64>());
264         let mut val = 0u64.to_ne_bytes();
265         val.copy_from_slice(&u64_mem);
266         u64::from_ne_bytes(val)
267     }
268 
269     #[test]
event()270     fn event() {
271         if !is_uring_stable() {
272             return;
273         }
274 
275         use base::Event;
276         use base::EventExt;
277 
278         async fn write_event(ev: Event, wait: Event, ex: &URingExecutor) {
279             let wait = UringSource::new(wait, ex).unwrap();
280             ev.write_count(55).unwrap();
281             read_u64(&wait).await;
282             ev.write_count(66).unwrap();
283             read_u64(&wait).await;
284             ev.write_count(77).unwrap();
285             read_u64(&wait).await;
286         }
287 
288         async fn read_events(ev: Event, signal: Event, ex: &URingExecutor) {
289             let source = UringSource::new(ev, ex).unwrap();
290             assert_eq!(read_u64(&source).await, 55);
291             signal.signal().unwrap();
292             assert_eq!(read_u64(&source).await, 66);
293             signal.signal().unwrap();
294             assert_eq!(read_u64(&source).await, 77);
295             signal.signal().unwrap();
296         }
297 
298         let event = Event::new().unwrap();
299         let signal_wait = Event::new().unwrap();
300         let ex = URingExecutor::new().unwrap();
301         let write_task = write_event(
302             event.try_clone().unwrap(),
303             signal_wait.try_clone().unwrap(),
304             &ex,
305         );
306         let read_task = read_events(event, signal_wait, &ex);
307         ex.run_until(futures::future::join(read_task, write_task))
308             .unwrap();
309     }
310 
311     #[test]
pend_on_pipe()312     fn pend_on_pipe() {
313         if !is_uring_stable() {
314             return;
315         }
316 
317         use std::io::Write;
318 
319         use futures::future::Either;
320 
321         async fn do_test(ex: &URingExecutor) {
322             let (read_source, mut w) = base::pipe(true).unwrap();
323             let source = UringSource::new(read_source, ex).unwrap();
324             let done = Box::pin(async { 5usize });
325             let pending = Box::pin(read_u64(&source));
326             match futures::future::select(pending, done).await {
327                 Either::Right((5, pending)) => {
328                     // Write to the pipe so that the kernel will release the memory associated with
329                     // the uring read operation.
330                     w.write_all(&[0]).expect("failed to write to pipe");
331                     ::std::mem::drop(pending);
332                 }
333                 _ => panic!("unexpected select result"),
334             };
335         }
336 
337         let ex = URingExecutor::new().unwrap();
338         ex.run_until(do_test(&ex)).unwrap();
339     }
340 
341     #[test]
range_error()342     fn range_error() {
343         if !is_uring_stable() {
344             return;
345         }
346 
347         async fn go(ex: &URingExecutor) {
348             let f = File::open("/dev/zero").unwrap();
349             let source = UringSource::new(f, ex).unwrap();
350             let v = vec![0x55u8; 64];
351             let vw = Arc::new(VecIoWrapper::from(v));
352             let ret = source
353                 .read_to_mem(
354                     None,
355                     Arc::<VecIoWrapper>::clone(&vw),
356                     &[MemRegion {
357                         offset: 32,
358                         len: 33,
359                     }],
360                 )
361                 .await;
362             assert!(ret.is_err());
363         }
364 
365         let ex = URingExecutor::new().unwrap();
366         ex.run_until(go(&ex)).unwrap();
367     }
368 
369     #[test]
fallocate()370     fn fallocate() {
371         if !is_uring_stable() {
372             return;
373         }
374 
375         async fn go(ex: &URingExecutor) {
376             let dir = tempfile::TempDir::new().unwrap();
377             let mut file_path = PathBuf::from(dir.path());
378             file_path.push("test");
379 
380             let f = OpenOptions::new()
381                 .create(true)
382                 .write(true)
383                 .open(&file_path)
384                 .unwrap();
385             let source = UringSource::new(f, ex).unwrap();
386             if let Err(e) = source.fallocate(0, 4096, AllocateMode::Allocate).await {
387                 match e {
388                     crate::io_ext::Error::Uring(super::super::uring_executor::Error::Io(
389                         io_err,
390                     )) => {
391                         if io_err.kind() == std::io::ErrorKind::InvalidInput {
392                             // Skip the test on kernels before fallocate support.
393                             return;
394                         }
395                     }
396                     _ => panic!("Unexpected uring error on fallocate: {}", e),
397                 }
398             }
399 
400             let meta_data = std::fs::metadata(&file_path).unwrap();
401             assert_eq!(meta_data.len(), 4096);
402         }
403 
404         let ex = URingExecutor::new().unwrap();
405         ex.run_until(go(&ex)).unwrap();
406     }
407 
408     #[test]
fsync()409     fn fsync() {
410         if !is_uring_stable() {
411             return;
412         }
413 
414         async fn go(ex: &URingExecutor) {
415             let f = tempfile::tempfile().unwrap();
416             let source = UringSource::new(f, ex).unwrap();
417             source.fsync().await.unwrap();
418         }
419 
420         let ex = URingExecutor::new().unwrap();
421         ex.run_until(go(&ex)).unwrap();
422     }
423 
424     #[test]
wait_read()425     fn wait_read() {
426         if !is_uring_stable() {
427             return;
428         }
429 
430         async fn go(ex: &URingExecutor) {
431             let f = File::open("/dev/zero").unwrap();
432             let source = UringSource::new(f, ex).unwrap();
433             source.wait_readable().await.unwrap();
434         }
435 
436         let ex = URingExecutor::new().unwrap();
437         ex.run_until(go(&ex)).unwrap();
438     }
439 
440     #[test]
writemulti()441     fn writemulti() {
442         if !is_uring_stable() {
443             return;
444         }
445 
446         async fn go(ex: &URingExecutor) {
447             let f = tempfile().unwrap();
448             let source = UringSource::new(f, ex).unwrap();
449             let v = vec![0x55u8; 32];
450             let v2 = vec![0x55u8; 32];
451             let (r, r2) = futures::future::join(
452                 source.write_from_vec(None, v),
453                 source.write_from_vec(Some(32), v2),
454             )
455             .await;
456             assert_eq!(32, r.unwrap().0);
457             assert_eq!(32, r2.unwrap().0);
458         }
459 
460         let ex = URingExecutor::new().unwrap();
461         ex.run_until(go(&ex)).unwrap();
462     }
463 
464     #[test]
readwrite_current_file_position()465     fn readwrite_current_file_position() {
466         if !is_uring_stable() {
467             return;
468         }
469 
470         let ex = URingExecutor::new().unwrap();
471         ex.run_until(async {
472             let mut f = tempfile().unwrap();
473             let source = UringSource::new(f.try_clone().unwrap(), &ex).unwrap();
474             assert_eq!(
475                 32,
476                 source
477                     .write_from_vec(None, vec![0x55u8; 32])
478                     .await
479                     .unwrap()
480                     .0
481             );
482             assert_eq!(
483                 32,
484                 source
485                     .write_from_vec(None, vec![0xffu8; 32])
486                     .await
487                     .unwrap()
488                     .0
489             );
490             use std::io::Seek;
491             f.seek(std::io::SeekFrom::Start(0)).unwrap();
492             assert_eq!(
493                 (32, vec![0x55u8; 32]),
494                 source.read_to_vec(None, vec![0u8; 32]).await.unwrap()
495             );
496             assert_eq!(
497                 (32, vec![0xffu8; 32]),
498                 source.read_to_vec(None, vec![0u8; 32]).await.unwrap()
499             );
500         })
501         .unwrap();
502     }
503 
504     struct State {
505         should_quit: bool,
506         waker: Option<Waker>,
507     }
508 
509     impl State {
wake(&mut self)510         fn wake(&mut self) {
511             self.should_quit = true;
512             let waker = self.waker.take();
513 
514             if let Some(waker) = waker {
515                 waker.wake();
516             }
517         }
518     }
519 
520     struct Quit {
521         state: Arc<Mutex<State>>,
522     }
523 
524     impl Future for Quit {
525         type Output = ();
526 
poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>527         fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
528             let mut state = self.state.lock();
529             if state.should_quit {
530                 return Poll::Ready(());
531             }
532 
533             state.waker = Some(cx.waker().clone());
534             Poll::Pending
535         }
536     }
537 
538     #[cfg(unix)]
539     #[test]
await_uring_from_poll()540     fn await_uring_from_poll() {
541         if !is_uring_stable() {
542             return;
543         }
544         // Start a uring operation and then await the result from an FdExecutor.
545         async fn go(source: IoSource<File>) {
546             let v = vec![0xa4u8; 16];
547             let (len, vec) = source.read_to_vec(None, v).await.unwrap();
548             assert_eq!(len, 16);
549             assert!(vec.iter().all(|&b| b == 0));
550         }
551 
552         let state = Arc::new(Mutex::new(State {
553             should_quit: false,
554             waker: None,
555         }));
556 
557         let uring_ex = Executor::with_executor_kind(ExecutorKind::Uring).unwrap();
558         let f = File::open("/dev/zero").unwrap();
559         let source = uring_ex.async_from(f).unwrap();
560 
561         let quit = Quit {
562             state: state.clone(),
563         };
564         let handle = std::thread::spawn(move || uring_ex.run_until(quit));
565 
566         let poll_ex = Executor::with_executor_kind(ExecutorKind::Fd).unwrap();
567         poll_ex.run_until(go(source)).unwrap();
568 
569         state.lock().wake();
570         handle.join().unwrap().unwrap();
571     }
572 
573     #[cfg(unix)]
574     #[test]
await_poll_from_uring()575     fn await_poll_from_uring() {
576         if !is_uring_stable() {
577             return;
578         }
579         // Start a poll operation and then await the result from a URingExecutor.
580         async fn go(source: IoSource<File>) {
581             let v = vec![0x2cu8; 16];
582             let (len, vec) = source.read_to_vec(None, v).await.unwrap();
583             assert_eq!(len, 16);
584             assert!(vec.iter().all(|&b| b == 0));
585         }
586 
587         let state = Arc::new(Mutex::new(State {
588             should_quit: false,
589             waker: None,
590         }));
591 
592         let poll_ex = Executor::with_executor_kind(ExecutorKind::Fd).unwrap();
593         let f = File::open("/dev/zero").unwrap();
594         let source = poll_ex.async_from(f).unwrap();
595 
596         let quit = Quit {
597             state: state.clone(),
598         };
599         let handle = std::thread::spawn(move || poll_ex.run_until(quit));
600 
601         let uring_ex = Executor::with_executor_kind(ExecutorKind::Uring).unwrap();
602         uring_ex.run_until(go(source)).unwrap();
603 
604         state.lock().wake();
605         handle.join().unwrap().unwrap();
606     }
607 }
608