// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::mem::size_of;
#[cfg(windows)]
use std::num::NonZeroU32;
use std::rc::Rc;
use std::result;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::u32;

use anyhow::Context;
use base::debug;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Result as SysResult;
use base::Timer;
use base::Tube;
use base::TubeError;
use base::WorkerThread;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::AsyncError;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use disk::AsyncDisk;
use disk::DiskFile;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use remain::sorted;
use thiserror::Error as ThisError;
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
use vm_control::DiskControlCommand;
use vm_control::DiskControlResult;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;

use crate::virtio::async_utils;
use crate::virtio::block::sys::*;
use crate::virtio::block::DiskOption;
use crate::virtio::copy_config;
use crate::virtio::device_constants::block::virtio_blk_config;
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
use crate::virtio::device_constants::block::virtio_blk_req_header;
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::PciAddress;

const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
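// For illustration: a sector index converts to a byte offset by shifting left by
// SECTOR_SHIFT, so sector 8 corresponds to byte offset 8 << 9 = 4096.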

const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;

#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
    #[error("failed to copy ID string: {0}")]
    CopyId(io::Error),
    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
    DiscardWriteZeroes {
        ioerr: Option<disk::Error>,
        sector: u64,
        num_sectors: u32,
        flags: u32,
    },
    #[error("failed to flush: {0}")]
    Flush(disk::Error),
    #[error("not enough space in descriptor chain to write status")]
    MissingStatus,
    #[error("out of range")]
    OutOfRange,
    #[error("failed to read message: {0}")]
    Read(io::Error),
    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
    ReadIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("read only; request_type={request_type}")]
    ReadOnly { request_type: u32 },
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(TubeError),
    #[error("failed to send command response: {0}")]
    SendingResponse(TubeError),
    #[error("couldn't reset the timer: {0}")]
    TimerReset(base::Error),
    #[error("unsupported ({0})")]
    Unsupported(u32),
    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
    WriteIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("failed to write request status: {0}")]
    WriteStatus(io::Error),
}

enum LogLevel {
    Debug,
    Error,
}

impl ExecuteError {
    fn status(&self) -> u8 {
        match self {
            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
            ExecuteError::OutOfRange { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
        }
    }

    fn log_level(&self) -> LogLevel {
        match self {
            // Since there is no feature bit for the guest to detect support for
            // VIRTIO_BLK_T_GET_ID, the driver has to try executing the request to see if it works.
            ExecuteError::Unsupported(VIRTIO_BLK_T_GET_ID) => LogLevel::Debug,
            // Log disk I/O errors at debug level to avoid flooding the logs.
            ExecuteError::ReadIo { .. }
            | ExecuteError::WriteIo { .. }
            | ExecuteError::Flush { .. }
            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
            // Log all other failures as errors.
            _ => LogLevel::Error,
        }
    }
}

/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
    #[error("failed to fsync the disk: {0}")]
    FsyncDisk(disk::Error),
}

/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
type BlockId = [u8; ID_LEN];
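// Illustrative example: an ID of b"disk-0" occupies the first six bytes, followed by a
// \0 terminator (the rest of the 20-byte array is typically zero as well).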

/// Tracks the state of an asynchronous disk.
struct DiskState {
    disk_image: Box<dyn AsyncDisk>,
    read_only: bool,
    sparse: bool,
    id: Option<BlockId>,
    /// A DiskState is owned by each worker's executor and cannot be shared by workers, thus
    /// `worker_shared_state` holds the state shared by workers in Arc.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}

/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
}

async fn process_one_request(
    avail_desc: &mut DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) -> result::Result<usize, ExecuteError> {
    let reader = &mut avail_desc.reader;
    let writer = &mut avail_desc.writer;

    // The last byte of the buffer is virtio_blk_req::status.
    // Split it into a separate Writer so that status_writer is the final byte and
    // the original writer is left with just the actual block I/O data.
    let available_bytes = writer.available_bytes();
    let status_offset = available_bytes
        .checked_sub(1)
        .ok_or(ExecuteError::MissingStatus)?;
    let mut status_writer = writer.split_at(status_offset);

    let status = match BlockAsync::execute_request(
        reader,
        writer,
        disk_state,
        flush_timer,
        flush_timer_armed,
    )
    .await
    {
        Ok(()) => VIRTIO_BLK_S_OK,
        Err(e) => {
            match e.log_level() {
                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
                LogLevel::Error => error!("failed executing disk request: {:#}", e),
            }
            e.status()
        }
    };

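    // Write the completion status as the final byte of the chain; the guest driver reads
    // this byte to learn whether the request succeeded.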
    status_writer
        .write_all(&[status])
        .map_err(ExecuteError::WriteStatus)?;
    Ok(available_bytes)
}

/// Process one descriptor chain asynchronously.
async fn process_one_chain(
    queue: &RefCell<Queue>,
    mut avail_desc: DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    interrupt: &Interrupt,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) {
    let _trace = cros_tracing::trace_event!(VirtioBlk, "process_one_chain");
    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
        .await
    {
        Ok(len) => len,
        Err(e) => {
            error!("block: failed to handle request: {:#}", e);
            0
        }
    };

    let mut queue = queue.borrow_mut();
    queue.add_used(avail_desc, len as u32);
    queue.trigger_interrupt(interrupt);
}

// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    interrupt: Interrupt,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &interrupt,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}

async fn handle_command_tube(
    command_tube: &Option<AsyncTube>,
    interrupt: Interrupt,
    disk_state: Rc<AsyncRwLock<DiskState>>,
) -> Result<(), ExecuteError> {
    let command_tube = match command_tube {
        Some(c) => c,
        None => {
            futures::future::pending::<()>().await;
            return Ok(());
        }
    };
    loop {
        match command_tube.next().await {
            Ok(command) => {
                let resp = match command {
                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
                };

                let resp_clone = resp.clone();
                command_tube
                    .send(resp_clone)
                    .await
                    .map_err(ExecuteError::SendingResponse)?;
                if let DiskControlResult::Ok = resp {
                    interrupt.signal_config_changed();
                }
            }
            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
        }
    }
}

async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let mut disk_state = disk_state.lock().await;
    // Prevent other worker threads from doing IO while the disk is being resized.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}

/// Periodically flushes the disk when the given timer fires.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset armed before calling fsync to guarantee that IO requests that started after we
        // called fsync will be committed eventually.
        *armed.borrow_mut() = false;

        disk_state
            .read_lock()
            .await
            .disk_image
            .fsync()
            .await
            .map_err(ControlError::FsyncDisk)?;
    }
}

enum WorkerCmd {
    StartQueue {
        index: usize,
        queue: Queue,
        interrupt: Interrupt,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
}
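// Illustrative flow (matching `BlockAsync::start_queue`/`stop_queue` below): starting a
// queue sends `WorkerCmd::StartQueue` over the worker's channel, while stopping sends
// `WorkerCmd::StopQueue` and then blocks on `response_tx` until the worker hands the
// `Queue` back.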

// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks. This
// is because the state can be read from the virtqueue task while the control task is processing a
// resizing command.
async fn run_worker(
    ex: &Executor,
    interrupt: Interrupt,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control = handle_command_tube(control_tube, interrupt.clone(), disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Process any requests to resample the irq value.
    let resample_future = async_utils::handle_irq_resample(ex, interrupt.clone()).fuse();
    pin_mut!(resample_future);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = resample_future => return r.context("failed to resample an irq value"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue, interrupt}) => {
                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            interrupt,
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };
                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }
                    }
                }
            }
        };
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public because the field is used by the main crate to determine
    // boot order
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    disk_size: Arc<AtomicU64>,
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    seg_max: u32,
    block_size: u32,
    id: Option<BlockId>,
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    worker_threads: BTreeMap<
        usize,
        (
            WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>,
            mpsc::UnboundedSender<WorkerCmd>,
        ),
    >,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}

impl BlockAsync {
    /// Create a new virtio block device that operates on the given `DiskFile`.
    pub fn new(
        base_features: u64,
        disk_image: Box<dyn DiskFile>,
        disk_option: &DiskOption,
        control_tube: Option<Tube>,
        queue_size: Option<u16>,
        num_queues: Option<u16>,
    ) -> SysResult<BlockAsync> {
        let read_only = disk_option.read_only;
        let sparse = disk_option.sparse;
        let block_size = disk_option.block_size;
        let packed_queue = disk_option.packed_queue;
        let id = disk_option.id;
        let mut worker_per_queue = disk_option.multiple_workers;
        // Automatically disable multiple workers if the disk image can't be cloned.
        if worker_per_queue && disk_image.try_clone().is_err() {
            base::warn!("multiple workers requested, but not supported by disk image type");
            worker_per_queue = false;
        }
        let executor_kind = disk_option.async_executor;
        let boot_index = disk_option.bootindex;
        #[cfg(windows)]
        let io_concurrency = disk_option.io_concurrency.get();

        if block_size % SECTOR_SIZE as u32 != 0 {
            error!(
                "Block size {} is not a multiple of {}.",
                block_size, SECTOR_SIZE,
            );
            return Err(SysError::new(libc::EINVAL));
        }
        let disk_size = disk_image.get_len()?;
        if disk_size % block_size as u64 != 0 {
            warn!(
                "Disk size {} is not a multiple of block size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, block_size,
            );
        }
        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
        let multi_queue = match num_queues {
            0 => panic!("Number of queues cannot be zero for a block device"),
            1 => false,
            _ => true,
        };
        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
        if !q_size.is_power_of_two() {
            error!("queue size {} is not a power of 2.", q_size);
            return Err(SysError::new(libc::EINVAL));
        }
        let queue_sizes = vec![q_size; num_queues as usize];

        let avail_features =
            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);

        let seg_max = get_seg_max(q_size);
        let executor_kind = executor_kind.unwrap_or_default();

        let disk_size = Arc::new(AtomicU64::new(disk_size));
        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
            disk_size: disk_size.clone(),
        }));

        Ok(BlockAsync {
            disk_image: Some(disk_image),
            disk_size,
            avail_features,
            read_only,
            sparse,
            seg_max,
            block_size,
            id,
            queue_sizes,
            worker_threads: BTreeMap::new(),
            shared_state,
            worker_per_queue,
            control_tube,
            executor_kind,
            activated_queues: BTreeSet::new(),
            boot_index,
            #[cfg(windows)]
            io_concurrency,
            pci_address: disk_option.pci_address,
        })
    }

    /// Returns the feature flags given the specified attributes.
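    /// For example (illustrative; see the `read_features` test below for exact values), a
    /// writable, sparse, multi-queue device ORs DISCARD, FLUSH, WRITE_ZEROES, SEG_MAX,
    /// BLK_SIZE, and MQ into `base_features`.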
    fn build_avail_features(
        base_features: u64,
        read_only: bool,
        sparse: bool,
        multi_queue: bool,
        packed_queue: bool,
    ) -> u64 {
        let mut avail_features = base_features;
        if read_only {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        } else {
            if sparse {
                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
            }
            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
        }
        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
        if multi_queue {
            avail_features |= 1 << VIRTIO_BLK_F_MQ;
        }
        if packed_queue {
            avail_features |= 1 << VIRTIO_F_RING_PACKED;
        }
        avail_features
    }

    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }
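        // For instance (illustrative): a 4096-byte I/O at offset 0 on a 4096-byte disk is
        // accepted, while the same I/O at offset 512 fails with ExecuteError::OutOfRange.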

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "in", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "out", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset(flush_delay, None)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                #[allow(clippy::if_same_then_else)]
                let _trace = if req_type == VIRTIO_BLK_T_DISCARD {
                    cros_tracing::trace_event!(VirtioBlk, "discard")
                } else {
                    cros_tracing::trace_event!(VirtioBlk, "write_zeroes")
                };
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "flush");
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "get_id");
                if let Some(id) = disk_state.id {
                    writer.write_all(&id).map_err(ExecuteError::CopyId)?;
                } else {
                    return Err(ExecuteError::Unsupported(req_type));
                }
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }

    /// Builds and returns the config structure used to specify block features.
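    /// Note that `capacity` is expressed in 512-byte sectors regardless of `block_size`;
    /// e.g. a 4 KiB disk is reported as a capacity of 8 (see the `read_size` test below).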
build_config_space( disk_size: u64, seg_max: u32, block_size: u32, num_queues: u16, ) -> virtio_blk_config926     fn build_config_space(
927         disk_size: u64,
928         seg_max: u32,
929         block_size: u32,
930         num_queues: u16,
931     ) -> virtio_blk_config {
932         virtio_blk_config {
933             // If the image is not a multiple of the sector size, the tail bits are not exposed.
934             capacity: Le64::from(disk_size >> SECTOR_SHIFT),
935             seg_max: Le32::from(seg_max),
936             blk_size: Le32::from(block_size),
937             num_queues: Le16::from(num_queues),
938             max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
939             discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
940             max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
941             write_zeroes_may_unmap: 1,
942             max_discard_seg: Le32::from(MAX_DISCARD_SEG),
943             max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
944             ..Default::default()
945         }
946     }
947 
948     /// Get the worker for a queue, starting it if necessary.
949     // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
950     #[allow(clippy::map_entry)]
start_worker( &mut self, idx: usize, interrupt: Interrupt, ) -> anyhow::Result<&( WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>, mpsc::UnboundedSender<WorkerCmd>, )>951     fn start_worker(
952         &mut self,
953         idx: usize,
954         interrupt: Interrupt,
955     ) -> anyhow::Result<&(
956         WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>,
957         mpsc::UnboundedSender<WorkerCmd>,
958     )> {
959         let key = if self.worker_per_queue { idx } else { 0 };
960         if self.worker_threads.contains_key(&key) {
961             return Ok(self.worker_threads.get(&key).unwrap());
962         }
963 
964         let ex = self.create_executor();
965         let control_tube = self.control_tube.take();
966         let disk_image = if self.worker_per_queue {
967             self.disk_image
968                 .as_ref()
969                 .context("Failed to ref a disk image")?
970                 .try_clone()
971                 .context("Failed to clone a disk image")?
972         } else {
973             self.disk_image
974                 .take()
975                 .context("Failed to take a disk image")?
976         };
977         let read_only = self.read_only;
978         let sparse = self.sparse;
979         let id = self.id;
980         let worker_shared_state = self.shared_state.clone();
981 
982         let (worker_tx, worker_rx) = mpsc::unbounded();
983         let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
984             let async_control =
985                 control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));
986 
987             let async_image = match disk_image.to_async_disk(&ex) {
988                 Ok(d) => d,
989                 Err(e) => panic!("Failed to create async disk {:#}", e),
990             };
991 
992             let disk_state = Rc::new(AsyncRwLock::new(DiskState {
993                 disk_image: async_image,
994                 read_only,
995                 sparse,
996                 id,
997                 worker_shared_state,
998             }));
999 
1000             if let Err(err_string) = ex
1001                 .run_until(async {
1002                     let r = run_worker(
1003                         &ex,
1004                         interrupt,
1005                         &disk_state,
1006                         &async_control,
1007                         worker_rx,
1008                         kill_evt,
1009                     )
1010                     .await;
1011                     // Flush any in-memory disk image state to file.
1012                     if let Err(e) = disk_state.lock().await.disk_image.flush().await {
1013                         error!("failed to flush disk image when stopping worker: {e:?}");
1014                     }
1015                     r
1016                 })
1017                 .expect("run_until failed")
1018             {
1019                 error!("{:#}", err_string);
1020             }
1021 
1022             let disk_state = match Rc::try_unwrap(disk_state) {
1023                 Ok(d) => d.into_inner(),
1024                 Err(_) => panic!("too many refs to the disk"),
1025             };
1026             (
1027                 disk_state.disk_image.into_inner(),
1028                 async_control.map(Tube::from),
1029             )
1030         });
1031         match self.worker_threads.entry(key) {
1032             std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
1033             std::collections::btree_map::Entry::Vacant(e) => {
1034                 Ok(e.insert((worker_thread, worker_tx)))
1035             }
1036         }
1037     }
1038 
start_queue( &mut self, idx: usize, queue: Queue, _mem: GuestMemory, doorbell: Interrupt, ) -> anyhow::Result<()>1039     pub fn start_queue(
1040         &mut self,
1041         idx: usize,
1042         queue: Queue,
1043         _mem: GuestMemory,
1044         doorbell: Interrupt,
1045     ) -> anyhow::Result<()> {
1046         let (_, worker_tx) = self.start_worker(idx, doorbell.clone())?;
1047         worker_tx
1048             .unbounded_send(WorkerCmd::StartQueue {
1049                 index: idx,
1050                 queue,
1051                 interrupt: doorbell,
1052             })
1053             .expect("worker channel closed early");
1054         self.activated_queues.insert(idx);
1055         Ok(())
1056     }
1057 
stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue>1058     pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
1059         // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
1060         // simplify `virtio_sleep` and/or `reset` methods.
1061         let (_, worker_tx) = self
1062             .worker_threads
1063             .get(if self.worker_per_queue { &idx } else { &0 })
1064             .context("worker not found")?;
1065         let (response_tx, response_rx) = oneshot::channel();
1066         worker_tx
1067             .unbounded_send(WorkerCmd::StopQueue {
1068                 index: idx,
1069                 response_tx,
1070             })
1071             .expect("worker channel closed early");
1072         let queue = cros_async::block_on(async {
1073             response_rx
1074                 .await
1075                 .expect("response_rx closed early")
1076                 .context("queue not found")
1077         })?;
1078         self.activated_queues.remove(&idx);
1079         Ok(queue)
1080     }
1081 }
1082 
1083 impl VirtioDevice for BlockAsync {
keep_rds(&self) -> Vec<RawDescriptor>1084     fn keep_rds(&self) -> Vec<RawDescriptor> {
1085         let mut keep_rds = Vec::new();
1086 
1087         if let Some(disk_image) = &self.disk_image {
1088             keep_rds.extend(disk_image.as_raw_descriptors());
1089         }
1090 
1091         if let Some(control_tube) = &self.control_tube {
1092             keep_rds.push(control_tube.as_raw_descriptor());
1093         }
1094 
1095         keep_rds
1096     }
1097 
features(&self) -> u641098     fn features(&self) -> u64 {
1099         self.avail_features
1100     }
1101 
device_type(&self) -> DeviceType1102     fn device_type(&self) -> DeviceType {
1103         DeviceType::Block
1104     }
1105 
queue_max_sizes(&self) -> &[u16]1106     fn queue_max_sizes(&self) -> &[u16] {
1107         &self.queue_sizes
1108     }
1109 
read_config(&self, offset: u64, data: &mut [u8])1110     fn read_config(&self, offset: u64, data: &mut [u8]) {
1111         let config_space = {
1112             let disk_size = self.disk_size.load(Ordering::Acquire);
1113             Self::build_config_space(
1114                 disk_size,
1115                 self.seg_max,
1116                 self.block_size,
1117                 self.queue_sizes.len() as u16,
1118             )
1119         };
1120         copy_config(data, 0, config_space.as_bytes(), offset);
1121     }
1122 
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>1123     fn activate(
1124         &mut self,
1125         mem: GuestMemory,
1126         interrupt: Interrupt,
1127         queues: BTreeMap<usize, Queue>,
1128     ) -> anyhow::Result<()> {
1129         for (i, q) in queues {
1130             self.start_queue(i, q, mem.clone(), interrupt.clone())?;
1131         }
1132         Ok(())
1133     }
1134 
reset(&mut self) -> anyhow::Result<()>1135     fn reset(&mut self) -> anyhow::Result<()> {
1136         while let Some((_, (worker_thread, _))) = self.worker_threads.pop_first() {
1137             let (disk_image, control_tube) = worker_thread.stop();
1138             self.disk_image = Some(disk_image);
1139             if let Some(control_tube) = control_tube {
1140                 self.control_tube = Some(control_tube);
1141             }
1142         }
1143         self.activated_queues.clear();
1144         Ok(())
1145     }
1146 
virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>>1147     fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
1148         if self.worker_threads.is_empty() {
1149             return Ok(None); // Not activated.
1150         }
1151 
1152         // Reclaim the queues from workers.
1153         let mut queues = BTreeMap::new();
1154         for index in self.activated_queues.clone() {
1155             queues.insert(index, self.stop_queue(index)?);
1156         }
1157         // Shutdown the workers.
1158         while let Some((_, (worker_thread, _))) = self.worker_threads.pop_first() {
1159             let (disk_image, control_tube) = worker_thread.stop();
1160             self.disk_image = Some(disk_image);
1161             if let Some(control_tube) = control_tube {
1162                 self.control_tube = Some(control_tube);
1163             }
1164         }
1165         Ok(Some(queues))
1166     }
1167 
virtio_wake( &mut self, queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>, ) -> anyhow::Result<()>1168     fn virtio_wake(
1169         &mut self,
1170         queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
1171     ) -> anyhow::Result<()> {
1172         if let Some((mem, interrupt, queues)) = queues_state {
1173             for (i, q) in queues {
1174                 self.start_queue(i, q, mem.clone(), interrupt.clone())?
1175             }
1176         }
1177         Ok(())
1178     }
1179 
virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value>1180     fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
1181         // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
1182         // handled at a higher layer.
1183         Ok(serde_json::Value::Null)
1184     }
1185 
virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>1186     fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
1187         anyhow::ensure!(
1188             data == serde_json::Value::Null,
1189             "unexpected snapshot data: should be null, got {}",
1190             data,
1191         );
1192         Ok(())
1193     }
1194 
pci_address(&self) -> Option<PciAddress>1195     fn pci_address(&self) -> Option<PciAddress> {
1196         self.pci_address
1197     }
1198 
bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)>1199     fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
1200         self.boot_index
1201             .map(|s| (format!("scsi@{}/disk@0,0", pci_slot).as_bytes().to_vec(), s))
1202     }
1203 }
1204 
1205 #[cfg(test)]
1206 mod tests {
1207     use std::fs::File;
1208     use std::mem::size_of_val;
1209     use std::sync::atomic::AtomicU64;
1210 
1211     use data_model::Le32;
1212     use data_model::Le64;
1213     use disk::SingleFileDisk;
1214     use hypervisor::ProtectionType;
1215     use tempfile::tempfile;
1216     use tempfile::TempDir;
1217     use vm_memory::GuestAddress;
1218 
1219     use super::*;
1220     use crate::suspendable_virtio_tests;
1221     use crate::virtio::base_features;
1222     use crate::virtio::descriptor_utils::create_descriptor_chain;
1223     use crate::virtio::descriptor_utils::DescriptorType;
1224     use crate::virtio::QueueConfig;
1225 
1226     #[test]
read_size()1227     fn read_size() {
1228         let f = tempfile().unwrap();
1229         f.set_len(0x1000).unwrap();
1230 
1231         let features = base_features(ProtectionType::Unprotected);
1232         let disk_option = DiskOption::default();
1233         let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1234         let mut num_sectors = [0u8; 4];
1235         b.read_config(0, &mut num_sectors);
1236         // size is 0x1000, so num_sectors is 8 (4096/512).
1237         assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
1238         let mut msw_sectors = [0u8; 4];
1239         b.read_config(4, &mut msw_sectors);
1240         // size is 0x1000, so msw_sectors is 0.
1241         assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
1242     }
1243 
1244     #[test]
read_block_size()1245     fn read_block_size() {
1246         let f = tempfile().unwrap();
1247         f.set_len(0x1000).unwrap();
1248 
1249         let features = base_features(ProtectionType::Unprotected);
1250         let disk_option = DiskOption {
1251             block_size: 4096,
1252             sparse: false,
1253             ..Default::default()
1254         };
1255         let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1256         let mut blk_size = [0u8; 4];
1257         b.read_config(20, &mut blk_size);
1258         // blk_size should be 4096 (0x1000).
1259         assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
1260     }
1261 
1262     #[test]
read_features()1263     fn read_features() {
1264         let tempdir = TempDir::new().unwrap();
1265         let mut path = tempdir.path().to_owned();
1266         path.push("disk_image");
1267 
1268         // read-write block device
1269         {
1270             let f = File::create(&path).unwrap();
1271             let features = base_features(ProtectionType::Unprotected);
1272             let disk_option = DiskOption::default();
1273             let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1274             // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
1275             // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_F_VERSION_1 + VIRTIO_BLK_F_BLK_SIZE
1276             // + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ + VIRTIO_RING_F_EVENT_IDX
1277             assert_eq!(0x120007244, b.features());
1278         }
1279 
1280         // read-write block device, non-sparse
1281         {
1282             let f = File::create(&path).unwrap();
1283             let features = base_features(ProtectionType::Unprotected);
1284             let disk_option = DiskOption {
1285                 sparse: false,
1286                 ..Default::default()
1287             };
1288             let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1289             // writable device should set VIRTIO_F_FLUSH + VIRTIO_BLK_F_RO
1290             // + VIRTIO_F_VERSION_1 + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
1291             // + VIRTIO_BLK_F_MQ + VIRTIO_RING_F_EVENT_IDX
            assert_eq!(0x120005244, b.features());
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_F_VERSION_1 + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ + VIRTIO_RING_F_EVENT_IDX
            assert_eq!(0x120001064, b.features());
        }
    }

    #[test]
    fn check_pci_address_configurability() {
        let f = tempfile().unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            pci_address: Some(PciAddress {
                bus: 0,
                dev: 1,
                func: 1,
            }),
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();

        assert_eq!(b.pci_address(), disk_option.pci_address);
    }

    #[test]
    fn check_runtime_blk_queue_configurability() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");
        let features = base_features(ProtectionType::Unprotected);

        // Default case
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        assert_eq!(
            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
            b.queue_max_sizes()
        );

        // Single queue of size 128
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
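        // The trailing Some(128)/Some(1) arguments override the default queue size and
        // the number of queues, respectively, as the asserts below verify.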
        let b = BlockAsync::new(
            features,
            Box::new(f),
            &disk_option,
            None,
            Some(128),
            Some(1),
        )
        .unwrap();
        assert_eq!([128; 1], b.queue_max_sizes());
        // Single queue device should not set VIRTIO_BLK_F_MQ
        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
    }

    #[test]
    fn read_last_sector() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

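        // create_descriptor_chain lays the buffers out contiguously from 0x1000, so
        // the status byte sits right after the 16-byte request header and the
        // 512-byte data buffer.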
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }

    #[test]
    fn read_beyond_last_sector() {
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

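        // Reading two sectors starting at sector 7 runs past the end of the 8-sector
        // disk, so the request must fail with VIRTIO_BLK_S_IOERR.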
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }

    #[test]
    fn get_id() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();

        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
            reserved: Le32::from(0),
            sector: Le64::from(0),
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (20 bytes for serial)
                (DescriptorType::Writable, 20),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

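        // A VIRTIO_BLK_T_GET_ID request fills the writable buffer with the device ID;
        // the serial below is exactly 20 bytes, matching the buffer size above.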
        let id = b"a20-byteserialnumber";

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Some(*id),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // The status byte directly follows the request header and the 20-byte ID
        // buffer (not a 512-byte sector buffer as in the read tests above).
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 20) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);

        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
        assert_eq!(returned_id, *id);
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn reset_and_reactivate_single_worker() {
        reset_and_reactivate(false);
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn reset_and_reactivate_multiple_workers() {
        reset_and_reactivate(true);
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    fn reset_and_reactivate(enables_multiple_workers: bool) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        // NOTE: We don't want to drop the vmm half of the tube. That would cause the
        // worker thread to fail immediately, which isn't what we want to test here.
        let (_control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

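        // A QueueConfig describes an inactive queue; set_ready() plus activate() binds
        // it to guest memory and a kick Event, producing the Queue handed to the device.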
        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem.clone(),
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );

        // reset and assert the resources are returned
        assert!(b.reset().is_ok(), "reset should succeed");
        assert!(
            b.disk_image.is_some(),
            "BlockAsync should have a disk image"
        );
        assert!(
            b.control_tube.is_some(),
            "BlockAsync should have a control tube"
        );
        assert_eq!(b.id, Some(*b"Block serial number\0"));

        // re-activate should succeed
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem,
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("re-activate should succeed");
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented,
    // or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented,
    // or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn resize_with_multiple_workers() {
        // Test that a resize handled by one worker affects the state shared by all workers
        resize(true);
    }

    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let interrupt = Interrupt::new_for_test();
        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

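        // A Resize command sent over the control tube is handled by a worker thread,
        // which grows the backing file, updates the shared disk size, and raises a
        // config-change interrupt, as the asserts below check step by step.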
        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        assert_eq!(
            interrupt
                .get_interrupt_evt()
                // Wait a bit until the blk signals the interrupt
                .wait_timeout(Duration::from_millis(300)),
            Ok(base::EventWaitResult::Signaled),
            "interrupt should be signaled"
        );
        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented,
    // or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with single worker thread
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem.clone(),
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("activate should succeed");

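        // multiple_workers is false by default, so both queues share one worker thread.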
        assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
        drop(b);

        // Create a BlockAsync to test with multiple worker threads
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // activate should succeed
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem,
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

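    // Hooks for the suspendable_virtio_tests! macro below: create_device builds the
    // device under test, and modify_device perturbs its state (flipping avail_features)
    // so the suite can presumably detect a mismatch when restoring a stale snapshot.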
    struct BlockContext {}

    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}