// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::mem::size_of;
#[cfg(windows)]
use std::num::NonZeroU32;
use std::rc::Rc;
use std::result;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use base::debug;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Result as SysResult;
use base::Timer;
use base::Tube;
use base::TubeError;
use base::WorkerThread;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::AsyncError;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use disk::AsyncDisk;
use disk::DiskFile;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use remain::sorted;
use snapshot::AnySnapshot;
use thiserror::Error as ThisError;
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
use vm_control::DiskControlCommand;
use vm_control::DiskControlResult;
use vm_memory::GuestMemory;
use zerocopy::IntoBytes;

use crate::virtio::async_utils;
use crate::virtio::block::sys::*;
use crate::virtio::block::DiskOption;
use crate::virtio::copy_config;
use crate::virtio::device_constants::block::virtio_blk_config;
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
use crate::virtio::device_constants::block::virtio_blk_req_header;
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::PciAddress;

const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;

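// Illustrative sanity check (added commentary, not from the original source):
// with 512-byte sectors (SECTOR_SIZE = 1 << 9 = 512), an alignment of 128
// sectors is 128 * 512 = 65536 bytes = 64 KiB, matching the comment above.
const _: () = assert!(DISCARD_SECTOR_ALIGNMENT as u64 * SECTOR_SIZE == 64 * 1024);
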
#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
    #[error("failed to copy ID string: {0}")]
    CopyId(io::Error),
    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
    DiscardWriteZeroes {
        ioerr: Option<disk::Error>,
        sector: u64,
        num_sectors: u32,
        flags: u32,
    },
    #[error("failed to flush: {0}")]
    Flush(disk::Error),
    #[error("not enough space in descriptor chain to write status")]
    MissingStatus,
    #[error("out of range")]
    OutOfRange,
    #[error("failed to read message: {0}")]
    Read(io::Error),
    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
    ReadIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("read only; request_type={request_type}")]
    ReadOnly { request_type: u32 },
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(TubeError),
    #[error("failed to send command response: {0}")]
    SendingResponse(TubeError),
    #[error("couldn't reset the timer: {0}")]
    TimerReset(base::Error),
    #[error("unsupported ({0})")]
    Unsupported(u32),
    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
    WriteIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("failed to write request status: {0}")]
    WriteStatus(io::Error),
}

enum LogLevel {
    Debug,
    Error,
}

impl ExecuteError {
    fn status(&self) -> u8 {
        match self {
            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
            ExecuteError::OutOfRange { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
        }
    }

    fn log_level(&self) -> LogLevel {
        match self {
            // Since there is no feature bit for the guest to detect support for
            // VIRTIO_BLK_T_GET_ID, the driver has to try executing the request to see if it works.
            ExecuteError::Unsupported(VIRTIO_BLK_T_GET_ID) => LogLevel::Debug,
            // Log disk I/O errors at debug level to avoid flooding the logs.
            ExecuteError::ReadIo { .. }
            | ExecuteError::WriteIo { .. }
            | ExecuteError::Flush { .. }
            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
            // Log all other failures as errors.
            _ => LogLevel::Error,
        }
    }
}

/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("failed to fdatasync the disk: {0}")]
    FdatasyncDisk(disk::Error),
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
}

/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
type BlockId = [u8; ID_LEN];

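// Illustrative sketch (not part of the original source): building a `BlockId`
// from a serial string. An ID shorter than `ID_LEN` stays NUL-padded; a full
// 20-byte ID uses every byte and omits the terminator, as described above.
#[allow(dead_code)]
fn example_block_id(serial: &str) -> BlockId {
    let mut id: BlockId = [0u8; ID_LEN];
    let len = serial.len().min(ID_LEN);
    id[..len].copy_from_slice(&serial.as_bytes()[..len]);
    id
}
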
/// Tracks the state of an asynchronous disk.
struct DiskState {
    disk_image: Box<dyn AsyncDisk>,
    read_only: bool,
    sparse: bool,
    id: Option<BlockId>,
    /// A DiskState is owned by each worker's executor and cannot be shared by workers, so
    /// `worker_shared_state` holds the state shared among workers in an `Arc`.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}

/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
}

async fn process_one_request(
    avail_desc: &mut DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) -> result::Result<usize, ExecuteError> {
    let reader = &mut avail_desc.reader;
    let writer = &mut avail_desc.writer;

    // The last byte of the buffer is virtio_blk_req::status.
    // Split it into a separate Writer so that status_writer is the final byte and
    // the original writer is left with just the actual block I/O data.
    let available_bytes = writer.available_bytes();
    let status_offset = available_bytes
        .checked_sub(1)
        .ok_or(ExecuteError::MissingStatus)?;
    let mut status_writer = writer.split_at(status_offset);

    let status = match BlockAsync::execute_request(
        reader,
        writer,
        disk_state,
        flush_timer,
        flush_timer_armed,
    )
    .await
    {
        Ok(()) => VIRTIO_BLK_S_OK,
        Err(e) => {
            match e.log_level() {
                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
                LogLevel::Error => error!("failed executing disk request: {:#}", e),
            }
            e.status()
        }
    };

    status_writer
        .write_all(&[status])
        .map_err(ExecuteError::WriteStatus)?;
    Ok(available_bytes)
}

/// Process one descriptor chain asynchronously.
async fn process_one_chain(
    queue: &RefCell<Queue>,
    mut avail_desc: DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) {
    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
        .await
    {
        Ok(len) => len,
        Err(e) => {
            error!("block: failed to handle request: {:#}", e);
            0
        }
    };

    let mut queue = queue.borrow_mut();
    queue.add_used(avail_desc, len as u32);
    queue.trigger_interrupt();
}

// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}

async fn handle_command_tube(
    command_tube: &Option<AsyncTube>,
    interrupt: &RefCell<Option<Interrupt>>,
    disk_state: Rc<AsyncRwLock<DiskState>>,
) -> Result<(), ExecuteError> {
    let command_tube = match command_tube {
        Some(c) => c,
        None => {
            futures::future::pending::<()>().await;
            return Ok(());
        }
    };
    loop {
        match command_tube.next().await {
            Ok(command) => {
                let resp = match command {
                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
                };

                let resp_clone = resp.clone();
                command_tube
                    .send(resp_clone)
                    .await
                    .map_err(ExecuteError::SendingResponse)?;
                if let DiskControlResult::Ok = resp {
                    if let Some(interrupt) = &*interrupt.borrow() {
                        interrupt.signal_config_changed();
                    }
                }
            }
            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
        }
    }
}

async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let disk_state = disk_state.lock().await;
    // Prevent any other worker threads from doing IO.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}

/// Periodically flushes the disk when the given timer fires.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset armed before calling fdatasync to guarantee that IO requests that started after we
        // call fdatasync will be committed eventually.
        *armed.borrow_mut() = false;

        disk_state
            .read_lock()
            .await
            .disk_image
            .fdatasync()
            .await
            .map_err(ControlError::FdatasyncDisk)?;
    }
}

enum WorkerCmd {
    StartQueue {
        index: usize,
        queue: Queue,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Stop all queues without recovering the queues' state and without completing any queued up
    // work.
    AbortQueues {
        // Once the queues are stopped, a `()` value will be sent back over `response_tx`.
        response_tx: oneshot::Sender<()>,
    },
}

// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks. This
// is because the state can be read from the virtqueue task while the control task is processing a
// resizing command.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // If this is the last queue, drop references to the interrupt so
                                // that, when queues are started up again, we'll use the new
                                // interrupt passed with the first queue.
                                if queue_handlers.is_empty() {
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }
                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public because the field is used by the main crate to determine
    // boot order.
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    disk_size: Arc<AtomicU64>,
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    seg_max: u32,
    block_size: u32,
    id: Option<BlockId>,
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    //
    // Once a thread is started, we never stop it, except when `BlockAsync` itself is dropped. That
    // is because we cannot easily convert the `AsyncDisk` back to a `DiskFile` when backed by
    // Overlapped I/O on Windows because the file becomes permanently associated with the IOCP
    // instance of the async executor.
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}

impl BlockAsync {
    /// Create a new virtio block device that operates on the given AsyncDisk.
    pub fn new(
        base_features: u64,
        disk_image: Box<dyn DiskFile>,
        disk_option: &DiskOption,
        control_tube: Option<Tube>,
        queue_size: Option<u16>,
        num_queues: Option<u16>,
    ) -> SysResult<BlockAsync> {
        let read_only = disk_option.read_only;
        let sparse = disk_option.sparse;
        let block_size = disk_option.block_size;
        let packed_queue = disk_option.packed_queue;
        let id = disk_option.id;
        let mut worker_per_queue = disk_option.multiple_workers;
        // Automatically disable multiple workers if the disk image can't be cloned.
        if worker_per_queue && disk_image.try_clone().is_err() {
            base::warn!("multiple workers requested, but not supported by disk image type");
            worker_per_queue = false;
        }
        let executor_kind = disk_option.async_executor.unwrap_or_default();
        let boot_index = disk_option.bootindex;
        #[cfg(windows)]
        let io_concurrency = disk_option.io_concurrency.get();

        if block_size % SECTOR_SIZE as u32 != 0 {
            error!(
                "Block size {} is not a multiple of {}.",
                block_size, SECTOR_SIZE,
            );
            return Err(SysError::new(libc::EINVAL));
        }
        let disk_size = disk_image.get_len()?;
        if disk_size % block_size as u64 != 0 {
            warn!(
                "Disk size {} is not a multiple of block size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, block_size,
            );
        }
        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
        let multi_queue = match num_queues {
            0 => panic!("Number of queues cannot be zero for a block device"),
            1 => false,
            _ => true,
        };
        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
        if !q_size.is_power_of_two() {
            error!("queue size {} is not a power of 2.", q_size);
            return Err(SysError::new(libc::EINVAL));
        }
        let queue_sizes = vec![q_size; num_queues as usize];

        let avail_features =
            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);

        let seg_max = get_seg_max(q_size);

        let disk_size = Arc::new(AtomicU64::new(disk_size));
        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
            disk_size: disk_size.clone(),
        }));

        Ok(BlockAsync {
            disk_image: Some(disk_image),
            disk_size,
            avail_features,
            read_only,
            sparse,
            seg_max,
            block_size,
            id,
            queue_sizes,
            worker_threads: BTreeMap::new(),
            shared_state,
            worker_per_queue,
            control_tube,
            executor_kind,
            activated_queues: BTreeSet::new(),
            boot_index,
            #[cfg(windows)]
            io_concurrency,
            pci_address: disk_option.pci_address,
        })
    }

    /// Returns the feature flags given the specified attributes.
    fn build_avail_features(
        base_features: u64,
        read_only: bool,
        sparse: bool,
        multi_queue: bool,
        packed_queue: bool,
    ) -> u64 {
        let mut avail_features = base_features;
        if read_only {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        } else {
            if sparse {
                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
            }
            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
        }
        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
        if multi_queue {
            avail_features |= 1 << VIRTIO_BLK_F_MQ;
        }
        if packed_queue {
            avail_features |= 1 << VIRTIO_F_RING_PACKED;
        }
        avail_features
    }

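    // Worked example for `build_avail_features` above (commentary added here; the
    // bit positions come from the virtio spec, and the resulting value matches the
    // `read_features` test below). For a writable, sparse, multi-queue disk with
    // no extra base features:
    //   (1 << VIRTIO_BLK_F_SEG_MAX)      = 0x0004
    // | (1 << VIRTIO_BLK_F_BLK_SIZE)     = 0x0040
    // | (1 << VIRTIO_BLK_F_FLUSH)        = 0x0200
    // | (1 << VIRTIO_BLK_F_MQ)           = 0x1000
    // | (1 << VIRTIO_BLK_F_DISCARD)      = 0x2000
    // | (1 << VIRTIO_BLK_F_WRITE_ZEROES) = 0x4000
    //                                    = 0x7244
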
    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset_oneshot(flush_delay)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                if let Some(id) = disk_state.id {
                    writer.write_all(&id).map_err(ExecuteError::CopyId)?;
                } else {
                    return Err(ExecuteError::Unsupported(req_type));
                }
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }

    /// Builds and returns the config structure used to specify block features.
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the image is not a multiple of the sector size, the tail bits are not exposed.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }

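    // Worked example for `build_config_space` above (commentary added here; the
    // numbers match the `read_size` test below): a 0x1000-byte (4 KiB) image
    // yields `capacity` = 0x1000 >> SECTOR_SHIFT = 8 sectors, and the upper
    // 32 bits of the 64-bit capacity field remain 0.
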
    /// Get the worker for a queue, starting it if necessary.
    // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
    ) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        let control_tube = self.control_tube.take();
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {:#}", e),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                    // Flush any in-memory disk image state to file.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }
        });
        match self.worker_threads.entry(key) {
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }

    pub fn start_queue(
        &mut self,
        idx: usize,
        queue: Queue,
        _mem: GuestMemory,
    ) -> anyhow::Result<()> {
        let (_, worker_tx) = self.start_worker(idx)?;
        worker_tx
            .unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
            .expect("worker channel closed early");
        self.activated_queues.insert(idx);
        Ok(())
    }

    pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
        // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
        // simplify `virtio_sleep` and/or `reset` methods.
        let (_, worker_tx) = self
            .worker_threads
            .get(if self.worker_per_queue { &idx } else { &0 })
            .context("worker not found")?;
        let (response_tx, response_rx) = oneshot::channel();
        worker_tx
            .unbounded_send(WorkerCmd::StopQueue {
                index: idx,
                response_tx,
            })
            .expect("worker channel closed early");
        let queue = cros_async::block_on(async {
            response_rx
                .await
                .expect("response_rx closed early")
                .context("queue not found")
        })?;
        self.activated_queues.remove(&idx);
        Ok(queue)
    }
}

impl VirtioDevice for BlockAsync {
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut keep_rds = Vec::new();

        if let Some(disk_image) = &self.disk_image {
            keep_rds.extend(disk_image.as_raw_descriptors());
        }

        if let Some(control_tube) = &self.control_tube {
            keep_rds.push(control_tube.as_raw_descriptor());
        }

        keep_rds
    }

    fn features(&self) -> u64 {
        self.avail_features
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        let config_space = {
            let disk_size = self.disk_size.load(Ordering::Acquire);
            Self::build_config_space(
                disk_size,
                self.seg_max,
                self.block_size,
                self.queue_sizes.len() as u16,
            )
        };
        copy_config(data, 0, config_space.as_bytes(), offset);
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        for (i, q) in queues {
            self.start_queue(i, q, mem.clone())?;
        }
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
            let (response_tx, response_rx) = oneshot::channel();
            worker_tx
                .unbounded_send(WorkerCmd::AbortQueues { response_tx })
                .expect("worker channel closed early");
            cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
        }
        self.activated_queues.clear();
        Ok(())
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        // Reclaim the queues from workers.
        let mut queues = BTreeMap::new();
        for index in self.activated_queues.clone() {
            queues.insert(index, self.stop_queue(index)?);
        }
        if queues.is_empty() {
            return Ok(None); // Not activated.
        }
        Ok(Some(queues))
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        if let Some((mem, _interrupt, queues)) = queues_state {
            for (i, q) in queues {
                self.start_queue(i, q, mem.clone())?
            }
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
        // handled at a higher layer.
        AnySnapshot::to_any(())
    }

    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        let () = AnySnapshot::from_any(data)?;
        Ok(())
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }

    fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
        self.boot_index
            .map(|s| (format!("scsi@{}/disk@0,0", pci_slot).as_bytes().to_vec(), s))
    }
}

#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::mem::size_of_val;
    use std::sync::atomic::AtomicU64;

    use data_model::Le32;
    use data_model::Le64;
    use disk::SingleFileDisk;
    use hypervisor::ProtectionType;
    use tempfile::tempfile;
    use tempfile::TempDir;
    use vm_memory::GuestAddress;

    use super::*;
    use crate::suspendable_virtio_tests;
    use crate::virtio::base_features;
    use crate::virtio::descriptor_utils::create_descriptor_chain;
    use crate::virtio::descriptor_utils::DescriptorType;
    use crate::virtio::QueueConfig;

    #[test]
    fn read_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut num_sectors = [0u8; 4];
        b.read_config(0, &mut num_sectors);
        // size is 0x1000, so num_sectors is 8 (4096/512).
        assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
        let mut msw_sectors = [0u8; 4];
        b.read_config(4, &mut msw_sectors);
        // size is 0x1000, so msw_sectors is 0.
        assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
    }

    #[test]
    fn read_block_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            block_size: 4096,
            sparse: false,
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut blk_size = [0u8; 4];
        b.read_config(20, &mut blk_size);
        // blk_size should be 4096 (0x1000).
        assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
    }

    #[test]
    fn read_features() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");

        // Feature bits 0-23 and 50-127 are specific for the device type, but
        // at the moment crosvm only supports 64 bits of feature bits.
        const DEVICE_FEATURE_BITS: u64 = 0xffffff;

        // read-write block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption::default();
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ
            assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-write block device, non-sparse
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                sparse: false,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable non-sparse device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_WRITE_ZEROES
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            // (but not VIRTIO_BLK_F_DISCARD, since discard requires a sparse disk)
            assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
        }
    }

    #[test]
    fn check_pci_address_configurability() {
        let f = tempfile().unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            pci_address: Some(PciAddress {
                bus: 0,
                dev: 1,
                func: 1,
            }),
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();

        assert_eq!(b.pci_address(), disk_option.pci_address);
    }

    #[test]
    fn check_runtime_blk_queue_configurability() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");
        let features = base_features(ProtectionType::Unprotected);

        // Default case
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        assert_eq!(
            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
            b.queue_max_sizes()
        );

        // Single queue of size 128
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(
            features,
            Box::new(f),
            &disk_option,
            None,
            Some(128),
            Some(1),
        )
        .unwrap();
        assert_eq!([128; 1], b.queue_max_sizes());
        // Single queue device should not set VIRTIO_BLK_F_MQ
        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
    }

    #[test]
    fn read_last_sector() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }

    #[test]
    fn read_beyond_last_sector() {
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }

    #[test]
    fn get_id() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();

        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
            reserved: Le32::from(0),
            sector: Le64::from(0),
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (20 bytes for serial)
                (DescriptorType::Writable, 20),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let id = b"a20-byteserialnumber";

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Some(*id),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // The status byte immediately follows the 20-byte ID buffer in the
        // contiguously laid out descriptor buffers.
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 20) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);

        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
        assert_eq!(returned_id, *id);
    }

1543 #[test]
reset_and_reactivate_single_worker()1544 fn reset_and_reactivate_single_worker() {
1545 reset_and_reactivate(false, None);
1546 }
1547
1548 #[test]
reset_and_reactivate_multiple_workers()1549 fn reset_and_reactivate_multiple_workers() {
1550 reset_and_reactivate(true, None);
1551 }
1552
1553 #[test]
1554 #[cfg(windows)]
reset_and_reactivate_overrlapped_io()1555 fn reset_and_reactivate_overrlapped_io() {
1556 reset_and_reactivate(
1557 false,
1558 Some(
1559 cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
1560 ),
1561 );
1562 }
1563
reset_and_reactivate( enables_multiple_workers: bool, async_executor: Option<cros_async::ExecutorKind>, )1564 fn reset_and_reactivate(
1565 enables_multiple_workers: bool,
1566 async_executor: Option<cros_async::ExecutorKind>,
1567 ) {
1568 // Create an empty disk image
1569 let f = tempfile::NamedTempFile::new().unwrap();
1570 f.as_file().set_len(0x1000).unwrap();
1571 // Close the file so that it is possible for the disk implementation to take exclusive
1572 // access when opening it.
1573 let path: tempfile::TempPath = f.into_temp_path();
1574
1575 // Create an empty guest memory
1576 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1577 .expect("Creating guest memory failed.");
1578
1579 // Create a control tube.
1580 // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker thread
1581 // will immediately fail, which isn't what we want to test in this case.
1582 let (_control_tube, control_tube_device) = Tube::pair().unwrap();
1583
        // Create a BlockAsync to test.
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            path: path.to_path_buf(),
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            async_executor,
            ..Default::default()
        };
        let disk_image = disk_option.open().unwrap();
        let mut b = BlockAsync::new(
            features,
            disk_image,
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // Activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");
        // Assert that the resources are consumed.
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );

        // Reset, and assert that the resources have still not been returned to BlockAsync
        // (they should remain owned by the worker threads).
        assert!(b.reset().is_ok(), "reset should succeed");
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );
        assert_eq!(b.id, Some(*b"Block serial number\0"));

        // Re-activating should succeed.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("re-activate should succeed");
    }

    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    #[test]
    fn resize_with_multiple_workers() {
        // Test that a resize handled by one worker affects the shared device state.
        resize(true);
    }

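    // Shared body for the resize_* tests: sends DiskControlCommand::Resize over the
    // control tube, then checks the in-memory disk size, the underlying file length, the
    // virtio config capacity, and the CONFIG_CHANGED interrupt.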
    fn resize(enables_multiple_workers: bool) {
        // Disk image size constants.
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image.
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory.
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test.
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // Activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // Assert the original size first.
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

        // Assert that the resize works.
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        // Wait until the block device signals the interrupt.
        interrupt
            .get_interrupt_evt()
            .wait()
            .expect("interrupt should be signaled");

        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }
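
    // The capacity checks above rely on the virtio-blk convention that config space
    // reports the device size as a little-endian count of 512-byte sectors at offset 0.
    // A minimal sketch of that conversion (SECTOR_SHIFT = 9, per the inline comments;
    // `capacity_bytes` is a hypothetical helper, not part of this module):
    //
    //     fn capacity_bytes(disk_len: u64) -> [u8; 8] {
    //         (disk_len >> 9).to_le_bytes()
    //     }
    //
    // e.g. capacity_bytes(0x2000) == [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00].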

    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image.
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory.
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with a single worker thread.
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // Activate with queues of an arbitrary size.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
        drop(b);

        // Create a BlockAsync to test with multiple worker threads.
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // Activate should succeed.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

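    // Fixtures for the suspendable_virtio_tests! invocation below. modify_device flips
    // avail_features so the generated suspend/snapshot tests have a detectable change in
    // device state to compare against.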
    struct BlockContext {}

    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image.
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test.
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}