// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::mem::size_of;
#[cfg(windows)]
use std::num::NonZeroU32;
use std::rc::Rc;
use std::result;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use base::debug;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Result as SysResult;
use base::Timer;
use base::Tube;
use base::TubeError;
use base::WorkerThread;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::AsyncError;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use disk::AsyncDisk;
use disk::DiskFile;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use remain::sorted;
use snapshot::AnySnapshot;
use thiserror::Error as ThisError;
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
use vm_control::DiskControlCommand;
use vm_control::DiskControlResult;
use vm_memory::GuestMemory;
use zerocopy::IntoBytes;

use crate::virtio::async_utils;
use crate::virtio::block::sys::*;
use crate::virtio::block::DiskOption;
use crate::virtio::copy_config;
use crate::virtio::device_constants::block::virtio_blk_config;
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
use crate::virtio::device_constants::block::virtio_blk_req_header;
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::PciAddress;

const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

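// A sector is 512 bytes (1 << 9), the unit in which virtio-blk addresses the disk.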
const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
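// (128 sectors * 512 bytes/sector = 64 KiB)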
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;

#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
    #[error("failed to copy ID string: {0}")]
    CopyId(io::Error),
    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
    DiscardWriteZeroes {
        ioerr: Option<disk::Error>,
        sector: u64,
        num_sectors: u32,
        flags: u32,
    },
    #[error("failed to flush: {0}")]
    Flush(disk::Error),
    #[error("not enough space in descriptor chain to write status")]
    MissingStatus,
    #[error("out of range")]
    OutOfRange,
    #[error("failed to read message: {0}")]
    Read(io::Error),
    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
    ReadIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("read only; request_type={request_type}")]
    ReadOnly { request_type: u32 },
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(TubeError),
    #[error("failed to send command response: {0}")]
    SendingResponse(TubeError),
    #[error("couldn't reset the timer: {0}")]
    TimerReset(base::Error),
    #[error("unsupported ({0})")]
    Unsupported(u32),
    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
    WriteIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("failed to write request status: {0}")]
    WriteStatus(io::Error),
}

enum LogLevel {
    Debug,
    Error,
}

impl ExecuteError {
    fn status(&self) -> u8 {
        match self {
            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
            ExecuteError::OutOfRange { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
        }
    }

    fn log_level(&self) -> LogLevel {
        match self {
            // Since there is no feature bit for the guest to detect support for
            // VIRTIO_BLK_T_GET_ID, the driver has to try executing the request to see if it works.
            ExecuteError::Unsupported(VIRTIO_BLK_T_GET_ID) => LogLevel::Debug,
            // Log disk I/O errors at debug level to avoid flooding the logs.
            ExecuteError::ReadIo { .. }
            | ExecuteError::WriteIo { .. }
            | ExecuteError::Flush { .. }
            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
            // Log all other failures as errors.
            _ => LogLevel::Error,
        }
    }
}

/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("failed to fdatasync the disk: {0}")]
    FdatasyncDisk(disk::Error),
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
}

/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
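/// For example, a 6-byte ID of b"serial" occupies bytes 0..6, with byte 6 set to \0.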
type BlockId = [u8; ID_LEN];

/// Tracks the state of an asynchronous disk.
struct DiskState {
    disk_image: Box<dyn AsyncDisk>,
    read_only: bool,
    sparse: bool,
    id: Option<BlockId>,
    /// A DiskState is owned by each worker's executor and cannot be shared between workers, so
    /// `worker_shared_state` holds the state shared by workers in an `Arc`.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}

/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
}

async fn process_one_request(
    avail_desc: &mut DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) -> result::Result<usize, ExecuteError> {
    let reader = &mut avail_desc.reader;
    let writer = &mut avail_desc.writer;

    // The last byte of the buffer is virtio_blk_req::status.
    // Split it into a separate Writer so that status_writer is the final byte and
    // the original writer is left with just the actual block I/O data.
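    // A virtio-blk request occupies the descriptor chain laid out as:
    //   [virtio_blk_req_header: 16 bytes, device-readable]
    //   [data: device-writable for reads (IN), device-readable for writes (OUT)]
    //   [status: 1 byte, device-writable]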
    let available_bytes = writer.available_bytes();
    let status_offset = available_bytes
        .checked_sub(1)
        .ok_or(ExecuteError::MissingStatus)?;
    let mut status_writer = writer.split_at(status_offset);

    let status = match BlockAsync::execute_request(
        reader,
        writer,
        disk_state,
        flush_timer,
        flush_timer_armed,
    )
    .await
    {
        Ok(()) => VIRTIO_BLK_S_OK,
        Err(e) => {
            match e.log_level() {
                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
                LogLevel::Error => error!("failed executing disk request: {:#}", e),
            }
            e.status()
        }
    };

    status_writer
        .write_all(&[status])
        .map_err(ExecuteError::WriteStatus)?;
    Ok(available_bytes)
}

/// Process one descriptor chain asynchronously.
async fn process_one_chain(
    queue: &RefCell<Queue>,
    mut avail_desc: DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) {
    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
        .await
    {
        Ok(len) => len,
        Err(e) => {
            error!("block: failed to handle request: {:#}", e);
            0
        }
    };

    let mut queue = queue.borrow_mut();
    queue.add_used(avail_desc, len as u32);
    queue.trigger_interrupt();
}

// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}

async fn handle_command_tube(
    command_tube: &Option<AsyncTube>,
    interrupt: &RefCell<Option<Interrupt>>,
    disk_state: Rc<AsyncRwLock<DiskState>>,
) -> Result<(), ExecuteError> {
    let command_tube = match command_tube {
        Some(c) => c,
        None => {
            futures::future::pending::<()>().await;
            return Ok(());
        }
    };
    loop {
        match command_tube.next().await {
            Ok(command) => {
                let resp = match command {
                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
                };

                let resp_clone = resp.clone();
                command_tube
                    .send(resp_clone)
                    .await
                    .map_err(ExecuteError::SendingResponse)?;
                if let DiskControlResult::Ok = resp {
                    if let Some(interrupt) = &*interrupt.borrow() {
                        interrupt.signal_config_changed();
                    }
                }
            }
            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
        }
    }
}

async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let disk_state = disk_state.lock().await;
    // Prevent other worker threads from doing IO while we resize.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}

/// Periodically flushes the disk when the given timer fires.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset armed before calling fdatasync to guarantee that IO requests that started after we
        // call fdatasync will be committed eventually.
        *armed.borrow_mut() = false;

        disk_state
            .read_lock()
            .await
            .disk_image
            .fdatasync()
            .await
            .map_err(ControlError::FdatasyncDisk)?;
    }
}

enum WorkerCmd {
    StartQueue {
        index: usize,
        queue: Queue,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Stop all queues without recovering the queues' state and without completing any queued up
    // work.
    AbortQueues {
        // Once the queues are stopped, a `()` value will be sent back over `response_tx`.
        response_tx: oneshot::Sender<()>,
    },
}

// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks,
// because the state can be read from the virtqueue task while the control task is processing a
// resizing command.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // If this is the last queue, drop references to the interrupt so
                                // that, when queues are started up again, we'll use the new
                                // interrupt passed with the first queue.
                                if queue_handlers.is_empty() {
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }

                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public because the field is used by the main crate to determine
    // boot order
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    disk_size: Arc<AtomicU64>,
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    seg_max: u32,
    block_size: u32,
    id: Option<BlockId>,
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    //
    // Once a thread is started, we never stop it, except when `BlockAsync` itself is dropped. That
    // is because we cannot easily convert the `AsyncDisk` back to a `DiskFile` when backed by
    // Overlapped I/O on Windows because the file becomes permanently associated with the IOCP
    // instance of the async executor.
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}

impl BlockAsync {
    /// Create a new virtio block device that operates on the given AsyncDisk.
    pub fn new(
        base_features: u64,
        disk_image: Box<dyn DiskFile>,
        disk_option: &DiskOption,
        control_tube: Option<Tube>,
        queue_size: Option<u16>,
        num_queues: Option<u16>,
    ) -> SysResult<BlockAsync> {
        let read_only = disk_option.read_only;
        let sparse = disk_option.sparse;
        let block_size = disk_option.block_size;
        let packed_queue = disk_option.packed_queue;
        let id = disk_option.id;
        let mut worker_per_queue = disk_option.multiple_workers;
        // Automatically disable multiple workers if the disk image can't be cloned.
        if worker_per_queue && disk_image.try_clone().is_err() {
            base::warn!("multiple workers requested, but not supported by disk image type");
            worker_per_queue = false;
        }
        let executor_kind = disk_option.async_executor.unwrap_or_default();
        let boot_index = disk_option.bootindex;
        #[cfg(windows)]
        let io_concurrency = disk_option.io_concurrency.get();

        if block_size % SECTOR_SIZE as u32 != 0 {
            error!(
                "Block size {} is not a multiple of {}.",
                block_size, SECTOR_SIZE,
            );
            return Err(SysError::new(libc::EINVAL));
        }
        let disk_size = disk_image.get_len()?;
        if disk_size % block_size as u64 != 0 {
            warn!(
                "Disk size {} is not a multiple of block size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, block_size,
            );
        }
        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
        let multi_queue = match num_queues {
            0 => panic!("Number of queues cannot be zero for a block device"),
            1 => false,
            _ => true,
        };
        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
        if !q_size.is_power_of_two() {
            error!("queue size {} is not a power of 2.", q_size);
            return Err(SysError::new(libc::EINVAL));
        }
        let queue_sizes = vec![q_size; num_queues as usize];

        let avail_features =
            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);

        let seg_max = get_seg_max(q_size);

        let disk_size = Arc::new(AtomicU64::new(disk_size));
        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
            disk_size: disk_size.clone(),
        }));

        Ok(BlockAsync {
            disk_image: Some(disk_image),
            disk_size,
            avail_features,
            read_only,
            sparse,
            seg_max,
            block_size,
            id,
            queue_sizes,
            worker_threads: BTreeMap::new(),
            shared_state,
            worker_per_queue,
            control_tube,
            executor_kind,
            activated_queues: BTreeSet::new(),
            boot_index,
            #[cfg(windows)]
            io_concurrency,
            pci_address: disk_option.pci_address,
        })
    }

    /// Returns the feature flags given the specified attributes.
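    /// For example, a writable, sparse, multi-queue disk reports VIRTIO_BLK_F_FLUSH,
    /// VIRTIO_BLK_F_DISCARD, VIRTIO_BLK_F_WRITE_ZEROES, VIRTIO_BLK_F_SEG_MAX,
    /// VIRTIO_BLK_F_BLK_SIZE, and VIRTIO_BLK_F_MQ in addition to `base_features`.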
    fn build_avail_features(
        base_features: u64,
        read_only: bool,
        sparse: bool,
        multi_queue: bool,
        packed_queue: bool,
    ) -> u64 {
        let mut avail_features = base_features;
        if read_only {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        } else {
            if sparse {
                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
            }
            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
        }
        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
        if multi_queue {
            avail_features |= 1 << VIRTIO_BLK_F_MQ;
        }
        if packed_queue {
            avail_features |= 1 << VIRTIO_F_RING_PACKED;
        }
        avail_features
    }

    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
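    // The resulting status byte (written by `process_one_request`) is VIRTIO_BLK_S_OK (0) on
    // Ok(()), VIRTIO_BLK_S_UNSUPP (2) for ExecuteError::Unsupported, and VIRTIO_BLK_S_IOERR (1)
    // for all other errors.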
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
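        /// For example, with a 0x1000-byte disk, io_start=0xe00 with io_length=0x200 is
        /// accepted (io_end == disk_size), while io_length=0x201 fails with OutOfRange.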
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset_oneshot(flush_delay)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                if let Some(id) = disk_state.id {
                    writer.write_all(&id).map_err(ExecuteError::CopyId)?;
                } else {
                    return Err(ExecuteError::Unsupported(req_type));
                }
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }

    /// Builds and returns the config structure used to specify block features.
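    /// For example, a disk_size of 0x1000 bytes yields capacity = 8 (eight 512-byte sectors).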
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the image is not a multiple of the sector size, the tail bits are not exposed.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }

    /// Get the worker for a queue, starting it if necessary.
    // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
    ) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        let control_tube = self.control_tube.take();
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {:#}", e),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                    // Flush any in-memory disk image state to file.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }
        });
        match self.worker_threads.entry(key) {
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }

    pub fn start_queue(
        &mut self,
        idx: usize,
        queue: Queue,
        _mem: GuestMemory,
    ) -> anyhow::Result<()> {
        let (_, worker_tx) = self.start_worker(idx)?;
        worker_tx
            .unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
            .expect("worker channel closed early");
        self.activated_queues.insert(idx);
        Ok(())
    }

    pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
        // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
        // simplify `virtio_sleep` and/or `reset` methods.
        let (_, worker_tx) = self
            .worker_threads
            .get(if self.worker_per_queue { &idx } else { &0 })
            .context("worker not found")?;
        let (response_tx, response_rx) = oneshot::channel();
        worker_tx
            .unbounded_send(WorkerCmd::StopQueue {
                index: idx,
                response_tx,
            })
            .expect("worker channel closed early");
        let queue = cros_async::block_on(async {
            response_rx
                .await
                .expect("response_rx closed early")
                .context("queue not found")
        })?;
        self.activated_queues.remove(&idx);
        Ok(queue)
    }
}

impl VirtioDevice for BlockAsync {
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut keep_rds = Vec::new();

        if let Some(disk_image) = &self.disk_image {
            keep_rds.extend(disk_image.as_raw_descriptors());
        }

        if let Some(control_tube) = &self.control_tube {
            keep_rds.push(control_tube.as_raw_descriptor());
        }

        keep_rds
    }

    fn features(&self) -> u64 {
        self.avail_features
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        let config_space = {
            let disk_size = self.disk_size.load(Ordering::Acquire);
            Self::build_config_space(
                disk_size,
                self.seg_max,
                self.block_size,
                self.queue_sizes.len() as u16,
            )
        };
        copy_config(data, 0, config_space.as_bytes(), offset);
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        for (i, q) in queues {
            self.start_queue(i, q, mem.clone())?;
        }
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
            let (response_tx, response_rx) = oneshot::channel();
            worker_tx
                .unbounded_send(WorkerCmd::AbortQueues { response_tx })
                .expect("worker channel closed early");
            cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
        }
        self.activated_queues.clear();
        Ok(())
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        // Reclaim the queues from workers.
        let mut queues = BTreeMap::new();
        for index in self.activated_queues.clone() {
            queues.insert(index, self.stop_queue(index)?);
        }
        if queues.is_empty() {
            return Ok(None); // Not activated.
        }
        Ok(Some(queues))
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        if let Some((mem, _interrupt, queues)) = queues_state {
            for (i, q) in queues {
                self.start_queue(i, q, mem.clone())?
            }
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
        // handled at a higher layer.
        AnySnapshot::to_any(())
    }

    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        let () = AnySnapshot::from_any(data)?;
        Ok(())
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }

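    // e.g. pci_slot 2 with boot_index Some(1) yields Some((b"scsi@2/disk@0,0".to_vec(), 1)).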
    fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
        self.boot_index
            .map(|s| (format!("scsi@{}/disk@0,0", pci_slot).as_bytes().to_vec(), s))
    }
}

#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::mem::size_of_val;
    use std::sync::atomic::AtomicU64;

    use data_model::Le32;
    use data_model::Le64;
    use disk::SingleFileDisk;
    use hypervisor::ProtectionType;
    use tempfile::tempfile;
    use tempfile::TempDir;
    use vm_memory::GuestAddress;

    use super::*;
    use crate::suspendable_virtio_tests;
    use crate::virtio::base_features;
    use crate::virtio::descriptor_utils::create_descriptor_chain;
    use crate::virtio::descriptor_utils::DescriptorType;
    use crate::virtio::QueueConfig;

    #[test]
    fn read_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut num_sectors = [0u8; 4];
        b.read_config(0, &mut num_sectors);
        // size is 0x1000, so num_sectors is 8 (4096/512).
        assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
        let mut msw_sectors = [0u8; 4];
        b.read_config(4, &mut msw_sectors);
        // size is 0x1000, so msw_sectors is 0.
        assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
    }

    #[test]
    fn read_block_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            block_size: 4096,
            sparse: false,
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut blk_size = [0u8; 4];
        b.read_config(20, &mut blk_size);
        // blk_size should be 4096 (0x1000).
        assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
    }

    #[test]
    fn read_features() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");

        // Feature bits 0-23 and 50-127 are specific for the device type, but
        // at the moment crosvm only supports 64 bits of feature bits.
        const DEVICE_FEATURE_BITS: u64 = 0xffffff;

        // read-write block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption::default();
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ
            assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-write block device, non-sparse
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                sparse: false,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // non-sparse writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_WRITE_ZEROES
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
        }
    }

    #[test]
    fn check_pci_address_configurability() {
        let f = tempfile().unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            pci_address: Some(PciAddress {
                bus: 0,
                dev: 1,
                func: 1,
            }),
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();

        assert_eq!(b.pci_address(), disk_option.pci_address);
    }

1298     #[test]
    fn check_runtime_blk_queue_configurability() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");
        let features = base_features(ProtectionType::Unprotected);

        // Default case
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        assert_eq!(
            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
            b.queue_max_sizes()
        );

        // Single queue of size 128
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(
            features,
            Box::new(f),
            &disk_option,
            None,
            Some(128),
            Some(1),
        )
        .unwrap();
        assert_eq!([128; 1], b.queue_max_sizes());
        // Single queue device should not set VIRTIO_BLK_F_MQ
        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
    }

    #[test]
    fn read_last_sector() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

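        // With zero spacing between regions, the 1-byte status descriptor directly follows the
        // 16-byte request header and the 512-byte data buffer in guest memory.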
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }

    #[test]
    fn read_beyond_last_sector() {
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

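        // A read past the end of the disk must fail: expect VIRTIO_BLK_S_IOERR in the status
        // byte, which follows the header and the two-sector buffer.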
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }

    #[test]
    fn get_id() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();

        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
            reserved: Le32::from(0),
            sector: Le64::from(0),
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (20 bytes for serial)
                (DescriptorType::Writable, 20),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let id = b"a20-byteserialnumber";

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Some(*id),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

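        // With zero spacing between regions, the status byte sits right after the 16-byte
        // request header and the 20-byte ID buffer, not a full sector in.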
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 20) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);

        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
        assert_eq!(returned_id, *id);
    }

    #[test]
    fn reset_and_reactivate_single_worker() {
        reset_and_reactivate(false, None);
    }

    #[test]
    fn reset_and_reactivate_multiple_workers() {
        reset_and_reactivate(true, None);
    }

    #[test]
    #[cfg(windows)]
    fn reset_and_reactivate_overlapped_io() {
        reset_and_reactivate(
            false,
            Some(
                cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
            ),
        );
    }

    fn reset_and_reactivate(
        enables_multiple_workers: bool,
        async_executor: Option<cros_async::ExecutorKind>,
    ) {
        // Create an empty disk image
        let f = tempfile::NamedTempFile::new().unwrap();
        f.as_file().set_len(0x1000).unwrap();
        // Close the file so that it is possible for the disk implementation to take exclusive
        // access when opening it.
        let path: tempfile::TempPath = f.into_temp_path();

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker
        // thread to fail immediately, which isn't what we want to test in this case.
        let (_control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
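        // The disk ID must be exactly 20 bytes; the 19-character string below is NUL-terminated
        // to reach that length.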
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            path: path.to_path_buf(),
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            async_executor,
            ..Default::default()
        };
        let disk_image = disk_option.open().unwrap();
        let mut b = BlockAsync::new(
            features,
            disk_image,
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );

        // reset and assert that resources have not been returned yet (they should still be held
        // by the worker thread)
        assert!(b.reset().is_ok(), "reset should succeed");
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );
        assert_eq!(b.id, Some(*b"Block serial number\0"));

        // re-activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("re-activate should succeed");
    }

    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    #[test]
    fn resize_with_multiple_workers() {
        // Test that a resize handled by one worker affects the whole shared state
        resize(true);
    }

    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

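        // A resize arrives over the control tube: the device resizes the backing file, updates
        // the shared disk_size, and raises a config-changed interrupt, all of which the
        // assertions below observe.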
        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        // Wait until the blk signals the interrupt
        interrupt
            .get_interrupt_evt()
            .wait()
            .expect("interrupt should be signaled");

        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }

    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with single worker thread
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
        drop(b);

        // Create a BlockAsync to test with multiple worker threads
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

    struct BlockContext {}

    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

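    // Generates the shared suspend/resume test suite for this device: `asyncblock` names the
    // generated module, `create_device` builds the device under test with 2 queues, and
    // `modify_device` perturbs state so a restore mismatch would be caught.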
    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}