// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::mem::size_of;
#[cfg(windows)]
use std::num::NonZeroU32;
use std::rc::Rc;
use std::result;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::u32;

use anyhow::Context;
use base::debug;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Result as SysResult;
use base::Timer;
use base::Tube;
use base::TubeError;
use base::WorkerThread;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::AsyncError;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use disk::AsyncDisk;
use disk::DiskFile;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use remain::sorted;
use thiserror::Error as ThisError;
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
use vm_control::DiskControlCommand;
use vm_control::DiskControlResult;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;

use crate::virtio::async_utils;
use crate::virtio::block::sys::*;
use crate::virtio::block::DiskOption;
use crate::virtio::copy_config;
use crate::virtio::device_constants::block::virtio_blk_config;
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
use crate::virtio::device_constants::block::virtio_blk_req_header;
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::PciAddress;

const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
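
// For example, with SECTOR_SHIFT = 9 a request for sector `s` starts at byte offset
// `s << SECTOR_SHIFT`, so sector 8 maps to byte offset 4096.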

const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;
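
// Illustrative compile-time sanity check (not part of the original device logic):
// 128 sectors of 512 bytes is exactly 64 KiB.
const _: () = assert!(DISCARD_SECTOR_ALIGNMENT as u64 * SECTOR_SIZE == 64 * 1024);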

#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
    #[error("failed to copy ID string: {0}")]
    CopyId(io::Error),
    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
    DiscardWriteZeroes {
        ioerr: Option<disk::Error>,
        sector: u64,
        num_sectors: u32,
        flags: u32,
    },
    #[error("failed to flush: {0}")]
    Flush(disk::Error),
    #[error("not enough space in descriptor chain to write status")]
    MissingStatus,
    #[error("out of range")]
    OutOfRange,
    #[error("failed to read message: {0}")]
    Read(io::Error),
    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
    ReadIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("read only; request_type={request_type}")]
    ReadOnly { request_type: u32 },
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(TubeError),
    #[error("failed to send command response: {0}")]
    SendingResponse(TubeError),
    #[error("couldn't reset the timer: {0}")]
    TimerReset(base::Error),
    #[error("unsupported ({0})")]
    Unsupported(u32),
    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
    WriteIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("failed to write request status: {0}")]
    WriteStatus(io::Error),
}

enum LogLevel {
    Debug,
    Error,
}

impl ExecuteError {
    fn status(&self) -> u8 {
        match self {
            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
            ExecuteError::OutOfRange { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
        }
    }

    fn log_level(&self) -> LogLevel {
        match self {
            // Since there is no feature bit for the guest to detect support for
            // VIRTIO_BLK_T_GET_ID, the driver has to try executing the request to see if it works.
            ExecuteError::Unsupported(VIRTIO_BLK_T_GET_ID) => LogLevel::Debug,
            // Log disk I/O errors at debug level to avoid flooding the logs.
            ExecuteError::ReadIo { .. }
            | ExecuteError::WriteIo { .. }
            | ExecuteError::Flush { .. }
            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
            // Log all other failures as errors.
            _ => LogLevel::Error,
        }
    }
}

/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
    #[error("failed to fsync the disk: {0}")]
    FsyncDisk(disk::Error),
}

/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
type BlockId = [u8; ID_LEN];
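
// For example, a short ID is NUL-padded out to ID_LEN bytes before being reported to the
// guest (an illustrative sketch, not device logic):
//
//     let mut id: BlockId = [0u8; ID_LEN];
//     id[..4].copy_from_slice(b"disk"); // "disk" followed by 16 NUL bytes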

/// Tracks the state of an asynchronous disk.
struct DiskState {
    disk_image: Box<dyn AsyncDisk>,
    read_only: bool,
    sparse: bool,
    id: Option<BlockId>,
    /// A `DiskState` is owned by each worker's executor and cannot be shared between workers,
    /// so `worker_shared_state` holds the state shared across workers in an `Arc`.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}

/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
}

async fn process_one_request(
    avail_desc: &mut DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) -> result::Result<usize, ExecuteError> {
    let reader = &mut avail_desc.reader;
    let writer = &mut avail_desc.writer;

    // The last byte of the buffer is virtio_blk_req::status.
    // Split it into a separate Writer so that status_writer is the final byte and
    // the original writer is left with just the actual block I/O data.
    let available_bytes = writer.available_bytes();
    let status_offset = available_bytes
        .checked_sub(1)
        .ok_or(ExecuteError::MissingStatus)?;
    let mut status_writer = writer.split_at(status_offset);
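    // For example, with a 513-byte writable region, bytes 0..512 receive the I/O data via
    // `writer` and byte 512 receives virtio_blk_req::status via `status_writer`.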

    let status = match BlockAsync::execute_request(
        reader,
        writer,
        disk_state,
        flush_timer,
        flush_timer_armed,
    )
    .await
    {
        Ok(()) => VIRTIO_BLK_S_OK,
        Err(e) => {
            match e.log_level() {
                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
                LogLevel::Error => error!("failed executing disk request: {:#}", e),
            }
            e.status()
        }
    };

    status_writer
        .write_all(&[status])
        .map_err(ExecuteError::WriteStatus)?;
    Ok(available_bytes)
}

/// Process one descriptor chain asynchronously.
async fn process_one_chain(
    queue: &RefCell<Queue>,
    mut avail_desc: DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    interrupt: &Interrupt,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) {
    let _trace = cros_tracing::trace_event!(VirtioBlk, "process_one_chain");
    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
        .await
    {
        Ok(len) => len,
        Err(e) => {
            error!("block: failed to handle request: {:#}", e);
            0
        }
    };

    let mut queue = queue.borrow_mut();
    queue.add_used(avail_desc, len as u32);
    queue.trigger_interrupt(interrupt);
}

// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    interrupt: Interrupt,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
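        // (Illustrative anti-pattern: writing `res = evt.next_val().fuse() => ...` inside
        // the `select!` below would re-create and re-submit the eventfd read on every
        // iteration of this loop.)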
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &interrupt,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}

async fn handle_command_tube(
    command_tube: &Option<AsyncTube>,
    interrupt: Interrupt,
    disk_state: Rc<AsyncRwLock<DiskState>>,
) -> Result<(), ExecuteError> {
    let command_tube = match command_tube {
        Some(c) => c,
        None => {
            futures::future::pending::<()>().await;
            return Ok(());
        }
    };
    loop {
        match command_tube.next().await {
            Ok(command) => {
                let resp = match command {
                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
                };

                let resp_clone = resp.clone();
                command_tube
                    .send(resp_clone)
                    .await
                    .map_err(ExecuteError::SendingResponse)?;
                if let DiskControlResult::Ok = resp {
                    interrupt.signal_config_changed();
                }
            }
            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
        }
    }
}

async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let mut disk_state = disk_state.lock().await;
    // Prevent other worker threads from doing IO while we resize.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}

/// Periodically flushes the disk when the given timer fires.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset the armed flag before calling fsync so that IO requests which start after the
        // fsync begins will re-arm the timer and eventually be committed by a later fsync.
        *armed.borrow_mut() = false;
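        // For example, if a write lands between the flag reset above and the fsync below,
        // it re-arms the timer, so a later timer expiration issues another fsync that
        // covers that write.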

        disk_state
            .read_lock()
            .await
            .disk_image
            .fsync()
            .await
            .map_err(ControlError::FsyncDisk)?;
    }
}

enum WorkerCmd {
    StartQueue {
        index: usize,
        queue: Queue,
        interrupt: Interrupt,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
}
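
// Illustrative command sequence, assuming a connected worker: to restart queue 0, send
// `StopQueue { index: 0, response_tx }`, await the old queue (`Some(queue)`) on the
// paired receiver, then send `StartQueue { index: 0, queue, interrupt }` with the new
// queue.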

// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks,
// because the state can be read by the virtqueue task while the control task is processing
// a resize command.
async fn run_worker(
    ex: &Executor,
    interrupt: Interrupt,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control = handle_command_tube(control_tube, interrupt.clone(), disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Process any requests to resample the irq value.
    let resample_future = async_utils::handle_irq_resample(ex, interrupt.clone()).fuse();
    pin_mut!(resample_future);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = resample_future => return r.context("failed to resample an irq value"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue, interrupt}) => {
                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            interrupt,
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };
                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }
                    }
                }
            }
        };
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public because the field is used by the main crate to determine
    // boot order.
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    disk_size: Arc<AtomicU64>,
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    seg_max: u32,
    block_size: u32,
    id: Option<BlockId>,
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    worker_threads: BTreeMap<
        usize,
        (
            WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>,
            mpsc::UnboundedSender<WorkerCmd>,
        ),
    >,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}

impl BlockAsync {
    /// Create a new virtio block device that operates on the given AsyncDisk.
    pub fn new(
        base_features: u64,
        disk_image: Box<dyn DiskFile>,
        disk_option: &DiskOption,
        control_tube: Option<Tube>,
        queue_size: Option<u16>,
        num_queues: Option<u16>,
    ) -> SysResult<BlockAsync> {
        let read_only = disk_option.read_only;
        let sparse = disk_option.sparse;
        let block_size = disk_option.block_size;
        let packed_queue = disk_option.packed_queue;
        let id = disk_option.id;
        let mut worker_per_queue = disk_option.multiple_workers;
        // Automatically disable multiple workers if the disk image can't be cloned.
        if worker_per_queue && disk_image.try_clone().is_err() {
            base::warn!("multiple workers requested, but not supported by disk image type");
            worker_per_queue = false;
        }
        let executor_kind = disk_option.async_executor;
        let boot_index = disk_option.bootindex;
        #[cfg(windows)]
        let io_concurrency = disk_option.io_concurrency.get();

        if block_size % SECTOR_SIZE as u32 != 0 {
            error!(
                "Block size {} is not a multiple of {}.",
                block_size, SECTOR_SIZE,
            );
            return Err(SysError::new(libc::EINVAL));
        }
        let disk_size = disk_image.get_len()?;
        if disk_size % block_size as u64 != 0 {
            warn!(
                "Disk size {} is not a multiple of block size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, block_size,
            );
        }
        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
        let multi_queue = match num_queues {
            0 => panic!("Number of queues cannot be zero for a block device"),
            1 => false,
            _ => true,
        };
        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
        if !q_size.is_power_of_two() {
            error!("queue size {} is not a power of 2.", q_size);
            return Err(SysError::new(libc::EINVAL));
        }
        let queue_sizes = vec![q_size; num_queues as usize];

        let avail_features =
            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);

        let seg_max = get_seg_max(q_size);
        let executor_kind = executor_kind.unwrap_or_default();

        let disk_size = Arc::new(AtomicU64::new(disk_size));
        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
            disk_size: disk_size.clone(),
        }));

        Ok(BlockAsync {
            disk_image: Some(disk_image),
            disk_size,
            avail_features,
            read_only,
            sparse,
            seg_max,
            block_size,
            id,
            queue_sizes,
            worker_threads: BTreeMap::new(),
            shared_state,
            worker_per_queue,
            control_tube,
            executor_kind,
            activated_queues: BTreeSet::new(),
            boot_index,
            #[cfg(windows)]
            io_concurrency,
            pci_address: disk_option.pci_address,
        })
    }

    /// Returns the feature flags given the specified attributes.
    fn build_avail_features(
        base_features: u64,
        read_only: bool,
        sparse: bool,
        multi_queue: bool,
        packed_queue: bool,
    ) -> u64 {
        let mut avail_features = base_features;
        if read_only {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        } else {
            if sparse {
                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
            }
            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
        }
        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
        if multi_queue {
            avail_features |= 1 << VIRTIO_BLK_F_MQ;
        }
        if packed_queue {
            avail_features |= 1 << VIRTIO_F_RING_PACKED;
        }
        avail_features
    }
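
    // For example, with `base_features = 0` a writable, sparse, multi-queue, split-ring
    // disk advertises DISCARD | FLUSH | WRITE_ZEROES | SEG_MAX | BLK_SIZE | MQ, while a
    // read-only multi-queue disk advertises RO | SEG_MAX | BLK_SIZE | MQ.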

    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }
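
        // For example, with disk_size = 0x1000: check_range(0xe00, 0x200, 0x1000) is Ok
        // (the I/O ends exactly at the disk boundary), check_range(0xe00, 0x400, 0x1000)
        // fails, and check_range(u64::MAX, 1, 0x1000) fails on the overflow check.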

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "in", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "out", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset(flush_delay, None)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                #[allow(clippy::if_same_then_else)]
                let _trace = if req_type == VIRTIO_BLK_T_DISCARD {
                    cros_tracing::trace_event!(VirtioBlk, "discard")
                } else {
                    cros_tracing::trace_event!(VirtioBlk, "write_zeroes")
                };
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "flush");
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "get_id");
                if let Some(id) = disk_state.id {
                    writer.write_all(&id).map_err(ExecuteError::CopyId)?;
                } else {
                    return Err(ExecuteError::Unsupported(req_type));
                }
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }

    /// Builds and returns the config structure used to specify block features.
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the image is not a multiple of the sector size, the tail bits are not exposed.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }
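
    // For example, a 4096-byte image yields capacity = 8 (512-byte sectors), and a
    // 4097-byte image also yields capacity = 8: the trailing partial sector is hidden
    // from the guest.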

    /// Get the worker for a queue, starting it if necessary.
    // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
        interrupt: Interrupt,
    ) -> anyhow::Result<&(
        WorkerThread<(Box<dyn DiskFile>, Option<Tube>)>,
        mpsc::UnboundedSender<WorkerCmd>,
    )> {
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        let control_tube = self.control_tube.take();
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {:#}", e),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(
                        &ex,
                        interrupt,
                        &disk_state,
                        &async_control,
                        worker_rx,
                        kill_evt,
                    )
                    .await;
                    // Flush any in-memory disk image state to file.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }

            let disk_state = match Rc::try_unwrap(disk_state) {
                Ok(d) => d.into_inner(),
                Err(_) => panic!("too many refs to the disk"),
            };
            (
                disk_state.disk_image.into_inner(),
                async_control.map(Tube::from),
            )
        });
        match self.worker_threads.entry(key) {
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }

    pub fn start_queue(
        &mut self,
        idx: usize,
        queue: Queue,
        _mem: GuestMemory,
        doorbell: Interrupt,
    ) -> anyhow::Result<()> {
        let (_, worker_tx) = self.start_worker(idx, doorbell.clone())?;
        worker_tx
            .unbounded_send(WorkerCmd::StartQueue {
                index: idx,
                queue,
                interrupt: doorbell,
            })
            .expect("worker channel closed early");
        self.activated_queues.insert(idx);
        Ok(())
    }

    pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
        // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
        // simplify `virtio_sleep` and/or `reset` methods.
        let (_, worker_tx) = self
            .worker_threads
            .get(if self.worker_per_queue { &idx } else { &0 })
            .context("worker not found")?;
        let (response_tx, response_rx) = oneshot::channel();
        worker_tx
            .unbounded_send(WorkerCmd::StopQueue {
                index: idx,
                response_tx,
            })
            .expect("worker channel closed early");
        let queue = cros_async::block_on(async {
            response_rx
                .await
                .expect("response_rx closed early")
                .context("queue not found")
        })?;
        self.activated_queues.remove(&idx);
        Ok(queue)
    }
}

impl VirtioDevice for BlockAsync {
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut keep_rds = Vec::new();

        if let Some(disk_image) = &self.disk_image {
            keep_rds.extend(disk_image.as_raw_descriptors());
        }

        if let Some(control_tube) = &self.control_tube {
            keep_rds.push(control_tube.as_raw_descriptor());
        }

        keep_rds
    }

    fn features(&self) -> u64 {
        self.avail_features
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        let config_space = {
            let disk_size = self.disk_size.load(Ordering::Acquire);
            Self::build_config_space(
                disk_size,
                self.seg_max,
                self.block_size,
                self.queue_sizes.len() as u16,
            )
        };
        copy_config(data, 0, config_space.as_bytes(), offset);
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        interrupt: Interrupt,
        queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        for (i, q) in queues {
            self.start_queue(i, q, mem.clone(), interrupt.clone())?;
        }
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        while let Some((_, (worker_thread, _))) = self.worker_threads.pop_first() {
            let (disk_image, control_tube) = worker_thread.stop();
            self.disk_image = Some(disk_image);
            if let Some(control_tube) = control_tube {
                self.control_tube = Some(control_tube);
            }
        }
        self.activated_queues.clear();
        Ok(())
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        if self.worker_threads.is_empty() {
            return Ok(None); // Not activated.
        }

        // Reclaim the queues from workers.
        let mut queues = BTreeMap::new();
        for index in self.activated_queues.clone() {
            queues.insert(index, self.stop_queue(index)?);
        }
        // Shutdown the workers.
        while let Some((_, (worker_thread, _))) = self.worker_threads.pop_first() {
            let (disk_image, control_tube) = worker_thread.stop();
            self.disk_image = Some(disk_image);
            if let Some(control_tube) = control_tube {
                self.control_tube = Some(control_tube);
            }
        }
        Ok(Some(queues))
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        if let Some((mem, interrupt, queues)) = queues_state {
            for (i, q) in queues {
                self.start_queue(i, q, mem.clone(), interrupt.clone())?
            }
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
        // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
        // handled at a higher layer.
        Ok(serde_json::Value::Null)
    }

    fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
        anyhow::ensure!(
            data == serde_json::Value::Null,
            "unexpected snapshot data: should be null, got {}",
            data,
        );
        Ok(())
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }

    fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
        self.boot_index
            .map(|s| (format!("scsi@{}/disk@0,0", pci_slot).as_bytes().to_vec(), s))
    }
}

#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::mem::size_of_val;
    use std::sync::atomic::AtomicU64;

    use data_model::Le32;
    use data_model::Le64;
    use disk::SingleFileDisk;
    use hypervisor::ProtectionType;
    use tempfile::tempfile;
    use tempfile::TempDir;
    use vm_memory::GuestAddress;

    use super::*;
    use crate::suspendable_virtio_tests;
    use crate::virtio::base_features;
    use crate::virtio::descriptor_utils::create_descriptor_chain;
    use crate::virtio::descriptor_utils::DescriptorType;
    use crate::virtio::QueueConfig;

    #[test]
    fn read_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut num_sectors = [0u8; 4];
        b.read_config(0, &mut num_sectors);
        // size is 0x1000, so num_sectors is 8 (4096/512).
        assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
        let mut msw_sectors = [0u8; 4];
        b.read_config(4, &mut msw_sectors);
        // size is 0x1000, so msw_sectors is 0.
        assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
    }

    #[test]
    fn read_block_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            block_size: 4096,
            sparse: false,
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut blk_size = [0u8; 4];
        b.read_config(20, &mut blk_size);
        // blk_size should be 4096 (0x1000).
        assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
    }

    #[test]
    fn read_features() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");

        // read-write block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption::default();
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_F_VERSION_1 + VIRTIO_BLK_F_BLK_SIZE
            // + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ + VIRTIO_RING_F_EVENT_IDX
            assert_eq!(0x120007244, b.features());
        }

        // read-write block device, non-sparse
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                sparse: false,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // non-sparse writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_WRITE_ZEROES
            // + VIRTIO_F_VERSION_1 + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ + VIRTIO_RING_F_EVENT_IDX, but not VIRTIO_BLK_F_DISCARD
            assert_eq!(0x120005244, b.features());
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_F_VERSION_1 + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ + VIRTIO_RING_F_EVENT_IDX
            assert_eq!(0x120001064, b.features());
        }
    }

    #[test]
    fn check_pci_address_configurability() {
        let f = tempfile().unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            pci_address: Some(PciAddress {
                bus: 0,
                dev: 1,
                func: 1,
            }),
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();

        assert_eq!(b.pci_address(), disk_option.pci_address);
    }

    #[test]
    fn check_runtime_blk_queue_configurability() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");
        let features = base_features(ProtectionType::Unprotected);

        // Default case
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        assert_eq!(
            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
            b.queue_max_sizes()
        );

        // Single queue of size 128
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(
            features,
            Box::new(f),
            &disk_option,
            None,
            Some(128),
            Some(1),
        )
        .unwrap();
        assert_eq!([128; 1], b.queue_max_sizes());
        // Single queue device should not set VIRTIO_BLK_F_MQ
        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
    }

    #[test]
    fn read_last_sector() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }

    #[test]
    fn read_beyond_last_sector() {
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }

    #[test]
    fn get_id() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();

        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
            reserved: Le32::from(0),
            sector: Le64::from(0),
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (20 bytes for serial)
                (DescriptorType::Writable, 20),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let id = b"a20-byteserialnumber";

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Some(*id),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // The status byte follows the request header and the 20-byte ID buffer (not a
        // 512-byte data buffer as in the read tests).
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 20) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);

        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
        assert_eq!(returned_id, *id);
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn reset_and_reactivate_single_worker() {
        reset_and_reactivate(false);
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn reset_and_reactivate_multiple_workers() {
        reset_and_reactivate(true);
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    fn reset_and_reactivate(enables_multiple_workers: bool) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker thread
        // to fail immediately, which isn't what we want to test in this case.
        let (_control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem.clone(),
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );

        // reset and assert resources are returned
        assert!(b.reset().is_ok(), "reset should succeed");
        assert!(
            b.disk_image.is_some(),
            "BlockAsync should have a disk image"
        );
        assert!(
            b.control_tube.is_some(),
            "BlockAsync should have a control tube"
        );
        assert_eq!(b.id, Some(*b"Block serial number\0"));

        // re-activate should succeed
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem,
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("re-activate should succeed");
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented,
    // or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented,
    // or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn resize_with_multiple_workers() {
        // Test that a resize handled by one worker updates the state shared by all workers.
        resize(true);
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let interrupt = Interrupt::new_for_test();
        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
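        // The worker should acknowledge the resize over the control tube.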
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
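        // A resize must also raise a config-changed interrupt so the guest driver
        // knows to re-read the capacity.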
        assert_eq!(
            interrupt
                .get_interrupt_evt()
                // Wait a bit until the blk signals the interrupt
                .wait_timeout(Duration::from_millis(300)),
            Ok(base::EventWaitResult::Signaled),
            "interrupt should be signaled"
        );
        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }

    // TODO(b/270225199): enable this test on Windows once IoSource::into_source is implemented,
    // or after finding a good way to prevent BlockAsync::drop() from panicking due to that.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with single worker thread
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem.clone(),
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("activate should succeed");

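        // Without multiple_workers, a single worker thread services every queue.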
        assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
        drop(b);
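        // Dropping the device stops and joins its worker threads, so the disk
        // image clone below is safe to reuse.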

        // Create a BlockAsync to test with multiple worker threads
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // activate should succeed
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap())
            .expect("QueueConfig::activate");

        b.activate(
            mem,
            Interrupt::new_for_test(),
            BTreeMap::from([(0, q0), (1, q1)]),
        )
        .expect("activate should succeed");

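        // With multiple_workers enabled, one worker thread is spawned per queue.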
        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

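    // Per-device context type required by the suspendable_virtio_tests harness;
    // the block device needs no extra per-test state.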
    struct BlockContext {}

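    // Called by the test harness to perturb device state between snapshots;
    // flipping the advertised feature bits gives the snapshot comparison an
    // easily observable change.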
    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

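    // Generate the shared suspend/sleep/wake test suite for the block device;
    // the `2` is the number of queues the generated tests activate.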
    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}