• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // virtio-sound spec: https://github.com/oasis-tcs/virtio-spec/blob/master/virtio-sound.tex
6 
7 use std::io;
8 use std::rc::Rc;
9 use std::sync::Arc;
10 
11 use anyhow::anyhow;
12 use anyhow::Context;
13 use audio_streams::BoxError;
14 use base::debug;
15 use base::error;
16 use base::warn;
17 use base::AsRawDescriptor;
18 use base::Descriptor;
19 use base::Error as SysError;
20 use base::Event;
21 use base::RawDescriptor;
22 use base::WorkerThread;
23 use cros_async::block_on;
24 use cros_async::sync::Condvar;
25 use cros_async::sync::Mutex as AsyncMutex;
26 use cros_async::AsyncError;
27 use cros_async::EventAsync;
28 use cros_async::Executor;
29 use futures::channel::mpsc;
30 use futures::channel::oneshot;
31 use futures::channel::oneshot::Canceled;
32 use futures::future::FusedFuture;
33 use futures::join;
34 use futures::pin_mut;
35 use futures::select;
36 use futures::Future;
37 use futures::FutureExt;
38 use thiserror::Error as ThisError;
39 use vm_memory::GuestMemory;
40 use zerocopy::AsBytes;
41 
42 use crate::virtio::async_utils;
43 use crate::virtio::copy_config;
44 use crate::virtio::device_constants::snd::virtio_snd_config;
45 use crate::virtio::snd::common_backend::async_funcs::*;
46 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
47 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;
48 use crate::virtio::snd::constants::*;
49 use crate::virtio::snd::file_backend::create_file_stream_source_generators;
50 use crate::virtio::snd::file_backend::Error as FileError;
51 use crate::virtio::snd::layout::*;
52 use crate::virtio::snd::null_backend::create_null_stream_source_generators;
53 use crate::virtio::snd::parameters::Parameters;
54 use crate::virtio::snd::parameters::StreamSourceBackend;
55 use crate::virtio::snd::sys::create_stream_source_generators as sys_create_stream_source_generators;
56 use crate::virtio::snd::sys::set_audio_thread_priority;
57 use crate::virtio::snd::sys::SysAsyncStreamObjects;
58 use crate::virtio::snd::sys::SysAudioStreamSourceGenerator;
59 use crate::virtio::snd::sys::SysBufferWriter;
60 use crate::virtio::DescriptorError;
61 use crate::virtio::DeviceType;
62 use crate::virtio::Interrupt;
63 use crate::virtio::Queue;
64 use crate::virtio::VirtioDevice;
65 use crate::virtio::Writer;
66 use crate::Suspendable;
67 
68 pub mod async_funcs;
69 pub mod stream_info;
70 
71 // control + event + tx + rx queue
72 pub const MAX_QUEUE_NUM: usize = 4;
73 pub const MAX_VRING_LEN: u16 = 1024;
74 
75 #[derive(ThisError, Debug)]
76 pub enum Error {
77     /// next_async failed.
78     #[error("Failed to read descriptor asynchronously: {0}")]
79     Async(AsyncError),
80     /// Creating stream failed.
81     #[error("Failed to create stream: {0}")]
82     CreateStream(BoxError),
83     /// Creating stream failed.
84     #[error("No stream source found.")]
85     EmptyStreamSource,
86     /// Creating kill event failed.
87     #[error("Failed to create kill event: {0}")]
88     CreateKillEvent(SysError),
89     /// Creating WaitContext failed.
90     #[error("Failed to create wait context: {0}")]
91     CreateWaitContext(SysError),
92     #[error("Failed to create file stream source generator")]
93     CreateFileStreamSourceGenerator(FileError),
94     /// Cloning kill event failed.
95     #[error("Failed to clone kill event: {0}")]
96     CloneKillEvent(SysError),
97     /// Descriptor chain was invalid.
98     #[error("Failed to valildate descriptor chain: {0}")]
99     DescriptorChain(DescriptorError),
100     // Future error.
101     #[error("Unexpected error. Done was not triggered before dropped: {0}")]
102     DoneNotTriggered(Canceled),
103     /// Error reading message from queue.
104     #[error("Failed to read message: {0}")]
105     ReadMessage(io::Error),
106     /// Failed writing a response to a control message.
107     #[error("Failed to write message response: {0}")]
108     WriteResponse(io::Error),
109     // Mpsc read error.
110     #[error("Error in mpsc: {0}")]
111     MpscSend(futures::channel::mpsc::SendError),
112     // Oneshot send error.
113     #[error("Error in oneshot send")]
114     OneshotSend(()),
115     /// Stream not found.
116     #[error("stream id ({0}) < num_streams ({1})")]
117     StreamNotFound(usize, usize),
118     /// Fetch buffer error
119     #[error("Failed to get buffer from CRAS: {0}")]
120     FetchBuffer(BoxError),
121     /// Invalid buffer size
122     #[error("Invalid buffer size")]
123     InvalidBufferSize,
124     /// IoError
125     #[error("I/O failed: {0}")]
126     Io(io::Error),
127     /// Operation not supported.
128     #[error("Operation not supported")]
129     OperationNotSupported,
130     /// Writing to a buffer in the guest failed.
131     #[error("failed to write to buffer: {0}")]
132     WriteBuffer(io::Error),
133     // Invalid PCM worker state.
134     #[error("Invalid PCM worker state")]
135     InvalidPCMWorkerState,
136     // Invalid backend.
137     #[error("Backend is not implemented")]
138     InvalidBackend,
139     // Failed to generate StreamSource
140     #[error("Failed to generate stream source: {0}")]
141     GenerateStreamSource(BoxError),
142     // PCM worker unexpectedly quitted.
143     #[error("PCM worker quitted unexpectedly")]
144     PCMWorkerQuittedUnexpectedly,
145 }
146 
147 pub enum DirectionalStream {
148     Input(
149         Box<dyn audio_streams::capture::AsyncCaptureBufferStream>,
150         usize, // `period_size` in `usize`
151     ),
152     Output(
153         Box<dyn audio_streams::AsyncPlaybackBufferStream>,
154         Box<dyn PlaybackBufferWriter>,
155     ),
156 }
157 
158 #[derive(Copy, Clone, std::cmp::PartialEq, Eq)]
159 pub enum WorkerStatus {
160     Pause = 0,
161     Running = 1,
162     Quit = 2,
163 }
164 
165 // Stores constant data
166 #[derive(Clone)]
167 pub struct SndData {
168     pub(crate) jack_info: Vec<virtio_snd_jack_info>,
169     pub(crate) pcm_info: Vec<virtio_snd_pcm_info>,
170     pub(crate) chmap_info: Vec<virtio_snd_chmap_info>,
171 }
172 
173 impl SndData {
pcm_info_len(&self) -> usize174     pub fn pcm_info_len(&self) -> usize {
175         self.pcm_info.len()
176     }
177 
pcm_info_iter(&self) -> std::slice::Iter<'_, virtio_snd_pcm_info>178     pub fn pcm_info_iter(&self) -> std::slice::Iter<'_, virtio_snd_pcm_info> {
179         self.pcm_info.iter()
180     }
181 }
182 
183 const SUPPORTED_FORMATS: u64 = 1 << VIRTIO_SND_PCM_FMT_U8
184     | 1 << VIRTIO_SND_PCM_FMT_S16
185     | 1 << VIRTIO_SND_PCM_FMT_S24
186     | 1 << VIRTIO_SND_PCM_FMT_S32;
187 const SUPPORTED_FRAME_RATES: u64 = 1 << VIRTIO_SND_PCM_RATE_8000
188     | 1 << VIRTIO_SND_PCM_RATE_11025
189     | 1 << VIRTIO_SND_PCM_RATE_16000
190     | 1 << VIRTIO_SND_PCM_RATE_22050
191     | 1 << VIRTIO_SND_PCM_RATE_32000
192     | 1 << VIRTIO_SND_PCM_RATE_44100
193     | 1 << VIRTIO_SND_PCM_RATE_48000;
194 
195 // Response from pcm_worker to pcm_queue
196 pub struct PcmResponse {
197     pub(crate) desc_index: u16,
198     pub(crate) status: virtio_snd_pcm_status, // response to the pcm message
199     pub(crate) writer: Writer,
200     pub(crate) done: Option<oneshot::Sender<()>>, // when pcm response is written to the queue
201 }
202 
203 pub struct VirtioSnd {
204     cfg: virtio_snd_config,
205     snd_data: SndData,
206     stream_info_builders: Vec<StreamInfoBuilder>,
207     avail_features: u64,
208     acked_features: u64,
209     queue_sizes: Box<[u16]>,
210     worker_thread: Option<WorkerThread<()>>,
211     keep_rds: Vec<Descriptor>,
212 }
213 
214 impl VirtioSnd {
new(base_features: u64, params: Parameters) -> Result<VirtioSnd, Error>215     pub fn new(base_features: u64, params: Parameters) -> Result<VirtioSnd, Error> {
216         let params = resize_parameters_pcm_device_config(params);
217         let cfg = hardcoded_virtio_snd_config(&params);
218         let snd_data = hardcoded_snd_data(&params);
219         let avail_features = base_features;
220         let mut keep_rds: Vec<RawDescriptor> = Vec::new();
221 
222         let stream_info_builders = create_stream_info_builders(&params, &snd_data, &mut keep_rds)?;
223 
224         Ok(VirtioSnd {
225             cfg,
226             snd_data,
227             stream_info_builders,
228             avail_features,
229             acked_features: 0,
230             queue_sizes: vec![MAX_VRING_LEN; MAX_QUEUE_NUM].into_boxed_slice(),
231             worker_thread: None,
232             keep_rds: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
233         })
234     }
235 }
236 
create_stream_source_generators( params: &Parameters, snd_data: &SndData, keep_rds: &mut Vec<RawDescriptor>, ) -> Result<Vec<SysAudioStreamSourceGenerator>, Error>237 fn create_stream_source_generators(
238     params: &Parameters,
239     snd_data: &SndData,
240     keep_rds: &mut Vec<RawDescriptor>,
241 ) -> Result<Vec<SysAudioStreamSourceGenerator>, Error> {
242     let generators = match params.backend {
243         StreamSourceBackend::NULL => create_null_stream_source_generators(snd_data),
244         StreamSourceBackend::FILE => {
245             create_file_stream_source_generators(params, snd_data, keep_rds)
246                 .map_err(Error::CreateFileStreamSourceGenerator)?
247         }
248         StreamSourceBackend::Sys(backend) => {
249             sys_create_stream_source_generators(backend, params, snd_data)
250         }
251     };
252     Ok(generators)
253 }
254 
255 /// Creates [`StreamInfoBuilder`]s by calling [`create_stream_source_generators()`] then zip
256 /// them with [`crate::virtio::snd::parameters::PCMDeviceParameters`] from the params to set
257 /// the parameters on each [`StreamInfoBuilder`] (e.g. effects).
create_stream_info_builders( params: &Parameters, snd_data: &SndData, keep_rds: &mut Vec<RawDescriptor>, ) -> Result<Vec<StreamInfoBuilder>, Error>258 pub(crate) fn create_stream_info_builders(
259     params: &Parameters,
260     snd_data: &SndData,
261     keep_rds: &mut Vec<RawDescriptor>,
262 ) -> Result<Vec<StreamInfoBuilder>, Error> {
263     Ok(create_stream_source_generators(params, snd_data, keep_rds)?
264         .into_iter()
265         .map(Arc::new)
266         .zip(snd_data.pcm_info_iter())
267         .map(|(generator, pcm_info)| {
268             let device_params = params.get_device_params(pcm_info).unwrap_or_default();
269             StreamInfo::builder(generator).effects(device_params.effects.unwrap_or_default())
270         })
271         .collect())
272 }
273 
274 // To be used with hardcoded_snd_data
hardcoded_virtio_snd_config(params: &Parameters) -> virtio_snd_config275 pub fn hardcoded_virtio_snd_config(params: &Parameters) -> virtio_snd_config {
276     virtio_snd_config {
277         jacks: 0.into(),
278         streams: params.get_total_streams().into(),
279         chmaps: (params.num_output_devices * 3 + params.num_input_devices).into(),
280     }
281 }
282 
283 // To be used with hardcoded_virtio_snd_config
hardcoded_snd_data(params: &Parameters) -> SndData284 pub fn hardcoded_snd_data(params: &Parameters) -> SndData {
285     let jack_info: Vec<virtio_snd_jack_info> = Vec::new();
286     let mut pcm_info: Vec<virtio_snd_pcm_info> = Vec::new();
287     let mut chmap_info: Vec<virtio_snd_chmap_info> = Vec::new();
288 
289     for dev in 0..params.num_output_devices {
290         for _ in 0..params.num_output_streams {
291             pcm_info.push(virtio_snd_pcm_info {
292                 hdr: virtio_snd_info {
293                     hda_fn_nid: dev.into(),
294                 },
295                 features: 0.into(), /* 1 << VIRTIO_SND_PCM_F_XXX */
296                 formats: SUPPORTED_FORMATS.into(),
297                 rates: SUPPORTED_FRAME_RATES.into(),
298                 direction: VIRTIO_SND_D_OUTPUT,
299                 channels_min: 1,
300                 channels_max: 6,
301                 padding: [0; 5],
302             });
303         }
304     }
305     for dev in 0..params.num_input_devices {
306         for _ in 0..params.num_input_streams {
307             pcm_info.push(virtio_snd_pcm_info {
308                 hdr: virtio_snd_info {
309                     hda_fn_nid: dev.into(),
310                 },
311                 features: 0.into(), /* 1 << VIRTIO_SND_PCM_F_XXX */
312                 formats: SUPPORTED_FORMATS.into(),
313                 rates: SUPPORTED_FRAME_RATES.into(),
314                 direction: VIRTIO_SND_D_INPUT,
315                 channels_min: 1,
316                 channels_max: 2,
317                 padding: [0; 5],
318             });
319         }
320     }
321     // Use stereo channel map.
322     let mut positions = [VIRTIO_SND_CHMAP_NONE; VIRTIO_SND_CHMAP_MAX_SIZE];
323     positions[0] = VIRTIO_SND_CHMAP_FL;
324     positions[1] = VIRTIO_SND_CHMAP_FR;
325     for dev in 0..params.num_output_devices {
326         chmap_info.push(virtio_snd_chmap_info {
327             hdr: virtio_snd_info {
328                 hda_fn_nid: dev.into(),
329             },
330             direction: VIRTIO_SND_D_OUTPUT,
331             channels: 2,
332             positions,
333         });
334     }
335     for dev in 0..params.num_input_devices {
336         chmap_info.push(virtio_snd_chmap_info {
337             hdr: virtio_snd_info {
338                 hda_fn_nid: dev.into(),
339             },
340             direction: VIRTIO_SND_D_INPUT,
341             channels: 2,
342             positions,
343         });
344     }
345     positions[2] = VIRTIO_SND_CHMAP_RL;
346     positions[3] = VIRTIO_SND_CHMAP_RR;
347     for dev in 0..params.num_output_devices {
348         chmap_info.push(virtio_snd_chmap_info {
349             hdr: virtio_snd_info {
350                 hda_fn_nid: dev.into(),
351             },
352             direction: VIRTIO_SND_D_OUTPUT,
353             channels: 4,
354             positions,
355         });
356     }
357     positions[2] = VIRTIO_SND_CHMAP_FC;
358     positions[3] = VIRTIO_SND_CHMAP_LFE;
359     positions[4] = VIRTIO_SND_CHMAP_RL;
360     positions[5] = VIRTIO_SND_CHMAP_RR;
361     for dev in 0..params.num_output_devices {
362         chmap_info.push(virtio_snd_chmap_info {
363             hdr: virtio_snd_info {
364                 hda_fn_nid: dev.into(),
365             },
366             direction: VIRTIO_SND_D_OUTPUT,
367             channels: 6,
368             positions,
369         });
370     }
371 
372     SndData {
373         jack_info,
374         pcm_info,
375         chmap_info,
376     }
377 }
378 
resize_parameters_pcm_device_config(mut params: Parameters) -> Parameters379 fn resize_parameters_pcm_device_config(mut params: Parameters) -> Parameters {
380     if params.output_device_config.len() > params.num_output_devices as usize {
381         warn!("Truncating output device config due to length > number of output devices");
382     }
383     params
384         .output_device_config
385         .resize_with(params.num_output_devices as usize, Default::default);
386 
387     if params.input_device_config.len() > params.num_input_devices as usize {
388         warn!("Truncating input device config due to length > number of input devices");
389     }
390     params
391         .input_device_config
392         .resize_with(params.num_input_devices as usize, Default::default);
393 
394     params
395 }
396 
397 impl VirtioDevice for VirtioSnd {
keep_rds(&self) -> Vec<RawDescriptor>398     fn keep_rds(&self) -> Vec<RawDescriptor> {
399         self.keep_rds
400             .iter()
401             .map(|descr| descr.as_raw_descriptor())
402             .collect()
403     }
404 
device_type(&self) -> DeviceType405     fn device_type(&self) -> DeviceType {
406         DeviceType::Sound
407     }
408 
queue_max_sizes(&self) -> &[u16]409     fn queue_max_sizes(&self) -> &[u16] {
410         &self.queue_sizes
411     }
412 
features(&self) -> u64413     fn features(&self) -> u64 {
414         self.avail_features
415     }
416 
ack_features(&mut self, mut v: u64)417     fn ack_features(&mut self, mut v: u64) {
418         // Check if the guest is ACK'ing a feature that we didn't claim to have.
419         let unrequested_features = v & !self.avail_features;
420         if unrequested_features != 0 {
421             warn!("virtio_fs got unknown feature ack: {:x}", v);
422 
423             // Don't count these features as acked.
424             v &= !unrequested_features;
425         }
426         self.acked_features |= v;
427     }
428 
read_config(&self, offset: u64, data: &mut [u8])429     fn read_config(&self, offset: u64, data: &mut [u8]) {
430         copy_config(data, 0, self.cfg.as_bytes(), offset)
431     }
432 
activate( &mut self, guest_mem: GuestMemory, interrupt: Interrupt, queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>433     fn activate(
434         &mut self,
435         guest_mem: GuestMemory,
436         interrupt: Interrupt,
437         queues: Vec<(Queue, Event)>,
438     ) -> anyhow::Result<()> {
439         if queues.len() != self.queue_sizes.len() {
440             return Err(anyhow!(
441                 "snd: expected {} queues, got {}",
442                 self.queue_sizes.len(),
443                 queues.len()
444             ));
445         }
446 
447         let snd_data = self.snd_data.clone();
448         let stream_info_builders = self.stream_info_builders.to_vec();
449 
450         self.worker_thread = Some(WorkerThread::start("v_snd_common", move |kill_evt| {
451             set_audio_thread_priority();
452             if let Err(err_string) = run_worker(
453                 interrupt,
454                 queues,
455                 guest_mem,
456                 snd_data,
457                 kill_evt,
458                 stream_info_builders,
459             ) {
460                 error!("{}", err_string);
461             }
462         }));
463 
464         Ok(())
465     }
466 
reset(&mut self) -> bool467     fn reset(&mut self) -> bool {
468         if let Some(worker_thread) = self.worker_thread.take() {
469             worker_thread.stop();
470         }
471 
472         true
473     }
474 }
475 
476 impl Suspendable for VirtioSnd {}
477 
478 #[derive(PartialEq)]
479 enum LoopState {
480     Continue,
481     Break,
482 }
483 
run_worker( interrupt: Interrupt, queues: Vec<(Queue, Event)>, mem: GuestMemory, snd_data: SndData, kill_evt: Event, stream_info_builders: Vec<StreamInfoBuilder>, ) -> Result<(), String>484 fn run_worker(
485     interrupt: Interrupt,
486     queues: Vec<(Queue, Event)>,
487     mem: GuestMemory,
488     snd_data: SndData,
489     kill_evt: Event,
490     stream_info_builders: Vec<StreamInfoBuilder>,
491 ) -> Result<(), String> {
492     let ex = Executor::new().expect("Failed to create an executor");
493 
494     if snd_data.pcm_info_len() != stream_info_builders.len() {
495         error!(
496             "snd: expected {} streams, got {}",
497             snd_data.pcm_info_len(),
498             stream_info_builders.len(),
499         );
500     }
501     let streams = stream_info_builders
502         .into_iter()
503         .map(StreamInfoBuilder::build)
504         .map(AsyncMutex::new)
505         .collect();
506     let streams = Rc::new(AsyncMutex::new(streams));
507 
508     let mut queues: Vec<(Queue, EventAsync)> = queues
509         .into_iter()
510         .map(|(q, e)| {
511             (
512                 q,
513                 EventAsync::new(e, &ex).expect("Failed to create async event for queue"),
514             )
515         })
516         .collect();
517 
518     let (mut ctrl_queue, mut ctrl_queue_evt) = queues.remove(0);
519     let (_event_queue, _event_queue_evt) = queues.remove(0);
520     let (tx_queue, tx_queue_evt) = queues.remove(0);
521     let (rx_queue, rx_queue_evt) = queues.remove(0);
522 
523     let tx_queue = Rc::new(AsyncMutex::new(tx_queue));
524     let rx_queue = Rc::new(AsyncMutex::new(rx_queue));
525 
526     let (tx_send, mut tx_recv) = mpsc::unbounded();
527     let (rx_send, mut rx_recv) = mpsc::unbounded();
528 
529     let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone()).fuse();
530 
531     // Exit if the kill event is triggered.
532     let f_kill = async_utils::await_and_exit(&ex, kill_evt).fuse();
533 
534     pin_mut!(f_resample, f_kill);
535 
536     loop {
537         if run_worker_once(
538             &ex,
539             &streams,
540             &mem,
541             interrupt.clone(),
542             &snd_data,
543             &mut f_kill,
544             &mut f_resample,
545             &mut ctrl_queue,
546             &mut ctrl_queue_evt,
547             &tx_queue,
548             &tx_queue_evt,
549             tx_send.clone(),
550             &mut tx_recv,
551             &rx_queue,
552             &rx_queue_evt,
553             rx_send.clone(),
554             &mut rx_recv,
555         ) == LoopState::Break
556         {
557             break;
558         }
559 
560         if let Err(e) = reset_streams(
561             &ex,
562             &streams,
563             &mem,
564             interrupt.clone(),
565             &tx_queue,
566             &mut tx_recv,
567             &rx_queue,
568             &mut rx_recv,
569         ) {
570             error!("Error reset streams: {}", e);
571             break;
572         }
573     }
574 
575     Ok(())
576 }
577 
notify_reset_signal(reset_signal: &(AsyncMutex<bool>, Condvar))578 async fn notify_reset_signal(reset_signal: &(AsyncMutex<bool>, Condvar)) {
579     let (lock, cvar) = reset_signal;
580     *lock.lock().await = true;
581     cvar.notify_all();
582 }
583 
584 /// Runs all workers once and exit if any worker exit.
585 ///
586 /// Returns [`LoopState::Break`] if the worker `f_kill` or `f_resample` exit, or something went wrong
587 /// on shutdown process. The caller should not run the worker again and should exit the main loop.
588 ///
589 /// If this function returns [`LoopState::Continue`], the caller can continue the main loop by resetting
590 /// the streams and run the worker again.
run_worker_once( ex: &Executor, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, mem: &GuestMemory, interrupt: Interrupt, snd_data: &SndData, mut f_kill: &mut (impl Future<Output = anyhow::Result<()>> + FusedFuture + Unpin), mut f_resample: &mut (impl Future<Output = anyhow::Result<()>> + FusedFuture + Unpin), ctrl_queue: &mut Queue, ctrl_queue_evt: &mut EventAsync, tx_queue: &Rc<AsyncMutex<Queue>>, tx_queue_evt: &EventAsync, tx_send: mpsc::UnboundedSender<PcmResponse>, tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>, rx_queue: &Rc<AsyncMutex<Queue>>, rx_queue_evt: &EventAsync, rx_send: mpsc::UnboundedSender<PcmResponse>, rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>, ) -> LoopState591 fn run_worker_once(
592     ex: &Executor,
593     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
594     mem: &GuestMemory,
595     interrupt: Interrupt,
596     snd_data: &SndData,
597     mut f_kill: &mut (impl Future<Output = anyhow::Result<()>> + FusedFuture + Unpin),
598     mut f_resample: &mut (impl Future<Output = anyhow::Result<()>> + FusedFuture + Unpin),
599     ctrl_queue: &mut Queue,
600     ctrl_queue_evt: &mut EventAsync,
601     tx_queue: &Rc<AsyncMutex<Queue>>,
602     tx_queue_evt: &EventAsync,
603     tx_send: mpsc::UnboundedSender<PcmResponse>,
604     tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
605     rx_queue: &Rc<AsyncMutex<Queue>>,
606     rx_queue_evt: &EventAsync,
607     rx_send: mpsc::UnboundedSender<PcmResponse>,
608     rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
609 ) -> LoopState {
610     let tx_send2 = tx_send.clone();
611     let rx_send2 = rx_send.clone();
612 
613     let reset_signal = (AsyncMutex::new(false), Condvar::new());
614 
615     let f_ctrl = handle_ctrl_queue(
616         ex,
617         mem,
618         streams,
619         snd_data,
620         ctrl_queue,
621         ctrl_queue_evt,
622         interrupt.clone(),
623         tx_send,
624         rx_send,
625         Some(&reset_signal),
626     )
627     .fuse();
628 
629     // TODO(woodychow): Enable this when libcras sends jack connect/disconnect evts
630     // let f_event = handle_event_queue(
631     //     &mem,
632     //     snd_state,
633     //     event_queue,
634     //     event_queue_evt,
635     //     interrupt,
636     // );
637     let f_tx = handle_pcm_queue(
638         mem,
639         streams,
640         tx_send2,
641         tx_queue,
642         tx_queue_evt,
643         Some(&reset_signal),
644     )
645     .fuse();
646     let f_tx_response = send_pcm_response_worker(
647         mem,
648         tx_queue,
649         interrupt.clone(),
650         tx_recv,
651         Some(&reset_signal),
652     )
653     .fuse();
654     let f_rx = handle_pcm_queue(
655         mem,
656         streams,
657         rx_send2,
658         rx_queue,
659         rx_queue_evt,
660         Some(&reset_signal),
661     )
662     .fuse();
663     let f_rx_response =
664         send_pcm_response_worker(mem, rx_queue, interrupt, rx_recv, Some(&reset_signal)).fuse();
665 
666     pin_mut!(f_ctrl, f_tx, f_tx_response, f_rx, f_rx_response);
667 
668     let done = async {
669         select! {
670             res = f_ctrl => (res.context("error in handling ctrl queue"), LoopState::Continue),
671             res = f_tx => (res.context("error in handling tx queue"), LoopState::Continue),
672             res = f_tx_response => (res.context("error in handling tx response"), LoopState::Continue),
673             res = f_rx => (res.context("error in handling rx queue"), LoopState::Continue),
674             res = f_rx_response => (res.context("error in handling rx response"), LoopState::Continue),
675 
676             // For following workers, do not continue the loop
677             res = f_resample => (res.context("error in handle_irq_resample"), LoopState::Break),
678             res = f_kill => (res.context("error in await_and_exit"), LoopState::Break),
679         }
680     };
681 
682     match ex.run_until(done) {
683         Ok((res, loop_state)) => {
684             if let Err(e) = res {
685                 error!("Error in worker: {:#}", e);
686             }
687             if loop_state == LoopState::Break {
688                 return LoopState::Break;
689             }
690         }
691         Err(e) => {
692             error!("Error happened in executor: {}", e);
693         }
694     }
695 
696     warn!("Shutting down all workers for reset procedure");
697     block_on(notify_reset_signal(&reset_signal));
698 
699     let shutdown = async {
700         loop {
701             let (res, worker_name) = select!(
702                 res = f_ctrl => (res, "f_ctrl"),
703                 res = f_tx => (res, "f_tx"),
704                 res = f_tx_response => (res, "f_tx_response"),
705                 res = f_rx => (res, "f_rx"),
706                 res = f_rx_response => (res, "f_rx_response"),
707                 complete => break,
708             );
709             match res {
710                 Ok(_) => debug!("Worker {} stopped", worker_name),
711                 Err(e) => error!("Worker {} stopped with error {}", worker_name, e),
712             };
713         }
714     };
715 
716     if let Err(e) = ex.run_until(shutdown) {
717         error!("Error happened in executor while shutdown: {}", e);
718         return LoopState::Break;
719     }
720 
721     LoopState::Continue
722 }
723 
reset_streams( ex: &Executor, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, mem: &GuestMemory, interrupt: Interrupt, tx_queue: &Rc<AsyncMutex<Queue>>, tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>, rx_queue: &Rc<AsyncMutex<Queue>>, rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>, ) -> Result<(), AsyncError>724 fn reset_streams(
725     ex: &Executor,
726     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
727     mem: &GuestMemory,
728     interrupt: Interrupt,
729     tx_queue: &Rc<AsyncMutex<Queue>>,
730     tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
731     rx_queue: &Rc<AsyncMutex<Queue>>,
732     rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
733 ) -> Result<(), AsyncError> {
734     let reset_signal = (AsyncMutex::new(false), Condvar::new());
735 
736     let do_reset = async {
737         let streams = streams.read_lock().await;
738         for stream_info in &*streams {
739             let mut stream_info = stream_info.lock().await;
740             if stream_info.state == VIRTIO_SND_R_PCM_START {
741                 if let Err(e) = stream_info.stop().await {
742                     error!("Error on stop while resetting stream: {}", e);
743                 }
744             }
745             if stream_info.state == VIRTIO_SND_R_PCM_STOP
746                 || stream_info.state == VIRTIO_SND_R_PCM_PREPARE
747             {
748                 if let Err(e) = stream_info.release().await {
749                     error!("Error on release while resetting stream: {}", e);
750                 }
751             }
752             stream_info.just_reset = true;
753         }
754 
755         notify_reset_signal(&reset_signal).await;
756     };
757 
758     // Run these in a loop to ensure that they will survive until do_reset is finished
759     let f_tx_response = async {
760         while send_pcm_response_worker(
761             mem,
762             tx_queue,
763             interrupt.clone(),
764             tx_recv,
765             Some(&reset_signal),
766         )
767         .await
768         .is_err()
769         {}
770     };
771 
772     let f_rx_response = async {
773         while send_pcm_response_worker(
774             mem,
775             rx_queue,
776             interrupt.clone(),
777             rx_recv,
778             Some(&reset_signal),
779         )
780         .await
781         .is_err()
782         {}
783     };
784 
785     let reset = async {
786         join!(f_tx_response, f_rx_response, do_reset);
787     };
788 
789     ex.run_until(reset)
790 }
791 
792 #[cfg(test)]
793 #[allow(clippy::needless_update)]
794 mod tests {
795     use audio_streams::StreamEffect;
796 
797     use super::*;
798     use crate::virtio::snd::parameters::PCMDeviceParameters;
799 
800     #[test]
test_virtio_snd_new()801     fn test_virtio_snd_new() {
802         let params = Parameters {
803             num_output_devices: 3,
804             num_input_devices: 2,
805             num_output_streams: 3,
806             num_input_streams: 2,
807             output_device_config: vec![PCMDeviceParameters {
808                 effects: Some(vec![StreamEffect::EchoCancellation]),
809                 ..PCMDeviceParameters::default()
810             }],
811             input_device_config: vec![PCMDeviceParameters {
812                 effects: Some(vec![StreamEffect::EchoCancellation]),
813                 ..PCMDeviceParameters::default()
814             }],
815             ..Default::default()
816         };
817 
818         let res = VirtioSnd::new(123, params).unwrap();
819 
820         // Default values
821         assert_eq!(res.snd_data.jack_info.len(), 0);
822         assert_eq!(res.acked_features, 0);
823         assert_eq!(res.worker_thread.is_none(), true);
824 
825         assert_eq!(res.avail_features, 123); // avail_features must be equal to the input
826         assert_eq!(res.cfg.jacks.to_native(), 0);
827         assert_eq!(res.cfg.streams.to_native(), 13); // (Output = 3*3) + (Input = 2*2)
828         assert_eq!(res.cfg.chmaps.to_native(), 11); // (Output = 3*3) + (Input = 2*1)
829 
830         // Check snd_data.pcm_info
831         assert_eq!(res.snd_data.pcm_info.len(), 13);
832         // Check hda_fn_nid (PCM Device number)
833         let expected_hda_fn_nid = vec![0, 0, 0, 1, 1, 1, 2, 2, 2, 0, 0, 1, 1];
834         for (i, pcm_info) in res.snd_data.pcm_info.iter().enumerate() {
835             assert_eq!(
836                 pcm_info.hdr.hda_fn_nid.to_native(),
837                 expected_hda_fn_nid[i],
838                 "pcm_info index {} incorrect hda_fn_nid",
839                 i
840             );
841         }
842         // First 9 devices must be OUTPUT
843         for i in 0..9 {
844             assert_eq!(
845                 res.snd_data.pcm_info[i].direction, VIRTIO_SND_D_OUTPUT,
846                 "pcm_info index {} incorrect direction",
847                 i
848             );
849         }
850         // Next 4 devices must be INPUT
851         for i in 9..13 {
852             assert_eq!(
853                 res.snd_data.pcm_info[i].direction, VIRTIO_SND_D_INPUT,
854                 "pcm_info index {} incorrect direction",
855                 i
856             );
857         }
858 
859         // Check snd_data.chmap_info
860         assert_eq!(res.snd_data.chmap_info.len(), 11);
861         let expected_hda_fn_nid = vec![0, 1, 2, 0, 1, 0, 1, 2, 0, 1, 2];
862         // Check hda_fn_nid (PCM Device number)
863         for (i, chmap_info) in res.snd_data.chmap_info.iter().enumerate() {
864             assert_eq!(
865                 chmap_info.hdr.hda_fn_nid.to_native(),
866                 expected_hda_fn_nid[i],
867                 "chmap_info index {} incorrect hda_fn_nid",
868                 i
869             );
870         }
871     }
872 
873     #[test]
test_resize_parameters_pcm_device_config_truncate()874     fn test_resize_parameters_pcm_device_config_truncate() {
875         // If pcm_device_config is larger than number of devices, it will be truncated
876         let params = Parameters {
877             num_output_devices: 1,
878             num_input_devices: 1,
879             output_device_config: vec![PCMDeviceParameters::default(); 3],
880             input_device_config: vec![PCMDeviceParameters::default(); 3],
881             ..Parameters::default()
882         };
883         let params = resize_parameters_pcm_device_config(params);
884         assert_eq!(params.output_device_config.len(), 1);
885         assert_eq!(params.input_device_config.len(), 1);
886     }
887 
888     #[test]
test_resize_parameters_pcm_device_config_extend()889     fn test_resize_parameters_pcm_device_config_extend() {
890         let params = Parameters {
891             num_output_devices: 3,
892             num_input_devices: 2,
893             num_output_streams: 3,
894             num_input_streams: 2,
895             output_device_config: vec![PCMDeviceParameters {
896                 effects: Some(vec![StreamEffect::EchoCancellation]),
897                 ..PCMDeviceParameters::default()
898             }],
899             input_device_config: vec![PCMDeviceParameters {
900                 effects: Some(vec![StreamEffect::EchoCancellation]),
901                 ..PCMDeviceParameters::default()
902             }],
903             ..Default::default()
904         };
905 
906         let params = resize_parameters_pcm_device_config(params);
907 
908         // Check output_device_config correctly extended
909         assert_eq!(
910             params.output_device_config,
911             vec![
912                 PCMDeviceParameters {
913                     // Keep from the parameters
914                     effects: Some(vec![StreamEffect::EchoCancellation]),
915                     ..PCMDeviceParameters::default()
916                 },
917                 PCMDeviceParameters::default(), // Extended with default
918                 PCMDeviceParameters::default(), // Extended with default
919             ]
920         );
921 
922         // Check input_device_config correctly extended
923         assert_eq!(
924             params.input_device_config,
925             vec![
926                 PCMDeviceParameters {
927                     // Keep from the parameters
928                     effects: Some(vec![StreamEffect::EchoCancellation]),
929                     ..PCMDeviceParameters::default()
930                 },
931                 PCMDeviceParameters::default(), // Extended with default
932             ]
933         );
934     }
935 }
936