1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // virtio-sound spec: https://github.com/oasis-tcs/virtio-spec/blob/master/virtio-sound.tex
6
7 use std::io;
8 use std::rc::Rc;
9 use std::sync::Arc;
10
11 use anyhow::anyhow;
12 use anyhow::Context;
13 use audio_streams::BoxError;
14 use base::debug;
15 use base::error;
16 use base::warn;
17 use base::AsRawDescriptor;
18 use base::Descriptor;
19 use base::Error as SysError;
20 use base::Event;
21 use base::RawDescriptor;
22 use base::WorkerThread;
23 use cros_async::block_on;
24 use cros_async::sync::Condvar;
25 use cros_async::sync::Mutex as AsyncMutex;
26 use cros_async::AsyncError;
27 use cros_async::EventAsync;
28 use cros_async::Executor;
29 use futures::channel::mpsc;
30 use futures::channel::oneshot;
31 use futures::channel::oneshot::Canceled;
32 use futures::future::FusedFuture;
33 use futures::join;
34 use futures::pin_mut;
35 use futures::select;
36 use futures::Future;
37 use futures::FutureExt;
38 use thiserror::Error as ThisError;
39 use vm_memory::GuestMemory;
40 use zerocopy::AsBytes;
41
42 use crate::virtio::async_utils;
43 use crate::virtio::copy_config;
44 use crate::virtio::device_constants::snd::virtio_snd_config;
45 use crate::virtio::snd::common_backend::async_funcs::*;
46 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
47 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;
48 use crate::virtio::snd::constants::*;
49 use crate::virtio::snd::file_backend::create_file_stream_source_generators;
50 use crate::virtio::snd::file_backend::Error as FileError;
51 use crate::virtio::snd::layout::*;
52 use crate::virtio::snd::null_backend::create_null_stream_source_generators;
53 use crate::virtio::snd::parameters::Parameters;
54 use crate::virtio::snd::parameters::StreamSourceBackend;
55 use crate::virtio::snd::sys::create_stream_source_generators as sys_create_stream_source_generators;
56 use crate::virtio::snd::sys::set_audio_thread_priority;
57 use crate::virtio::snd::sys::SysAsyncStreamObjects;
58 use crate::virtio::snd::sys::SysAudioStreamSourceGenerator;
59 use crate::virtio::snd::sys::SysBufferWriter;
60 use crate::virtio::DescriptorError;
61 use crate::virtio::DeviceType;
62 use crate::virtio::Interrupt;
63 use crate::virtio::Queue;
64 use crate::virtio::VirtioDevice;
65 use crate::virtio::Writer;
66 use crate::Suspendable;
67
68 pub mod async_funcs;
69 pub mod stream_info;
70
/// Number of virtqueues used by the device, in order: control, event, tx, rx.
pub const MAX_QUEUE_NUM: usize = 4;
/// Maximum size advertised for each virtqueue.
pub const MAX_VRING_LEN: u16 = 1024;
74
75 #[derive(ThisError, Debug)]
76 pub enum Error {
77 /// next_async failed.
78 #[error("Failed to read descriptor asynchronously: {0}")]
79 Async(AsyncError),
80 /// Creating stream failed.
81 #[error("Failed to create stream: {0}")]
82 CreateStream(BoxError),
83 /// Creating stream failed.
84 #[error("No stream source found.")]
85 EmptyStreamSource,
86 /// Creating kill event failed.
87 #[error("Failed to create kill event: {0}")]
88 CreateKillEvent(SysError),
89 /// Creating WaitContext failed.
90 #[error("Failed to create wait context: {0}")]
91 CreateWaitContext(SysError),
92 #[error("Failed to create file stream source generator")]
93 CreateFileStreamSourceGenerator(FileError),
94 /// Cloning kill event failed.
95 #[error("Failed to clone kill event: {0}")]
96 CloneKillEvent(SysError),
97 /// Descriptor chain was invalid.
98 #[error("Failed to valildate descriptor chain: {0}")]
99 DescriptorChain(DescriptorError),
100 // Future error.
101 #[error("Unexpected error. Done was not triggered before dropped: {0}")]
102 DoneNotTriggered(Canceled),
103 /// Error reading message from queue.
104 #[error("Failed to read message: {0}")]
105 ReadMessage(io::Error),
106 /// Failed writing a response to a control message.
107 #[error("Failed to write message response: {0}")]
108 WriteResponse(io::Error),
109 // Mpsc read error.
110 #[error("Error in mpsc: {0}")]
111 MpscSend(futures::channel::mpsc::SendError),
112 // Oneshot send error.
113 #[error("Error in oneshot send")]
114 OneshotSend(()),
115 /// Stream not found.
116 #[error("stream id ({0}) < num_streams ({1})")]
117 StreamNotFound(usize, usize),
118 /// Fetch buffer error
119 #[error("Failed to get buffer from CRAS: {0}")]
120 FetchBuffer(BoxError),
121 /// Invalid buffer size
122 #[error("Invalid buffer size")]
123 InvalidBufferSize,
124 /// IoError
125 #[error("I/O failed: {0}")]
126 Io(io::Error),
127 /// Operation not supported.
128 #[error("Operation not supported")]
129 OperationNotSupported,
130 /// Writing to a buffer in the guest failed.
131 #[error("failed to write to buffer: {0}")]
132 WriteBuffer(io::Error),
133 // Invalid PCM worker state.
134 #[error("Invalid PCM worker state")]
135 InvalidPCMWorkerState,
136 // Invalid backend.
137 #[error("Backend is not implemented")]
138 InvalidBackend,
139 // Failed to generate StreamSource
140 #[error("Failed to generate stream source: {0}")]
141 GenerateStreamSource(BoxError),
142 // PCM worker unexpectedly quitted.
143 #[error("PCM worker quitted unexpectedly")]
144 PCMWorkerQuittedUnexpectedly,
145 }
146
/// An audio stream together with the direction-specific data that accompanies it.
pub enum DirectionalStream {
    /// Capture stream.
    Input(
        Box<dyn audio_streams::capture::AsyncCaptureBufferStream>,
        usize, // `period_size` in `usize`
    ),
    /// Playback stream paired with the writer used for its playback buffers.
    Output(
        Box<dyn audio_streams::AsyncPlaybackBufferStream>,
        Box<dyn PlaybackBufferWriter>,
    ),
}
157
/// Run state of a worker, with explicit integer discriminants.
// NOTE(review): the consumers of this enum are outside this part of the file; confirm the exact
// semantics of each state there.
#[derive(Copy, Clone, std::cmp::PartialEq, Eq)]
pub enum WorkerStatus {
    Pause = 0,
    Running = 1,
    Quit = 2,
}
164
// Stores constant data
/// Jack, PCM stream and channel map tables of the device (see `hardcoded_snd_data`).
#[derive(Clone)]
pub struct SndData {
    // Jack descriptions.
    pub(crate) jack_info: Vec<virtio_snd_jack_info>,
    // One entry per PCM stream.
    pub(crate) pcm_info: Vec<virtio_snd_pcm_info>,
    // Channel map descriptions.
    pub(crate) chmap_info: Vec<virtio_snd_chmap_info>,
}
172
173 impl SndData {
pcm_info_len(&self) -> usize174 pub fn pcm_info_len(&self) -> usize {
175 self.pcm_info.len()
176 }
177
pcm_info_iter(&self) -> std::slice::Iter<'_, virtio_snd_pcm_info>178 pub fn pcm_info_iter(&self) -> std::slice::Iter<'_, virtio_snd_pcm_info> {
179 self.pcm_info.iter()
180 }
181 }
182
/// Bitmask of the sample formats advertised in every `virtio_snd_pcm_info`; bit `n` is set when
/// the format with index `n` (`VIRTIO_SND_PCM_FMT_*`) is supported.
const SUPPORTED_FORMATS: u64 = 1 << VIRTIO_SND_PCM_FMT_U8
    | 1 << VIRTIO_SND_PCM_FMT_S16
    | 1 << VIRTIO_SND_PCM_FMT_S24
    | 1 << VIRTIO_SND_PCM_FMT_S32;
/// Bitmask of the frame rates advertised in every `virtio_snd_pcm_info`; bit `n` is set when
/// the rate with index `n` (`VIRTIO_SND_PCM_RATE_*`) is supported.
const SUPPORTED_FRAME_RATES: u64 = 1 << VIRTIO_SND_PCM_RATE_8000
    | 1 << VIRTIO_SND_PCM_RATE_11025
    | 1 << VIRTIO_SND_PCM_RATE_16000
    | 1 << VIRTIO_SND_PCM_RATE_22050
    | 1 << VIRTIO_SND_PCM_RATE_32000
    | 1 << VIRTIO_SND_PCM_RATE_44100
    | 1 << VIRTIO_SND_PCM_RATE_48000;
194
// Response from pcm_worker to pcm_queue
/// Carried over the tx/rx mpsc channels to the response workers.
pub struct PcmResponse {
    // Index of the descriptor this response belongs to.
    pub(crate) desc_index: u16,
    pub(crate) status: virtio_snd_pcm_status, // response to the pcm message
    // Writer used to put the status into the guest buffer.
    pub(crate) writer: Writer,
    pub(crate) done: Option<oneshot::Sender<()>>, // when pcm response is written to the queue
}
202
/// virtio-snd device built on top of the common backend.
pub struct VirtioSnd {
    // Device configuration space contents served by `read_config`.
    cfg: virtio_snd_config,
    // Constant jack/PCM/chmap tables; cloned into the worker on activation.
    snd_data: SndData,
    // One builder per PCM stream; cloned and built into `StreamInfo`s by the worker.
    stream_info_builders: Vec<StreamInfoBuilder>,
    // Feature bits offered to the guest.
    avail_features: u64,
    // Feature bits the guest has acknowledged (always a subset of `avail_features`).
    acked_features: u64,
    // Maximum size of each virtqueue.
    queue_sizes: Box<[u16]>,
    // `Some` after `activate` has started the worker; taken and stopped by `reset`.
    worker_thread: Option<WorkerThread<()>>,
    // Descriptors reported through `VirtioDevice::keep_rds`.
    keep_rds: Vec<Descriptor>,
}
213
214 impl VirtioSnd {
new(base_features: u64, params: Parameters) -> Result<VirtioSnd, Error>215 pub fn new(base_features: u64, params: Parameters) -> Result<VirtioSnd, Error> {
216 let params = resize_parameters_pcm_device_config(params);
217 let cfg = hardcoded_virtio_snd_config(¶ms);
218 let snd_data = hardcoded_snd_data(¶ms);
219 let avail_features = base_features;
220 let mut keep_rds: Vec<RawDescriptor> = Vec::new();
221
222 let stream_info_builders = create_stream_info_builders(¶ms, &snd_data, &mut keep_rds)?;
223
224 Ok(VirtioSnd {
225 cfg,
226 snd_data,
227 stream_info_builders,
228 avail_features,
229 acked_features: 0,
230 queue_sizes: vec![MAX_VRING_LEN; MAX_QUEUE_NUM].into_boxed_slice(),
231 worker_thread: None,
232 keep_rds: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
233 })
234 }
235 }
236
create_stream_source_generators( params: &Parameters, snd_data: &SndData, keep_rds: &mut Vec<RawDescriptor>, ) -> Result<Vec<SysAudioStreamSourceGenerator>, Error>237 fn create_stream_source_generators(
238 params: &Parameters,
239 snd_data: &SndData,
240 keep_rds: &mut Vec<RawDescriptor>,
241 ) -> Result<Vec<SysAudioStreamSourceGenerator>, Error> {
242 let generators = match params.backend {
243 StreamSourceBackend::NULL => create_null_stream_source_generators(snd_data),
244 StreamSourceBackend::FILE => {
245 create_file_stream_source_generators(params, snd_data, keep_rds)
246 .map_err(Error::CreateFileStreamSourceGenerator)?
247 }
248 StreamSourceBackend::Sys(backend) => {
249 sys_create_stream_source_generators(backend, params, snd_data)
250 }
251 };
252 Ok(generators)
253 }
254
255 /// Creates [`StreamInfoBuilder`]s by calling [`create_stream_source_generators()`] then zip
256 /// them with [`crate::virtio::snd::parameters::PCMDeviceParameters`] from the params to set
257 /// the parameters on each [`StreamInfoBuilder`] (e.g. effects).
create_stream_info_builders( params: &Parameters, snd_data: &SndData, keep_rds: &mut Vec<RawDescriptor>, ) -> Result<Vec<StreamInfoBuilder>, Error>258 pub(crate) fn create_stream_info_builders(
259 params: &Parameters,
260 snd_data: &SndData,
261 keep_rds: &mut Vec<RawDescriptor>,
262 ) -> Result<Vec<StreamInfoBuilder>, Error> {
263 Ok(create_stream_source_generators(params, snd_data, keep_rds)?
264 .into_iter()
265 .map(Arc::new)
266 .zip(snd_data.pcm_info_iter())
267 .map(|(generator, pcm_info)| {
268 let device_params = params.get_device_params(pcm_info).unwrap_or_default();
269 StreamInfo::builder(generator).effects(device_params.effects.unwrap_or_default())
270 })
271 .collect())
272 }
273
274 // To be used with hardcoded_snd_data
hardcoded_virtio_snd_config(params: &Parameters) -> virtio_snd_config275 pub fn hardcoded_virtio_snd_config(params: &Parameters) -> virtio_snd_config {
276 virtio_snd_config {
277 jacks: 0.into(),
278 streams: params.get_total_streams().into(),
279 chmaps: (params.num_output_devices * 3 + params.num_input_devices).into(),
280 }
281 }
282
283 // To be used with hardcoded_virtio_snd_config
hardcoded_snd_data(params: &Parameters) -> SndData284 pub fn hardcoded_snd_data(params: &Parameters) -> SndData {
285 let jack_info: Vec<virtio_snd_jack_info> = Vec::new();
286 let mut pcm_info: Vec<virtio_snd_pcm_info> = Vec::new();
287 let mut chmap_info: Vec<virtio_snd_chmap_info> = Vec::new();
288
289 for dev in 0..params.num_output_devices {
290 for _ in 0..params.num_output_streams {
291 pcm_info.push(virtio_snd_pcm_info {
292 hdr: virtio_snd_info {
293 hda_fn_nid: dev.into(),
294 },
295 features: 0.into(), /* 1 << VIRTIO_SND_PCM_F_XXX */
296 formats: SUPPORTED_FORMATS.into(),
297 rates: SUPPORTED_FRAME_RATES.into(),
298 direction: VIRTIO_SND_D_OUTPUT,
299 channels_min: 1,
300 channels_max: 6,
301 padding: [0; 5],
302 });
303 }
304 }
305 for dev in 0..params.num_input_devices {
306 for _ in 0..params.num_input_streams {
307 pcm_info.push(virtio_snd_pcm_info {
308 hdr: virtio_snd_info {
309 hda_fn_nid: dev.into(),
310 },
311 features: 0.into(), /* 1 << VIRTIO_SND_PCM_F_XXX */
312 formats: SUPPORTED_FORMATS.into(),
313 rates: SUPPORTED_FRAME_RATES.into(),
314 direction: VIRTIO_SND_D_INPUT,
315 channels_min: 1,
316 channels_max: 2,
317 padding: [0; 5],
318 });
319 }
320 }
321 // Use stereo channel map.
322 let mut positions = [VIRTIO_SND_CHMAP_NONE; VIRTIO_SND_CHMAP_MAX_SIZE];
323 positions[0] = VIRTIO_SND_CHMAP_FL;
324 positions[1] = VIRTIO_SND_CHMAP_FR;
325 for dev in 0..params.num_output_devices {
326 chmap_info.push(virtio_snd_chmap_info {
327 hdr: virtio_snd_info {
328 hda_fn_nid: dev.into(),
329 },
330 direction: VIRTIO_SND_D_OUTPUT,
331 channels: 2,
332 positions,
333 });
334 }
335 for dev in 0..params.num_input_devices {
336 chmap_info.push(virtio_snd_chmap_info {
337 hdr: virtio_snd_info {
338 hda_fn_nid: dev.into(),
339 },
340 direction: VIRTIO_SND_D_INPUT,
341 channels: 2,
342 positions,
343 });
344 }
345 positions[2] = VIRTIO_SND_CHMAP_RL;
346 positions[3] = VIRTIO_SND_CHMAP_RR;
347 for dev in 0..params.num_output_devices {
348 chmap_info.push(virtio_snd_chmap_info {
349 hdr: virtio_snd_info {
350 hda_fn_nid: dev.into(),
351 },
352 direction: VIRTIO_SND_D_OUTPUT,
353 channels: 4,
354 positions,
355 });
356 }
357 positions[2] = VIRTIO_SND_CHMAP_FC;
358 positions[3] = VIRTIO_SND_CHMAP_LFE;
359 positions[4] = VIRTIO_SND_CHMAP_RL;
360 positions[5] = VIRTIO_SND_CHMAP_RR;
361 for dev in 0..params.num_output_devices {
362 chmap_info.push(virtio_snd_chmap_info {
363 hdr: virtio_snd_info {
364 hda_fn_nid: dev.into(),
365 },
366 direction: VIRTIO_SND_D_OUTPUT,
367 channels: 6,
368 positions,
369 });
370 }
371
372 SndData {
373 jack_info,
374 pcm_info,
375 chmap_info,
376 }
377 }
378
resize_parameters_pcm_device_config(mut params: Parameters) -> Parameters379 fn resize_parameters_pcm_device_config(mut params: Parameters) -> Parameters {
380 if params.output_device_config.len() > params.num_output_devices as usize {
381 warn!("Truncating output device config due to length > number of output devices");
382 }
383 params
384 .output_device_config
385 .resize_with(params.num_output_devices as usize, Default::default);
386
387 if params.input_device_config.len() > params.num_input_devices as usize {
388 warn!("Truncating input device config due to length > number of input devices");
389 }
390 params
391 .input_device_config
392 .resize_with(params.num_input_devices as usize, Default::default);
393
394 params
395 }
396
397 impl VirtioDevice for VirtioSnd {
keep_rds(&self) -> Vec<RawDescriptor>398 fn keep_rds(&self) -> Vec<RawDescriptor> {
399 self.keep_rds
400 .iter()
401 .map(|descr| descr.as_raw_descriptor())
402 .collect()
403 }
404
device_type(&self) -> DeviceType405 fn device_type(&self) -> DeviceType {
406 DeviceType::Sound
407 }
408
queue_max_sizes(&self) -> &[u16]409 fn queue_max_sizes(&self) -> &[u16] {
410 &self.queue_sizes
411 }
412
features(&self) -> u64413 fn features(&self) -> u64 {
414 self.avail_features
415 }
416
ack_features(&mut self, mut v: u64)417 fn ack_features(&mut self, mut v: u64) {
418 // Check if the guest is ACK'ing a feature that we didn't claim to have.
419 let unrequested_features = v & !self.avail_features;
420 if unrequested_features != 0 {
421 warn!("virtio_fs got unknown feature ack: {:x}", v);
422
423 // Don't count these features as acked.
424 v &= !unrequested_features;
425 }
426 self.acked_features |= v;
427 }
428
read_config(&self, offset: u64, data: &mut [u8])429 fn read_config(&self, offset: u64, data: &mut [u8]) {
430 copy_config(data, 0, self.cfg.as_bytes(), offset)
431 }
432
activate( &mut self, guest_mem: GuestMemory, interrupt: Interrupt, queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>433 fn activate(
434 &mut self,
435 guest_mem: GuestMemory,
436 interrupt: Interrupt,
437 queues: Vec<(Queue, Event)>,
438 ) -> anyhow::Result<()> {
439 if queues.len() != self.queue_sizes.len() {
440 return Err(anyhow!(
441 "snd: expected {} queues, got {}",
442 self.queue_sizes.len(),
443 queues.len()
444 ));
445 }
446
447 let snd_data = self.snd_data.clone();
448 let stream_info_builders = self.stream_info_builders.to_vec();
449
450 self.worker_thread = Some(WorkerThread::start("v_snd_common", move |kill_evt| {
451 set_audio_thread_priority();
452 if let Err(err_string) = run_worker(
453 interrupt,
454 queues,
455 guest_mem,
456 snd_data,
457 kill_evt,
458 stream_info_builders,
459 ) {
460 error!("{}", err_string);
461 }
462 }));
463
464 Ok(())
465 }
466
reset(&mut self) -> bool467 fn reset(&mut self) -> bool {
468 if let Some(worker_thread) = self.worker_thread.take() {
469 worker_thread.stop();
470 }
471
472 true
473 }
474 }
475
// Nothing is overridden here; the device relies on whatever `Suspendable` provides by default.
impl Suspendable for VirtioSnd {}
477
/// Tells the main loop in `run_worker` what to do after `run_worker_once` returns.
#[derive(PartialEq)]
enum LoopState {
    // Reset the streams and run the workers again.
    Continue,
    // Leave the main loop.
    Break,
}
483
run_worker( interrupt: Interrupt, queues: Vec<(Queue, Event)>, mem: GuestMemory, snd_data: SndData, kill_evt: Event, stream_info_builders: Vec<StreamInfoBuilder>, ) -> Result<(), String>484 fn run_worker(
485 interrupt: Interrupt,
486 queues: Vec<(Queue, Event)>,
487 mem: GuestMemory,
488 snd_data: SndData,
489 kill_evt: Event,
490 stream_info_builders: Vec<StreamInfoBuilder>,
491 ) -> Result<(), String> {
492 let ex = Executor::new().expect("Failed to create an executor");
493
494 if snd_data.pcm_info_len() != stream_info_builders.len() {
495 error!(
496 "snd: expected {} streams, got {}",
497 snd_data.pcm_info_len(),
498 stream_info_builders.len(),
499 );
500 }
501 let streams = stream_info_builders
502 .into_iter()
503 .map(StreamInfoBuilder::build)
504 .map(AsyncMutex::new)
505 .collect();
506 let streams = Rc::new(AsyncMutex::new(streams));
507
508 let mut queues: Vec<(Queue, EventAsync)> = queues
509 .into_iter()
510 .map(|(q, e)| {
511 (
512 q,
513 EventAsync::new(e, &ex).expect("Failed to create async event for queue"),
514 )
515 })
516 .collect();
517
518 let (mut ctrl_queue, mut ctrl_queue_evt) = queues.remove(0);
519 let (_event_queue, _event_queue_evt) = queues.remove(0);
520 let (tx_queue, tx_queue_evt) = queues.remove(0);
521 let (rx_queue, rx_queue_evt) = queues.remove(0);
522
523 let tx_queue = Rc::new(AsyncMutex::new(tx_queue));
524 let rx_queue = Rc::new(AsyncMutex::new(rx_queue));
525
526 let (tx_send, mut tx_recv) = mpsc::unbounded();
527 let (rx_send, mut rx_recv) = mpsc::unbounded();
528
529 let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone()).fuse();
530
531 // Exit if the kill event is triggered.
532 let f_kill = async_utils::await_and_exit(&ex, kill_evt).fuse();
533
534 pin_mut!(f_resample, f_kill);
535
536 loop {
537 if run_worker_once(
538 &ex,
539 &streams,
540 &mem,
541 interrupt.clone(),
542 &snd_data,
543 &mut f_kill,
544 &mut f_resample,
545 &mut ctrl_queue,
546 &mut ctrl_queue_evt,
547 &tx_queue,
548 &tx_queue_evt,
549 tx_send.clone(),
550 &mut tx_recv,
551 &rx_queue,
552 &rx_queue_evt,
553 rx_send.clone(),
554 &mut rx_recv,
555 ) == LoopState::Break
556 {
557 break;
558 }
559
560 if let Err(e) = reset_streams(
561 &ex,
562 &streams,
563 &mem,
564 interrupt.clone(),
565 &tx_queue,
566 &mut tx_recv,
567 &rx_queue,
568 &mut rx_recv,
569 ) {
570 error!("Error reset streams: {}", e);
571 break;
572 }
573 }
574
575 Ok(())
576 }
577
notify_reset_signal(reset_signal: &(AsyncMutex<bool>, Condvar))578 async fn notify_reset_signal(reset_signal: &(AsyncMutex<bool>, Condvar)) {
579 let (lock, cvar) = reset_signal;
580 *lock.lock().await = true;
581 cvar.notify_all();
582 }
583
/// Runs all workers once and exits as soon as any worker exits.
///
/// Returns [`LoopState::Break`] if the worker `f_kill` or `f_resample` exits, or if something went
/// wrong during the shutdown process. The caller should not run the workers again and should exit
/// the main loop.
///
/// If this function returns [`LoopState::Continue`], the caller can continue the main loop by
/// resetting the streams and running the workers again.
///
/// The control, tx, rx and response workers are created here and share one `reset_signal`, which
/// is raised once the first of them has finished so that the remaining ones can be driven to
/// completion before returning.
fn run_worker_once(
    ex: &Executor,
    streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
    mem: &GuestMemory,
    interrupt: Interrupt,
    snd_data: &SndData,
    mut f_kill: &mut (impl Future<Output = anyhow::Result<()>> + FusedFuture + Unpin),
    mut f_resample: &mut (impl Future<Output = anyhow::Result<()>> + FusedFuture + Unpin),
    ctrl_queue: &mut Queue,
    ctrl_queue_evt: &mut EventAsync,
    tx_queue: &Rc<AsyncMutex<Queue>>,
    tx_queue_evt: &EventAsync,
    tx_send: mpsc::UnboundedSender<PcmResponse>,
    tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
    rx_queue: &Rc<AsyncMutex<Queue>>,
    rx_queue_evt: &EventAsync,
    rx_send: mpsc::UnboundedSender<PcmResponse>,
    rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
) -> LoopState {
    // The senders are needed twice: once by the control queue handler and once by the
    // corresponding PCM queue handler.
    let tx_send2 = tx_send.clone();
    let rx_send2 = rx_send.clone();

    // Flag + condvar handed to every worker created below; raised via `notify_reset_signal`.
    let reset_signal = (AsyncMutex::new(false), Condvar::new());

    let f_ctrl = handle_ctrl_queue(
        ex,
        mem,
        streams,
        snd_data,
        ctrl_queue,
        ctrl_queue_evt,
        interrupt.clone(),
        tx_send,
        rx_send,
        Some(&reset_signal),
    )
    .fuse();

    // TODO(woodychow): Enable this when libcras sends jack connect/disconnect evts
    // let f_event = handle_event_queue(
    //     &mem,
    //     snd_state,
    //     event_queue,
    //     event_queue_evt,
    //     interrupt,
    // );
    let f_tx = handle_pcm_queue(
        mem,
        streams,
        tx_send2,
        tx_queue,
        tx_queue_evt,
        Some(&reset_signal),
    )
    .fuse();
    let f_tx_response = send_pcm_response_worker(
        mem,
        tx_queue,
        interrupt.clone(),
        tx_recv,
        Some(&reset_signal),
    )
    .fuse();
    let f_rx = handle_pcm_queue(
        mem,
        streams,
        rx_send2,
        rx_queue,
        rx_queue_evt,
        Some(&reset_signal),
    )
    .fuse();
    let f_rx_response =
        send_pcm_response_worker(mem, rx_queue, interrupt, rx_recv, Some(&reset_signal)).fuse();

    pin_mut!(f_ctrl, f_tx, f_tx_response, f_rx, f_rx_response);

    // Resolves with the result of whichever worker finishes first, together with what the main
    // loop should do afterwards.
    let done = async {
        select! {
            res = f_ctrl => (res.context("error in handling ctrl queue"), LoopState::Continue),
            res = f_tx => (res.context("error in handling tx queue"), LoopState::Continue),
            res = f_tx_response => (res.context("error in handling tx response"), LoopState::Continue),
            res = f_rx => (res.context("error in handling rx queue"), LoopState::Continue),
            res = f_rx_response => (res.context("error in handling rx response"), LoopState::Continue),

            // For following workers, do not continue the loop
            res = f_resample => (res.context("error in handle_irq_resample"), LoopState::Break),
            res = f_kill => (res.context("error in await_and_exit"), LoopState::Break),
        }
    };

    match ex.run_until(done) {
        Ok((res, loop_state)) => {
            if let Err(e) = res {
                error!("Error in worker: {:#}", e);
            }
            if loop_state == LoopState::Break {
                return LoopState::Break;
            }
        }
        // An executor error is only logged; the shutdown procedure below still runs.
        Err(e) => {
            error!("Error happened in executor: {}", e);
        }
    }

    warn!("Shutting down all workers for reset procedure");
    block_on(notify_reset_signal(&reset_signal));

    // Keep polling the remaining workers until all of them have completed (`complete` branch).
    let shutdown = async {
        loop {
            let (res, worker_name) = select!(
                res = f_ctrl => (res, "f_ctrl"),
                res = f_tx => (res, "f_tx"),
                res = f_tx_response => (res, "f_tx_response"),
                res = f_rx => (res, "f_rx"),
                res = f_rx_response => (res, "f_rx_response"),
                complete => break,
            );
            match res {
                Ok(_) => debug!("Worker {} stopped", worker_name),
                Err(e) => error!("Worker {} stopped with error {}", worker_name, e),
            };
        }
    };

    if let Err(e) = ex.run_until(shutdown) {
        error!("Error happened in executor while shutdown: {}", e);
        return LoopState::Break;
    }

    LoopState::Continue
}
723
/// Brings every stream back to a released state and marks it as just reset.
///
/// For each stream: if it is in the `VIRTIO_SND_R_PCM_START` state it is stopped, then, if it is
/// in the `VIRTIO_SND_R_PCM_STOP` or `VIRTIO_SND_R_PCM_PREPARE` state, it is released. Failures of
/// `stop`/`release` are logged and do not abort the reset. The tx/rx response workers run
/// concurrently with the reset, using a `reset_signal` local to this function that is raised once
/// all streams have been processed.
///
/// Returns the result of running the combined future on `ex`.
fn reset_streams(
    ex: &Executor,
    streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
    mem: &GuestMemory,
    interrupt: Interrupt,
    tx_queue: &Rc<AsyncMutex<Queue>>,
    tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
    rx_queue: &Rc<AsyncMutex<Queue>>,
    rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
) -> Result<(), AsyncError> {
    let reset_signal = (AsyncMutex::new(false), Condvar::new());

    let do_reset = async {
        let streams = streams.read_lock().await;
        for stream_info in &*streams {
            let mut stream_info = stream_info.lock().await;
            if stream_info.state == VIRTIO_SND_R_PCM_START {
                if let Err(e) = stream_info.stop().await {
                    error!("Error on stop while resetting stream: {}", e);
                }
            }
            // The state is read again here, after the possible `stop()` above.
            if stream_info.state == VIRTIO_SND_R_PCM_STOP
                || stream_info.state == VIRTIO_SND_R_PCM_PREPARE
            {
                if let Err(e) = stream_info.release().await {
                    error!("Error on release while resetting stream: {}", e);
                }
            }
            stream_info.just_reset = true;
        }

        // Signal the response workers below once every stream has been handled.
        notify_reset_signal(&reset_signal).await;
    };

    // Run these in a loop to ensure that they will survive until do_reset is finished
    let f_tx_response = async {
        // Restart the worker whenever it returns an error; stop once it returns `Ok`.
        while send_pcm_response_worker(
            mem,
            tx_queue,
            interrupt.clone(),
            tx_recv,
            Some(&reset_signal),
        )
        .await
        .is_err()
        {}
    };

    let f_rx_response = async {
        // Same restart-on-error policy as for the tx response worker.
        while send_pcm_response_worker(
            mem,
            rx_queue,
            interrupt.clone(),
            rx_recv,
            Some(&reset_signal),
        )
        .await
        .is_err()
        {}
    };

    // Completes only when both response workers and the reset itself have finished.
    let reset = async {
        join!(f_tx_response, f_rx_response, do_reset);
    };

    ex.run_until(reset)
}
791
#[cfg(test)]
#[allow(clippy::needless_update)]
mod tests {
    use audio_streams::StreamEffect;

    use super::*;
    use crate::virtio::snd::parameters::PCMDeviceParameters;

    /// Device config that only requests echo cancellation.
    fn echo_cancellation_config() -> PCMDeviceParameters {
        PCMDeviceParameters {
            effects: Some(vec![StreamEffect::EchoCancellation]),
            ..PCMDeviceParameters::default()
        }
    }

    /// 3 output devices with 3 streams each, 2 input devices with 2 streams each, and a single
    /// config entry per direction.
    fn multi_device_params() -> Parameters {
        Parameters {
            num_output_devices: 3,
            num_input_devices: 2,
            num_output_streams: 3,
            num_input_streams: 2,
            output_device_config: vec![echo_cancellation_config()],
            input_device_config: vec![echo_cancellation_config()],
            ..Default::default()
        }
    }

    #[test]
    fn test_virtio_snd_new() {
        let res = VirtioSnd::new(123, multi_device_params()).unwrap();

        // Default values
        assert_eq!(res.snd_data.jack_info.len(), 0);
        assert_eq!(res.acked_features, 0);
        assert!(res.worker_thread.is_none());

        assert_eq!(res.avail_features, 123); // avail_features must be equal to the input
        assert_eq!(res.cfg.jacks.to_native(), 0);
        assert_eq!(res.cfg.streams.to_native(), 13); // (Output = 3*3) + (Input = 2*2)
        assert_eq!(res.cfg.chmaps.to_native(), 11); // (Output = 3*3) + (Input = 2*1)

        // Check snd_data.pcm_info
        assert_eq!(res.snd_data.pcm_info.len(), 13);
        // Check hda_fn_nid (PCM Device number)
        let expected_hda_fn_nid = [0, 0, 0, 1, 1, 1, 2, 2, 2, 0, 0, 1, 1];
        for (i, pcm_info) in res.snd_data.pcm_info.iter().enumerate() {
            assert_eq!(
                pcm_info.hdr.hda_fn_nid.to_native(),
                expected_hda_fn_nid[i],
                "pcm_info index {} incorrect hda_fn_nid",
                i
            );
        }
        // First 9 devices must be OUTPUT, next 4 devices must be INPUT
        for (i, pcm_info) in res.snd_data.pcm_info.iter().enumerate() {
            let expected_direction = if i < 9 {
                VIRTIO_SND_D_OUTPUT
            } else {
                VIRTIO_SND_D_INPUT
            };
            assert_eq!(
                pcm_info.direction, expected_direction,
                "pcm_info index {} incorrect direction",
                i
            );
        }

        // Check snd_data.chmap_info
        assert_eq!(res.snd_data.chmap_info.len(), 11);
        let expected_hda_fn_nid = [0, 1, 2, 0, 1, 0, 1, 2, 0, 1, 2];
        // Check hda_fn_nid (PCM Device number)
        for (i, chmap_info) in res.snd_data.chmap_info.iter().enumerate() {
            assert_eq!(
                chmap_info.hdr.hda_fn_nid.to_native(),
                expected_hda_fn_nid[i],
                "chmap_info index {} incorrect hda_fn_nid",
                i
            );
        }
    }

    #[test]
    fn test_resize_parameters_pcm_device_config_truncate() {
        // If pcm_device_config is larger than number of devices, it will be truncated
        let params = Parameters {
            num_output_devices: 1,
            num_input_devices: 1,
            output_device_config: vec![PCMDeviceParameters::default(); 3],
            input_device_config: vec![PCMDeviceParameters::default(); 3],
            ..Parameters::default()
        };
        let params = resize_parameters_pcm_device_config(params);
        assert_eq!(params.output_device_config.len(), 1);
        assert_eq!(params.input_device_config.len(), 1);
    }

    #[test]
    fn test_resize_parameters_pcm_device_config_extend() {
        let params = resize_parameters_pcm_device_config(multi_device_params());

        // Check output_device_config correctly extended
        assert_eq!(
            params.output_device_config,
            vec![
                echo_cancellation_config(),     // Keep from the parameters
                PCMDeviceParameters::default(), // Extended with default
                PCMDeviceParameters::default(), // Extended with default
            ]
        );

        // Check input_device_config correctly extended
        assert_eq!(
            params.input_device_config,
            vec![
                echo_cancellation_config(),     // Keep from the parameters
                PCMDeviceParameters::default(), // Extended with default
            ]
        );
    }
}
936