• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! A wrapped IO source that uses FdExecutor to drive asynchronous completion. Used from
6 //! `IoSourceExt::new` when uring isn't available in the kernel.
7 
8 use std::{
9     io,
10     ops::{Deref, DerefMut},
11     os::unix::io::AsRawFd,
12     sync::Arc,
13 };
14 
15 use async_trait::async_trait;
16 use data_model::VolatileSlice;
17 use remain::sorted;
18 use thiserror::Error as ThisError;
19 
20 use super::{
21     fd_executor::{
22         FdExecutor, RegisteredSource, {self},
23     },
24     mem::{BackingMemory, MemRegion},
25     AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync,
26 };
27 
28 #[sorted]
29 #[derive(ThisError, Debug)]
30 pub enum Error {
31     /// An error occurred attempting to register a waker with the executor.
32     #[error("An error occurred attempting to register a waker with the executor: {0}.")]
33     AddingWaker(fd_executor::Error),
34     /// An executor error occurred.
35     #[error("An executor error occurred: {0}")]
36     Executor(fd_executor::Error),
37     /// An error occurred when executing fallocate synchronously.
38     #[error("An error occurred when executing fallocate synchronously: {0}")]
39     Fallocate(sys_util::Error),
40     /// An error occurred when executing fsync synchronously.
41     #[error("An error occurred when executing fsync synchronously: {0}")]
42     Fsync(sys_util::Error),
43     /// An error occurred when reading the FD.
44     #[error("An error occurred when reading the FD: {0}.")]
45     Read(sys_util::Error),
46     /// Can't seek file.
47     #[error("An error occurred when seeking the FD: {0}.")]
48     Seeking(sys_util::Error),
49     /// An error occurred when writing the FD.
50     #[error("An error occurred when writing the FD: {0}.")]
51     Write(sys_util::Error),
52 }
53 pub type Result<T> = std::result::Result<T, Error>;
54 
55 impl From<Error> for io::Error {
from(e: Error) -> Self56     fn from(e: Error) -> Self {
57         use Error::*;
58         match e {
59             AddingWaker(e) => e.into(),
60             Executor(e) => e.into(),
61             Fallocate(e) => e.into(),
62             Fsync(e) => e.into(),
63             Read(e) => e.into(),
64             Seeking(e) => e.into(),
65             Write(e) => e.into(),
66         }
67     }
68 }
69 
70 /// Async wrapper for an IO source that uses the FD executor to drive async operations.
71 /// Used by `IoSourceExt::new` when uring isn't available.
72 pub struct PollSource<F>(RegisteredSource<F>);
73 
74 impl<F: AsRawFd> PollSource<F> {
75     /// Create a new `PollSource` from the given IO source.
new(f: F, ex: &FdExecutor) -> Result<Self>76     pub fn new(f: F, ex: &FdExecutor) -> Result<Self> {
77         ex.register_source(f)
78             .map(PollSource)
79             .map_err(Error::Executor)
80     }
81 
82     /// Return the inner source.
into_source(self) -> F83     pub fn into_source(self) -> F {
84         self.0.into_source()
85     }
86 }
87 
88 impl<F: AsRawFd> Deref for PollSource<F> {
89     type Target = F;
90 
deref(&self) -> &Self::Target91     fn deref(&self) -> &Self::Target {
92         self.0.as_ref()
93     }
94 }
95 
96 impl<F: AsRawFd> DerefMut for PollSource<F> {
deref_mut(&mut self) -> &mut Self::Target97     fn deref_mut(&mut self) -> &mut Self::Target {
98         self.0.as_mut()
99     }
100 }
101 
102 #[async_trait(?Send)]
103 impl<F: AsRawFd> ReadAsync for PollSource<F> {
104     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec<'a>( &'a self, file_offset: Option<u64>, mut vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>105     async fn read_to_vec<'a>(
106         &'a self,
107         file_offset: Option<u64>,
108         mut vec: Vec<u8>,
109     ) -> AsyncResult<(usize, Vec<u8>)> {
110         loop {
111             // Safe because this will only modify `vec` and we check the return value.
112             let res = if let Some(offset) = file_offset {
113                 unsafe {
114                     libc::pread64(
115                         self.as_raw_fd(),
116                         vec.as_mut_ptr() as *mut libc::c_void,
117                         vec.len(),
118                         offset as libc::off64_t,
119                     )
120                 }
121             } else {
122                 unsafe {
123                     libc::read(
124                         self.as_raw_fd(),
125                         vec.as_mut_ptr() as *mut libc::c_void,
126                         vec.len(),
127                     )
128                 }
129             };
130 
131             if res >= 0 {
132                 return Ok((res as usize, vec));
133             }
134 
135             match sys_util::Error::last() {
136                 e if e.errno() == libc::EWOULDBLOCK => {
137                     let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
138                     op.await.map_err(Error::Executor)?;
139                 }
140                 e => return Err(Error::Read(e).into()),
141             }
142         }
143     }
144 
145     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>146     async fn read_to_mem<'a>(
147         &'a self,
148         file_offset: Option<u64>,
149         mem: Arc<dyn BackingMemory + Send + Sync>,
150         mem_offsets: &'a [MemRegion],
151     ) -> AsyncResult<usize> {
152         let mut iovecs = mem_offsets
153             .iter()
154             .filter_map(|&mem_vec| mem.get_volatile_slice(mem_vec).ok())
155             .collect::<Vec<VolatileSlice>>();
156 
157         loop {
158             // Safe because we trust the kernel not to write path the length given and the length is
159             // guaranteed to be valid from the pointer by io_slice_mut.
160             let res = if let Some(offset) = file_offset {
161                 unsafe {
162                     libc::preadv64(
163                         self.as_raw_fd(),
164                         iovecs.as_mut_ptr() as *mut _,
165                         iovecs.len() as i32,
166                         offset as libc::off64_t,
167                     )
168                 }
169             } else {
170                 unsafe {
171                     libc::readv(
172                         self.as_raw_fd(),
173                         iovecs.as_mut_ptr() as *mut _,
174                         iovecs.len() as i32,
175                     )
176                 }
177             };
178 
179             if res >= 0 {
180                 return Ok(res as usize);
181             }
182 
183             match sys_util::Error::last() {
184                 e if e.errno() == libc::EWOULDBLOCK => {
185                     let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
186                     op.await.map_err(Error::Executor)?;
187                 }
188                 e => return Err(Error::Read(e).into()),
189             }
190         }
191     }
192 
193     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> AsyncResult<()>194     async fn wait_readable(&self) -> AsyncResult<()> {
195         let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
196         op.await.map_err(Error::Executor)?;
197         Ok(())
198     }
199 
read_u64(&self) -> AsyncResult<u64>200     async fn read_u64(&self) -> AsyncResult<u64> {
201         let mut buf = 0u64.to_ne_bytes();
202         loop {
203             // Safe because this will only modify `buf` and we check the return value.
204             let res = unsafe {
205                 libc::read(
206                     self.as_raw_fd(),
207                     buf.as_mut_ptr() as *mut libc::c_void,
208                     buf.len(),
209                 )
210             };
211 
212             if res >= 0 {
213                 return Ok(u64::from_ne_bytes(buf));
214             }
215 
216             match sys_util::Error::last() {
217                 e if e.errno() == libc::EWOULDBLOCK => {
218                     let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
219                     op.await.map_err(Error::Executor)?;
220                 }
221                 e => return Err(Error::Read(e).into()),
222             }
223         }
224     }
225 }
226 
227 #[async_trait(?Send)]
228 impl<F: AsRawFd> WriteAsync for PollSource<F> {
229     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec<'a>( &'a self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>230     async fn write_from_vec<'a>(
231         &'a self,
232         file_offset: Option<u64>,
233         vec: Vec<u8>,
234     ) -> AsyncResult<(usize, Vec<u8>)> {
235         loop {
236             // Safe because this will not modify any memory and we check the return value.
237             let res = if let Some(offset) = file_offset {
238                 unsafe {
239                     libc::pwrite64(
240                         self.as_raw_fd(),
241                         vec.as_ptr() as *const libc::c_void,
242                         vec.len(),
243                         offset as libc::off64_t,
244                     )
245                 }
246             } else {
247                 unsafe {
248                     libc::write(
249                         self.as_raw_fd(),
250                         vec.as_ptr() as *const libc::c_void,
251                         vec.len(),
252                     )
253                 }
254             };
255 
256             if res >= 0 {
257                 return Ok((res as usize, vec));
258             }
259 
260             match sys_util::Error::last() {
261                 e if e.errno() == libc::EWOULDBLOCK => {
262                     let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
263                     op.await.map_err(Error::Executor)?;
264                 }
265                 e => return Err(Error::Write(e).into()),
266             }
267         }
268     }
269 
270     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem<'a>( &'a self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> AsyncResult<usize>271     async fn write_from_mem<'a>(
272         &'a self,
273         file_offset: Option<u64>,
274         mem: Arc<dyn BackingMemory + Send + Sync>,
275         mem_offsets: &'a [MemRegion],
276     ) -> AsyncResult<usize> {
277         let iovecs = mem_offsets
278             .iter()
279             .map(|&mem_vec| mem.get_volatile_slice(mem_vec))
280             .filter_map(|r| r.ok())
281             .collect::<Vec<VolatileSlice>>();
282 
283         loop {
284             // Safe because we trust the kernel not to write path the length given and the length is
285             // guaranteed to be valid from the pointer by io_slice_mut.
286             let res = if let Some(offset) = file_offset {
287                 unsafe {
288                     libc::pwritev64(
289                         self.as_raw_fd(),
290                         iovecs.as_ptr() as *mut _,
291                         iovecs.len() as i32,
292                         offset as libc::off64_t,
293                     )
294                 }
295             } else {
296                 unsafe {
297                     libc::writev(
298                         self.as_raw_fd(),
299                         iovecs.as_ptr() as *mut _,
300                         iovecs.len() as i32,
301                     )
302                 }
303             };
304 
305             if res >= 0 {
306                 return Ok(res as usize);
307             }
308 
309             match sys_util::Error::last() {
310                 e if e.errno() == libc::EWOULDBLOCK => {
311                     let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
312                     op.await.map_err(Error::Executor)?;
313                 }
314                 e => return Err(Error::Write(e).into()),
315             }
316         }
317     }
318 
319     /// See `fallocate(2)` for details.
fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()>320     async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
321         let ret = unsafe {
322             libc::fallocate64(
323                 self.as_raw_fd(),
324                 mode as libc::c_int,
325                 file_offset as libc::off64_t,
326                 len as libc::off64_t,
327             )
328         };
329         if ret == 0 {
330             Ok(())
331         } else {
332             Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last())))
333         }
334     }
335 
336     /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>337     async fn fsync(&self) -> AsyncResult<()> {
338         let ret = unsafe { libc::fsync(self.as_raw_fd()) };
339         if ret == 0 {
340             Ok(())
341         } else {
342             Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last())))
343         }
344     }
345 }
346 
347 #[async_trait(?Send)]
348 impl<F: AsRawFd> IoSourceExt<F> for PollSource<F> {
349     /// Yields the underlying IO source.
into_source(self: Box<Self>) -> F350     fn into_source(self: Box<Self>) -> F {
351         self.0.into_source()
352     }
353 
354     /// Provides a mutable ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F355     fn as_source_mut(&mut self) -> &mut F {
356         self
357     }
358 
359     /// Provides a ref to the underlying IO source.
as_source(&self) -> &F360     fn as_source(&self) -> &F {
361         self
362     }
363 }
364 
#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};

    use super::*;

    /// Reading from /dev/zero fills the caller's buffer in place and hands the same allocation
    /// back.
    #[test]
    fn readvec() {
        async fn go(ex: &FdExecutor) {
            let zero = File::open("/dev/zero").unwrap();
            let source = PollSource::new(zero, ex).unwrap();
            let buf = vec![0x55u8; 32];
            let original_ptr = buf.as_ptr();

            let (count, returned) = source.read_to_vec(None, buf).await.unwrap();

            assert_eq!(count, 32);
            // The vector must be returned, not reallocated.
            assert_eq!(original_ptr, returned.as_ptr());
            assert!(returned.iter().all(|&b| b == 0));
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    /// Writing to /dev/null consumes the whole buffer and hands the same allocation back.
    #[test]
    fn writevec() {
        async fn go(ex: &FdExecutor) {
            let null = OpenOptions::new().write(true).open("/dev/null").unwrap();
            let source = PollSource::new(null, ex).unwrap();
            let buf = vec![0x55u8; 32];
            let original_ptr = buf.as_ptr();

            let (count, returned) = source.write_from_vec(None, buf).await.unwrap();

            assert_eq!(count, 32);
            // The vector must be returned, not reallocated.
            assert_eq!(original_ptr, returned.as_ptr());
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    /// `fallocate` with mode 0 on a fresh file grows it to the requested length.
    #[test]
    fn fallocate() {
        async fn go(ex: &FdExecutor) {
            let dir = tempfile::TempDir::new().unwrap();
            let file_path = dir.path().join("test");

            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .open(&file_path)
                .unwrap();
            let source = PollSource::new(file, ex).unwrap();
            source.fallocate(0, 4096, 0).await.unwrap();

            let size = std::fs::metadata(&file_path).unwrap().len();
            assert_eq!(size, 4096);
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    /// Dropping an executor that still holds an un-run task owning a `PollSource` must not leak.
    #[test]
    fn memory_leak() {
        // This test needs to run under ASAN to detect memory leaks.

        async fn owns_poll_source(source: PollSource<File>) {
            let _ = source.wait_readable().await;
        }

        let (rx, _tx) = sys_util::pipe(true).unwrap();
        let ex = FdExecutor::new().unwrap();
        let source = PollSource::new(rx, &ex).unwrap();
        ex.spawn_local(owns_poll_source(source)).detach();

        // `ex` is dropped here without ever being run. If PollSource held a strong reference to
        // the executor this would leak: the executor owns the future (via its Runnable), the
        // future owns the PollSource, and the resulting cycle would keep the executor's drop
        // impl from running. That drop impl is what polls the future so it can return with an
        // error.
    }
}
451