• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Asynchronous disk image helpers.
6 
7 use std::io;
8 use std::sync::Arc;
9 use std::time::Duration;
10 
11 use async_trait::async_trait;
12 use base::AsRawDescriptors;
13 use base::FileAllocate;
14 use base::FileSetLen;
15 use base::FileSync;
16 use base::PunchHoleMut;
17 use base::RawDescriptor;
18 use base::WriteZeroesAt;
19 use cros_async::BackingMemory;
20 use cros_async::BlockingPool;
21 use cros_async::Executor;
22 use sync::Mutex;
23 
24 use crate::AsyncDisk;
25 use crate::DiskFile;
26 use crate::DiskGetLen;
27 use crate::Error;
28 use crate::Result;
29 
30 /// Async wrapper around a non-async `DiskFile` using a `BlockingPool`.
31 ///
32 /// This is meant to be a transitional type, not a long-term solution for async disk support. Disk
33 /// formats should be migrated to support async instead (b/219595052).
34 pub struct AsyncDiskFileWrapper<T: DiskFile + Send> {
35     blocking_pool: BlockingPool,
36     inner: Arc<Mutex<T>>,
37 }
38 
39 impl<T: DiskFile + Send> AsyncDiskFileWrapper<T> {
40     #[allow(dead_code)] // Only used if qcow or android-sparse features are enabled
new(disk_file: T, _ex: &Executor) -> Self41     pub fn new(disk_file: T, _ex: &Executor) -> Self {
42         Self {
43             blocking_pool: BlockingPool::new(1, Duration::from_secs(10)),
44             inner: Arc::new(Mutex::new(disk_file)),
45         }
46     }
47 }
48 
49 impl<T: DiskFile + Send> DiskGetLen for AsyncDiskFileWrapper<T> {
get_len(&self) -> io::Result<u64>50     fn get_len(&self) -> io::Result<u64> {
51         self.inner.lock().get_len()
52     }
53 }
54 
55 impl<T: DiskFile + Send + FileSetLen> FileSetLen for AsyncDiskFileWrapper<T> {
set_len(&self, len: u64) -> io::Result<()>56     fn set_len(&self, len: u64) -> io::Result<()> {
57         self.inner.lock().set_len(len)
58     }
59 }
60 
61 impl<T: DiskFile + Send + FileAllocate> FileAllocate for AsyncDiskFileWrapper<T> {
allocate(&mut self, offset: u64, len: u64) -> io::Result<()>62     fn allocate(&mut self, offset: u64, len: u64) -> io::Result<()> {
63         self.inner.lock().allocate(offset, len)
64     }
65 }
66 
67 impl<T: DiskFile + Send> AsRawDescriptors for AsyncDiskFileWrapper<T> {
as_raw_descriptors(&self) -> Vec<RawDescriptor>68     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
69         self.inner.lock().as_raw_descriptors()
70     }
71 }
72 
/// Flushing capability for disk files that keep buffered or dirty state.
///
/// `AsyncDiskFileWrapper` requires this trait of the file it wraps so that its async `flush` has
/// something to call on the blocking pool.
pub trait DiskFlush {
    /// Writes intermediary buffers and/or dirty state out to the file.
    ///
    /// Implementations are not required to `fsync`.
    fn flush(&mut self) -> io::Result<()>;
}
77 
78 #[async_trait(?Send)]
79 impl<
80         T: 'static
81             + DiskFile
82             + DiskFlush
83             + Send
84             + FileAllocate
85             + FileSetLen
86             + FileSync
87             + PunchHoleMut
88             + WriteZeroesAt,
89     > AsyncDisk for AsyncDiskFileWrapper<T>
90 {
into_inner(self: Box<Self>) -> Box<dyn DiskFile>91     fn into_inner(self: Box<Self>) -> Box<dyn DiskFile> {
92         self.blocking_pool
93             .shutdown(None)
94             .expect("AsyncDiskFile pool shutdown failed");
95         let mtx: Mutex<T> = Arc::try_unwrap(self.inner).expect("AsyncDiskFile arc unwrap failed");
96         Box::new(mtx.into_inner())
97     }
98 
flush(&self) -> Result<()>99     async fn flush(&self) -> Result<()> {
100         let inner_clone = self.inner.clone();
101         self.blocking_pool
102             .spawn(move || {
103                 let mut disk_file = inner_clone.lock();
104                 disk_file.flush().map_err(Error::IoFlush)
105             })
106             .await
107     }
108 
fsync(&self) -> Result<()>109     async fn fsync(&self) -> Result<()> {
110         let inner_clone = self.inner.clone();
111         self.blocking_pool
112             .spawn(move || {
113                 let mut disk_file = inner_clone.lock();
114                 disk_file.fsync().map_err(Error::IoFsync)
115             })
116             .await
117     }
118 
fdatasync(&self) -> Result<()>119     async fn fdatasync(&self) -> Result<()> {
120         let inner_clone = self.inner.clone();
121         self.blocking_pool
122             .spawn(move || {
123                 let mut disk_file = inner_clone.lock();
124                 disk_file.fdatasync().map_err(Error::IoFdatasync)
125             })
126             .await
127     }
128 
read_to_mem<'a>( &'a self, mut file_offset: u64, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: cros_async::MemRegionIter<'a>, ) -> Result<usize>129     async fn read_to_mem<'a>(
130         &'a self,
131         mut file_offset: u64,
132         mem: Arc<dyn BackingMemory + Send + Sync>,
133         mem_offsets: cros_async::MemRegionIter<'a>,
134     ) -> Result<usize> {
135         let inner_clone = self.inner.clone();
136         let mem_offsets: Vec<cros_async::MemRegion> = mem_offsets.collect();
137         self.blocking_pool
138             .spawn(move || {
139                 let mut disk_file = inner_clone.lock();
140                 let mut size = 0;
141                 for region in mem_offsets {
142                     let mem_slice = mem.get_volatile_slice(region).unwrap();
143                     let bytes_read = disk_file
144                         .read_at_volatile(mem_slice, file_offset)
145                         .map_err(Error::ReadingData)?;
146                     size += bytes_read;
147                     if bytes_read < mem_slice.size() {
148                         break;
149                     }
150                     file_offset += bytes_read as u64;
151                 }
152                 Ok(size)
153             })
154             .await
155     }
156 
write_from_mem<'a>( &'a self, mut file_offset: u64, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: cros_async::MemRegionIter<'a>, ) -> Result<usize>157     async fn write_from_mem<'a>(
158         &'a self,
159         mut file_offset: u64,
160         mem: Arc<dyn BackingMemory + Send + Sync>,
161         mem_offsets: cros_async::MemRegionIter<'a>,
162     ) -> Result<usize> {
163         let inner_clone = self.inner.clone();
164         let mem_offsets: Vec<cros_async::MemRegion> = mem_offsets.collect();
165         self.blocking_pool
166             .spawn(move || {
167                 let mut disk_file = inner_clone.lock();
168                 let mut size = 0;
169                 for region in mem_offsets {
170                     let mem_slice = mem.get_volatile_slice(region).unwrap();
171                     let bytes_written = disk_file
172                         .write_at_volatile(mem_slice, file_offset)
173                         .map_err(Error::ReadingData)?;
174                     size += bytes_written;
175                     if bytes_written < mem_slice.size() {
176                         break;
177                     }
178                     file_offset += bytes_written as u64;
179                 }
180                 Ok(size)
181             })
182             .await
183     }
184 
punch_hole(&self, file_offset: u64, length: u64) -> Result<()>185     async fn punch_hole(&self, file_offset: u64, length: u64) -> Result<()> {
186         let inner_clone = self.inner.clone();
187         self.blocking_pool
188             .spawn(move || {
189                 let mut disk_file = inner_clone.lock();
190                 disk_file
191                     .punch_hole_mut(file_offset, length)
192                     .map_err(Error::IoPunchHole)
193             })
194             .await
195     }
196 
write_zeroes_at(&self, file_offset: u64, length: u64) -> Result<()>197     async fn write_zeroes_at(&self, file_offset: u64, length: u64) -> Result<()> {
198         let inner_clone = self.inner.clone();
199         self.blocking_pool
200             .spawn(move || {
201                 let mut disk_file = inner_clone.lock();
202                 disk_file
203                     .write_zeroes_all_at(file_offset, length as usize)
204                     .map_err(Error::WriteZeroes)
205             })
206             .await
207     }
208 }
209