• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! # `IoSourceExt`
6 //!
7 //! User functions to asynchronously access files.
8 //! Using `IoSource` directly is inconvenient and requires dealing with state
9 //! machines for the backing uring, future libraries, etc. `IoSourceExt` instead
10 //! provides users with a future that can be `await`ed from async context.
11 //!
//! Each member of `IoSourceExt` returns a future for the supported operation. One or more
//! operations can be pending at a time.
14 //!
//! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the
//! `URingExecutor` documentation for an explanation of why.
17 
18 use std::{
19     fs::File,
20     io,
21     ops::{Deref, DerefMut},
22     os::unix::io::{AsRawFd, RawFd},
23     sync::Arc,
24 };
25 
26 use async_trait::async_trait;
27 use remain::sorted;
28 use sys_util::net::UnixSeqpacket;
29 use thiserror::Error as ThisError;
30 
31 use super::{BackingMemory, MemRegion};
32 
33 #[sorted]
34 #[derive(ThisError, Debug)]
35 pub enum Error {
36     /// An error with a polled(FD) source.
37     #[error("An error with a poll source: {0}")]
38     Poll(#[from] super::poll_source::Error),
39     /// An error with a uring source.
40     #[error("An error with a uring source: {0}")]
41     Uring(#[from] super::uring_executor::Error),
42 }
43 pub type Result<T> = std::result::Result<T, Error>;
44 
45 impl From<Error> for io::Error {
from(e: Error) -> Self46     fn from(e: Error) -> Self {
47         use Error::*;
48         match e {
49             Poll(e) => e.into(),
50             Uring(e) => e.into(),
51         }
52     }
53 }
54 
55 /// Ergonomic methods for async reads.
56 #[async_trait(?Send)]
57 pub trait ReadAsync {
58     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> Result<(usize, Vec<u8>)>59     async fn read_to_vec<'a>(
60         &'a self,
61         file_offset: Option<u64>,
62         vec: Vec<u8>,
63     ) -> Result<(usize, Vec<u8>)>;
64 
65     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>66     async fn read_to_mem<'a>(
67         &'a self,
68         file_offset: Option<u64>,
69         mem: Arc<dyn BackingMemory + Send + Sync>,
70         mem_offsets: &'a [MemRegion],
71     ) -> Result<usize>;
72 
73     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> Result<()>74     async fn wait_readable(&self) -> Result<()>;
75 
76     /// Reads a single u64 from the current offset.
read_u64(&self) -> Result<u64>77     async fn read_u64(&self) -> Result<u64>;
78 }
79 
80 /// Ergonomic methods for async writes.
81 #[async_trait(?Send)]
82 pub trait WriteAsync {
83     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> Result<(usize, Vec<u8>)>84     async fn write_from_vec<'a>(
85         &'a self,
86         file_offset: Option<u64>,
87         vec: Vec<u8>,
88     ) -> Result<(usize, Vec<u8>)>;
89 
90     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>91     async fn write_from_mem<'a>(
92         &'a self,
93         file_offset: Option<u64>,
94         mem: Arc<dyn BackingMemory + Send + Sync>,
95         mem_offsets: &'a [MemRegion],
96     ) -> Result<usize>;
97 
98     /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>99     async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>;
100 
101     /// Sync all completed write operations to the backing storage.
fsync(&self) -> Result<()>102     async fn fsync(&self) -> Result<()>;
103 }
104 
105 /// Subtrait for general async IO.
106 #[async_trait(?Send)]
107 pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
108     /// Yields the underlying IO source.
into_source(self: Box<Self>) -> F109     fn into_source(self: Box<Self>) -> F;
110 
111     /// Provides a mutable ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F112     fn as_source_mut(&mut self) -> &mut F;
113 
114     /// Provides a ref to the underlying IO source.
as_source(&self) -> &F115     fn as_source(&self) -> &F;
116 }
117 
118 /// Marker trait signifying that the implementor is suitable for use with
119 /// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket.
120 ///
121 /// (Note: it'd be really nice to implement a TryFrom for any implementors, and
122 /// remove our factory functions. Unfortunately
123 /// <https://github.com/rust-lang/rust/issues/50133> makes that too painful.)
124 pub trait IntoAsync: AsRawFd {}
125 
126 impl IntoAsync for File {}
127 impl IntoAsync for UnixSeqpacket {}
128 impl IntoAsync for &UnixSeqpacket {}
129 
/// Simple wrapper struct to implement `IntoAsync` on foreign types.
///
/// The wrapper dereferences to the wrapped value and forwards `AsRawFd` to it, so it can be
/// used wherever the inner value would be.
pub struct AsyncWrapper<T>(T);

impl<T> AsyncWrapper<T> {
    /// Creates a new `AsyncWrapper` that wraps `val`.
    pub fn new(val: T) -> Self {
        Self(val)
    }

    /// Consumes the `AsyncWrapper`, returning the inner value.
    pub fn into_inner(self) -> T {
        self.0
    }
}

impl<T> Deref for AsyncWrapper<T> {
    type Target = T;

    /// Borrows the wrapped value.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> DerefMut for AsyncWrapper<T> {
    /// Mutably borrows the wrapped value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T: AsRawFd> AsRawFd for AsyncWrapper<T> {
    /// Returns the raw file descriptor of the wrapped value.
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
164 
165 impl<T: AsRawFd> IntoAsync for AsyncWrapper<T> {}
166 
#[cfg(test)]
mod tests {
    use std::{
        fs::{File, OpenOptions},
        future::Future,
        os::unix::io::AsRawFd,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll, Waker},
        thread,
    };

    use sync::Mutex;

    use super::{
        super::{
            executor::{async_poll_from, async_uring_from},
            mem::VecIoWrapper,
            uring_executor::use_uring,
            Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource,
        },
        *,
    };

    /// Shared flag plus waker used to tell a background executor thread to stop.
    ///
    /// The default value is "not quitting, no waker registered".
    #[derive(Default)]
    struct State {
        should_quit: bool,
        waker: Option<Waker>,
    }

    impl State {
        /// Marks the state as finished and wakes the registered waker, if there is one.
        fn wake(&mut self) {
            self.should_quit = true;

            if let Some(waker) = self.waker.take() {
                waker.wake();
            }
        }
    }

    /// Future that stays pending until the shared `State` has `should_quit` set.
    struct Quit {
        state: Arc<Mutex<State>>,
    }

    impl Future for Quit {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
            let mut state = self.state.lock();
            if state.should_quit {
                return Poll::Ready(());
            }

            // Store the latest waker so `State::wake` can resume this task later.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    /// Builds a fresh, not-yet-signalled `State` that can be shared between threads.
    fn new_shared_state() -> Arc<Mutex<State>> {
        Arc::new(Mutex::new(State::default()))
    }

    #[test]
    fn await_uring_from_poll() {
        if !use_uring() {
            return;
        }

        // Start a uring operation and then await the result from an FdExecutor.
        async fn go(source: UringSource<File>) {
            let buf = vec![0xa4u8; 16];
            let (len, buf) = source.read_to_vec(None, buf).await.unwrap();
            assert_eq!(len, 16);
            assert!(buf.iter().all(|&b| b == 0));
        }

        let state = new_shared_state();

        let uring_ex = URingExecutor::new().unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = UringSource::new(f, &uring_ex).unwrap();

        // Keep the uring executor running on another thread until we signal it to quit.
        let quit = Quit {
            state: Arc::clone(&state),
        };
        let handle = thread::spawn(move || uring_ex.run_until(quit));

        let poll_ex = FdExecutor::new().unwrap();
        poll_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    #[test]
    fn await_poll_from_uring() {
        if !use_uring() {
            return;
        }

        // Start a poll operation and then await the result from a URingExecutor.
        async fn go(source: PollSource<File>) {
            let buf = vec![0x2cu8; 16];
            let (len, buf) = source.read_to_vec(None, buf).await.unwrap();
            assert_eq!(len, 16);
            assert!(buf.iter().all(|&b| b == 0));
        }

        let state = new_shared_state();

        let poll_ex = FdExecutor::new().unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = PollSource::new(f, &poll_ex).unwrap();

        // Keep the poll executor running on another thread until we signal it to quit.
        let quit = Quit {
            state: Arc::clone(&state),
        };
        let handle = thread::spawn(move || poll_ex.run_until(quit));

        let uring_ex = URingExecutor::new().unwrap();
        uring_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    #[test]
    fn readvec() {
        if !use_uring() {
            return;
        }

        // Reads 32 bytes of zeros and checks that the very same buffer is handed back.
        async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
            let v = vec![0x55u8; 32];
            let v_ptr = v.as_ptr();
            let (len, ret_v) = async_source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 32);
            assert_eq!(v_ptr, ret_v.as_ptr());
            assert!(ret_v.iter().all(|&b| b == 0));
        }

        let f = File::open("/dev/zero").unwrap();
        let uring_ex = URingExecutor::new().unwrap();
        let uring_source = async_uring_from(f, &uring_ex).unwrap();
        uring_ex.run_until(go(uring_source)).unwrap();

        let f = File::open("/dev/zero").unwrap();
        let poll_ex = FdExecutor::new().unwrap();
        let poll_source = async_poll_from(f, &poll_ex).unwrap();
        poll_ex.run_until(go(poll_source)).unwrap();
    }

    #[test]
    fn writevec() {
        if !use_uring() {
            return;
        }

        // Writes 32 bytes and checks that the very same buffer is handed back.
        async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
            let v = vec![0x55u8; 32];
            let v_ptr = v.as_ptr();
            let (len, ret_v) = async_source.write_from_vec(None, v).await.unwrap();
            assert_eq!(len, 32);
            assert_eq!(v_ptr, ret_v.as_ptr());
        }

        let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
        let ex = URingExecutor::new().unwrap();
        let uring_source = async_uring_from(f, &ex).unwrap();
        ex.run_until(go(uring_source)).unwrap();

        let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
        let poll_ex = FdExecutor::new().unwrap();
        let poll_source = async_poll_from(f, &poll_ex).unwrap();
        poll_ex.run_until(go(poll_source)).unwrap();
    }

    #[test]
    fn readmem() {
        if !use_uring() {
            return;
        }

        // Reads zeros into two regions of a 0x55-filled buffer and checks that only those
        // regions were overwritten.
        async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
            let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
            let ret = async_source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&mem),
                    &[
                        MemRegion { offset: 0, len: 32 },
                        MemRegion {
                            offset: 200,
                            len: 56,
                        },
                    ],
                )
                .await
                .unwrap();
            assert_eq!(ret, 32 + 56);

            // The clone handed to the read must be gone by now so the buffer can be reclaimed.
            let vec: Vec<u8> = match Arc::try_unwrap(mem) {
                Ok(v) => v.into(),
                Err(_) => panic!("Too many vec refs"),
            };
            assert!(vec.iter().take(32).all(|&b| b == 0));
            assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55));
            assert!(vec.iter().skip(200).take(56).all(|&b| b == 0));
            assert!(vec.iter().skip(256).all(|&b| b == 0x55));
        }

        let f = File::open("/dev/zero").unwrap();
        let ex = URingExecutor::new().unwrap();
        let uring_source = async_uring_from(f, &ex).unwrap();
        ex.run_until(go(uring_source)).unwrap();

        let f = File::open("/dev/zero").unwrap();
        let poll_ex = FdExecutor::new().unwrap();
        let poll_source = async_poll_from(f, &poll_ex).unwrap();
        poll_ex.run_until(go(poll_source)).unwrap();
    }

    #[test]
    fn writemem() {
        if !use_uring() {
            return;
        }

        // Writes one 32-byte region of a backing buffer and checks the reported length.
        async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
            let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
            let ret = async_source
                .write_from_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&mem),
                    &[MemRegion { offset: 0, len: 32 }],
                )
                .await
                .unwrap();
            assert_eq!(ret, 32);
        }

        let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
        let ex = URingExecutor::new().unwrap();
        let uring_source = async_uring_from(f, &ex).unwrap();
        ex.run_until(go(uring_source)).unwrap();

        let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
        let poll_ex = FdExecutor::new().unwrap();
        let poll_source = async_poll_from(f, &poll_ex).unwrap();
        poll_ex.run_until(go(poll_source)).unwrap();
    }

    #[test]
    fn read_u64s() {
        if !use_uring() {
            return;
        }

        // Builds the source inside the future, from a clone of the executor running it.
        async fn go(async_source: File, ex: URingExecutor) -> u64 {
            let source = async_uring_from(async_source, &ex).unwrap();
            source.read_u64().await.unwrap()
        }

        let f = File::open("/dev/zero").unwrap();
        let ex = URingExecutor::new().unwrap();
        let val = ex.run_until(go(f, ex.clone())).unwrap();
        assert_eq!(val, 0);
    }

    #[test]
    fn read_eventfds() {
        use sys_util::EventFd;

        if !use_uring() {
            return;
        }

        async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 {
            source.read_u64().await.unwrap()
        }

        // The value written to the eventfd beforehand must come back from `read_u64`.
        let eventfd = EventFd::new().unwrap();
        eventfd.write(0x55).unwrap();
        let ex = URingExecutor::new().unwrap();
        let uring_source = async_uring_from(eventfd, &ex).unwrap();
        let val = ex.run_until(go(uring_source)).unwrap();
        assert_eq!(val, 0x55);

        let eventfd = EventFd::new().unwrap();
        eventfd.write(0xaa).unwrap();
        let poll_ex = FdExecutor::new().unwrap();
        let poll_source = async_poll_from(eventfd, &poll_ex).unwrap();
        let val = poll_ex.run_until(go(poll_source)).unwrap();
        assert_eq!(val, 0xaa);
    }

    #[test]
    fn fsync() {
        if !use_uring() {
            return;
        }

        // Writes 32 bytes to a temporary file and then syncs it.
        async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) {
            let v = vec![0x55u8; 32];
            let v_ptr = v.as_ptr();
            let (len, ret_v) = source.write_from_vec(None, v).await.unwrap();
            assert_eq!(len, 32);
            assert_eq!(v_ptr, ret_v.as_ptr());
            source.fsync().await.unwrap();
        }

        let f = tempfile::tempfile().unwrap();
        let ex = Executor::new().unwrap();
        let source = ex.async_from(f).unwrap();

        ex.run_until(go(source)).unwrap();
    }
}
480