• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 
11 use async_trait::async_trait;
12 use audio_streams::capture::AsyncCaptureBuffer;
13 use audio_streams::AsyncPlaybackBuffer;
14 use base::debug;
15 use base::error;
16 use cros_async::sync::Condvar;
17 use cros_async::sync::Mutex as AsyncMutex;
18 use cros_async::EventAsync;
19 use cros_async::Executor;
20 use futures::channel::mpsc;
21 use futures::channel::oneshot;
22 use futures::pin_mut;
23 use futures::select;
24 use futures::FutureExt;
25 use futures::SinkExt;
26 use futures::StreamExt;
27 use thiserror::Error as ThisError;
28 use vm_memory::GuestMemory;
29 #[cfg(windows)]
30 use win_audio::AudioSharedFormat;
31 use zerocopy::AsBytes;
32 
33 use super::Error;
34 use super::SndData;
35 use super::WorkerStatus;
36 use crate::virtio::snd::common::*;
37 use crate::virtio::snd::common_backend::stream_info::SetParams;
38 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
39 use crate::virtio::snd::common_backend::DirectionalStream;
40 use crate::virtio::snd::common_backend::PcmResponse;
41 use crate::virtio::snd::constants::*;
42 use crate::virtio::snd::layout::*;
43 use crate::virtio::DescriptorChain;
44 use crate::virtio::Queue;
45 use crate::virtio::Reader;
46 use crate::virtio::SignalableInterrupt;
47 use crate::virtio::Writer;
48 
49 // TODO(b/246601226): Remove once a generic audio_stream solution that can accpet
50 // arbitrarily size buffers.
51 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
52 #[async_trait(?Send)]
53 pub trait PlaybackBufferWriter {
new( guest_period_bytes: usize, #[cfg(windows)] frame_size: usize, #[cfg(windows)] frame_rate: usize, #[cfg(windows)] guest_num_channels: usize, #[cfg(windows)] audio_shared_format: AudioSharedFormat, ) -> Self where Self: Sized54     fn new(
55         guest_period_bytes: usize,
56         #[cfg(windows)] frame_size: usize,
57         #[cfg(windows)] frame_rate: usize,
58         #[cfg(windows)] guest_num_channels: usize,
59         #[cfg(windows)] audio_shared_format: AudioSharedFormat,
60     ) -> Self
61     where
62         Self: Sized;
63 
64     /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize65     fn endpoint_period_bytes(&self) -> usize;
66 
67     /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>68     fn copy_to_buffer(
69         &mut self,
70         dst_buf: &mut AsyncPlaybackBuffer<'_>,
71         reader: &mut Reader,
72     ) -> Result<usize, Error> {
73         dst_buf.copy_from(reader).map_err(Error::Io)
74     }
75     /// Check to see if an additional read from the tx virtqueue is needed during a playback
76     /// loop. If so, read from the virtqueue.
77     ///
78     /// Prefill will happen, for example, if the endpoint buffer requires a 513 frame period, but
79     /// each tx virtqueue read only produces 480 frames.
80     #[cfg(windows)]
check_and_prefill( &mut self, mem: &GuestMemory, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>81     async fn check_and_prefill(
82         &mut self,
83         mem: &GuestMemory,
84         desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
85         sender: &mut mpsc::UnboundedSender<PcmResponse>,
86     ) -> Result<(), Error>;
87 }
88 
89 #[derive(Debug)]
90 enum VirtioSndPcmCmd {
91     SetParams { set_params: SetParams },
92     Prepare,
93     Start,
94     Stop,
95     Release,
96 }
97 
98 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result99     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100         let cmd_code = match self {
101             VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
102             VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
103             VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
104             VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
105             VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
106         };
107         f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
108     }
109 }
110 
111 #[derive(ThisError, Debug)]
112 enum VirtioSndPcmCmdError {
113     #[error("SetParams requires additional parameters")]
114     SetParams,
115     #[error("Invalid virtio snd command code")]
116     InvalidCode,
117 }
118 
119 impl TryFrom<u32> for VirtioSndPcmCmd {
120     type Error = VirtioSndPcmCmdError;
121 
try_from(code: u32) -> Result<Self, Self::Error>122     fn try_from(code: u32) -> Result<Self, Self::Error> {
123         match code {
124             VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
125             VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
126             VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
127             VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
128             VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
129             _ => Err(VirtioSndPcmCmdError::InvalidCode),
130         }
131     }
132 }
133 
134 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd135     fn with_set_params_and_direction(
136         set_params: virtio_snd_pcm_set_params,
137         dir: u8,
138     ) -> VirtioSndPcmCmd {
139         let buffer_bytes: u32 = set_params.buffer_bytes.into();
140         let period_bytes: u32 = set_params.period_bytes.into();
141         VirtioSndPcmCmd::SetParams {
142             set_params: SetParams {
143                 channels: set_params.channels,
144                 format: from_virtio_sample_format(set_params.format).unwrap(),
145                 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
146                 buffer_bytes: buffer_bytes as usize,
147                 period_bytes: period_bytes as usize,
148                 dir,
149             },
150         }
151     }
152 }
153 
154 // Returns true if the operation is successful. Returns error if there is
155 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, mem: &GuestMemory, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, ) -> Result<(), Error>156 async fn process_pcm_ctrl(
157     ex: &Executor,
158     mem: &GuestMemory,
159     tx_send: &mpsc::UnboundedSender<PcmResponse>,
160     rx_send: &mpsc::UnboundedSender<PcmResponse>,
161     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
162     cmd: VirtioSndPcmCmd,
163     writer: &mut Writer,
164     stream_id: usize,
165 ) -> Result<(), Error> {
166     let streams = streams.read_lock().await;
167     let mut stream = match streams.get(stream_id) {
168         Some(stream_info) => stream_info.lock().await,
169         None => {
170             error!(
171                 "Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
172                 stream_id, cmd
173             );
174             return writer
175                 .write_obj(VIRTIO_SND_S_BAD_MSG)
176                 .map_err(Error::WriteResponse);
177         }
178     };
179 
180     debug!("{} for stream id={}", cmd, stream_id);
181 
182     let result = match cmd {
183         VirtioSndPcmCmd::SetParams { set_params } => {
184             let result = stream.set_params(set_params).await;
185             if result.is_ok() {
186                 debug!(
187                     "VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
188                     stream_id, *stream
189                 );
190             }
191             result
192         }
193         VirtioSndPcmCmd::Prepare => stream.prepare(ex, mem.clone(), tx_send, rx_send).await,
194         VirtioSndPcmCmd::Start => stream.start().await,
195         VirtioSndPcmCmd::Stop => stream.stop().await,
196         VirtioSndPcmCmd::Release => stream.release().await,
197     };
198     match result {
199         Ok(_) => writer
200             .write_obj(VIRTIO_SND_S_OK)
201             .map_err(Error::WriteResponse),
202         Err(Error::OperationNotSupported) => {
203             error!(
204                 "{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
205                 cmd, stream_id
206             );
207 
208             writer
209                 .write_obj(VIRTIO_SND_S_NOT_SUPP)
210                 .map_err(Error::WriteResponse)
211         }
212         Err(e) => {
213             // Runtime/internal error would be more appropriate, but there's
214             // no such error type
215             error!(
216                 "{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
217                 cmd, stream_id, e
218             );
219             writer
220                 .write_obj(VIRTIO_SND_S_IO_ERR)
221                 .map_err(Error::WriteResponse)
222         }
223     }
224 }
225 
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>226 async fn write_data(
227     mut dst_buf: AsyncPlaybackBuffer<'_>,
228     reader: Option<Reader>,
229     buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
230 ) -> Result<u32, Error> {
231     let transferred = match reader {
232         Some(mut reader) => buffer_writer.copy_to_buffer(&mut dst_buf, &mut reader)?,
233         None => dst_buf
234             .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
235             .map_err(Error::Io)?,
236     };
237 
238     if transferred as usize != buffer_writer.endpoint_period_bytes() {
239         error!(
240             "Bytes written {} != period_bytes {}",
241             transferred,
242             buffer_writer.endpoint_period_bytes()
243         );
244         Err(Error::InvalidBufferSize)
245     } else {
246         dst_buf.commit().await;
247         Ok(dst_buf.latency_bytes())
248     }
249 }
250 
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>251 async fn read_data<'a>(
252     mut src_buf: AsyncCaptureBuffer<'a>,
253     writer: Option<&mut Writer>,
254     period_bytes: usize,
255 ) -> Result<u32, Error> {
256     let transferred = match writer {
257         Some(writer) => src_buf.copy_to(writer),
258         None => src_buf.copy_to(&mut io::sink()),
259     }
260     .map_err(Error::Io)?;
261     if transferred as usize != period_bytes {
262         error!(
263             "Bytes written {} != period_bytes {}",
264             transferred, period_bytes
265         );
266         Err(Error::InvalidBufferSize)
267     } else {
268         src_buf.commit().await;
269         Ok(src_buf.latency_bytes())
270     }
271 }
272 
273 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self274     fn from(res: Result<u32, Error>) -> Self {
275         match res {
276             Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
277             Err(e) => {
278                 error!("PCM I/O message failed: {}", e);
279                 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
280             }
281         }
282     }
283 }
284 
285 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, mem: &GuestMemory, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>286 async fn drain_desc_receiver(
287     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
288     mem: &GuestMemory,
289     sender: &mut mpsc::UnboundedSender<PcmResponse>,
290 ) -> Result<(), Error> {
291     let mut o_desc_chain = desc_receiver.next().await;
292     while let Some(desc_chain) = o_desc_chain {
293         // From the virtio-snd spec:
294         // The device MUST complete all pending I/O messages for the specified stream ID.
295         let desc_index = desc_chain.index;
296         let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
297         let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
298         // Fetch next DescriptorChain to see if the current one is the last one.
299         o_desc_chain = desc_receiver.next().await;
300         let (done, future) = if o_desc_chain.is_none() {
301             let (done, future) = oneshot::channel();
302             (Some(done), Some(future))
303         } else {
304             (None, None)
305         };
306         sender
307             .send(PcmResponse {
308                 desc_index,
309                 status,
310                 writer,
311                 done,
312             })
313             .await
314             .map_err(Error::MpscSend)?;
315 
316         if let Some(f) = future {
317             // From the virtio-snd spec:
318             // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
319             // while there are pending I/O messages for the specified stream ID.
320             f.await.map_err(Error::DoneNotTriggered)?;
321         };
322     }
323     Ok(())
324 }
325 
get_index_with_reader_and_writer( mem: &GuestMemory, desc_chain: DescriptorChain, ) -> Result<(u16, Reader, Writer), Error>326 pub(crate) fn get_index_with_reader_and_writer(
327     mem: &GuestMemory,
328     desc_chain: DescriptorChain,
329 ) -> Result<(u16, Reader, Writer), Error> {
330     let desc_index = desc_chain.index;
331     let mut reader =
332         Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
333     // stream_id was already read in handle_pcm_queue
334     reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
335     let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
336     Ok((desc_index, reader, writer))
337 }
338 
339 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
340 /// queue, and forward them to CRAS. One pcm worker per stream.
341 ///
342 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
343 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncMutex<WorkerStatus>>, mem: GuestMemory, mut sender: mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>344 pub async fn start_pcm_worker(
345     ex: Executor,
346     dstream: DirectionalStream,
347     mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
348     status_mutex: Rc<AsyncMutex<WorkerStatus>>,
349     mem: GuestMemory,
350     mut sender: mpsc::UnboundedSender<PcmResponse>,
351 ) -> Result<(), Error> {
352     let res = pcm_worker_loop(
353         ex,
354         dstream,
355         &mut desc_receiver,
356         &status_mutex,
357         &mem,
358         &mut sender,
359     )
360     .await;
361     *status_mutex.lock().await = WorkerStatus::Quit;
362     if res.is_err() {
363         error!(
364             "pcm_worker error: {:#?}. Draining desc_receiver",
365             res.as_ref().err()
366         );
367         // On error, guaranteed that desc_receiver has not been drained, so drain it here.
368         // Note that drain blocks until the stream is release.
369         drain_desc_receiver(&mut desc_receiver, &mem, &mut sender).await?;
370     }
371     res
372 }
373 
pcm_worker_loop( ex: Executor, dstream: DirectionalStream, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: &Rc<AsyncMutex<WorkerStatus>>, mem: &GuestMemory, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>374 async fn pcm_worker_loop(
375     ex: Executor,
376     dstream: DirectionalStream,
377     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
378     status_mutex: &Rc<AsyncMutex<WorkerStatus>>,
379     mem: &GuestMemory,
380     sender: &mut mpsc::UnboundedSender<PcmResponse>,
381 ) -> Result<(), Error> {
382     match dstream {
383         #[allow(unused_mut)]
384         DirectionalStream::Output(mut stream, mut buffer_writer) => loop {
385             let dst_buf = stream
386                 .next_playback_buffer(&ex)
387                 .await
388                 .map_err(Error::FetchBuffer)?;
389             let worker_status = status_mutex.lock().await;
390             match *worker_status {
391                 WorkerStatus::Quit => {
392                     drain_desc_receiver(desc_receiver, mem, sender).await?;
393                     if let Err(e) = write_data(dst_buf, None, &mut buffer_writer).await {
394                         error!("Error on write_data after worker quit: {}", e)
395                     }
396                     break Ok(());
397                 }
398                 WorkerStatus::Pause => {
399                     write_data(dst_buf, None, &mut buffer_writer).await?;
400                 }
401                 WorkerStatus::Running => {
402                     // TODO(b/246601226): Remove once a generic audio_stream solution that can
403                     // accpet arbitrarily size buffers
404                     #[cfg(windows)]
405                     buffer_writer
406                         .check_and_prefill(mem, desc_receiver, sender)
407                         .await?;
408 
409                     match desc_receiver.try_next() {
410                         Err(e) => {
411                             error!("Underrun. No new DescriptorChain while running: {}", e);
412                             write_data(dst_buf, None, &mut buffer_writer).await?;
413                         }
414                         Ok(None) => {
415                             error!("Unreachable. status should be Quit when the channel is closed");
416                             write_data(dst_buf, None, &mut buffer_writer).await?;
417                             return Err(Error::InvalidPCMWorkerState);
418                         }
419                         Ok(Some(desc_chain)) => {
420                             let (desc_index, reader, writer) =
421                                 get_index_with_reader_and_writer(mem, desc_chain)?;
422                             sender
423                                 .send(PcmResponse {
424                                     desc_index,
425                                     status: write_data(dst_buf, Some(reader), &mut buffer_writer)
426                                         .await
427                                         .into(),
428                                     writer,
429                                     done: None,
430                                 })
431                                 .await
432                                 .map_err(Error::MpscSend)?;
433                         }
434                     }
435                 }
436             }
437         },
438         DirectionalStream::Input(mut stream, period_bytes) => loop {
439             let src_buf = stream
440                 .next_capture_buffer(&ex)
441                 .await
442                 .map_err(Error::FetchBuffer)?;
443 
444             let worker_status = status_mutex.lock().await;
445             match *worker_status {
446                 WorkerStatus::Quit => {
447                     drain_desc_receiver(desc_receiver, mem, sender).await?;
448                     if let Err(e) = read_data(src_buf, None, period_bytes).await {
449                         error!("Error on read_data after worker quit: {}", e)
450                     }
451                     break Ok(());
452                 }
453                 WorkerStatus::Pause => {
454                     read_data(src_buf, None, period_bytes).await?;
455                 }
456                 WorkerStatus::Running => match desc_receiver.try_next() {
457                     Err(e) => {
458                         error!("Overrun. No new DescriptorChain while running: {}", e);
459                         read_data(src_buf, None, period_bytes).await?;
460                     }
461                     Ok(None) => {
462                         error!("Unreachable. status should be Quit when the channel is closed");
463                         read_data(src_buf, None, period_bytes).await?;
464                         return Err(Error::InvalidPCMWorkerState);
465                     }
466                     Ok(Some(desc_chain)) => {
467                         let (desc_index, _reader, mut writer) =
468                             get_index_with_reader_and_writer(mem, desc_chain)?;
469 
470                         sender
471                             .send(PcmResponse {
472                                 desc_index,
473                                 status: read_data(src_buf, Some(&mut writer), period_bytes)
474                                     .await
475                                     .into(),
476                                 writer,
477                                 done: None,
478                             })
479                             .await
480                             .map_err(Error::MpscSend)?;
481                     }
482                 },
483             }
484         },
485     }
486 }
487 
488 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, mem: &GuestMemory, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>489 async fn defer_pcm_response_to_worker(
490     desc_chain: DescriptorChain,
491     mem: &GuestMemory,
492     status: virtio_snd_pcm_status,
493     response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
494 ) -> Result<(), Error> {
495     let desc_index = desc_chain.index;
496     let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
497     response_sender
498         .send(PcmResponse {
499             desc_index,
500             status,
501             writer,
502             done: None,
503         })
504         .await
505         .map_err(Error::MpscSend)
506 }
507 
send_pcm_response_with_writer<I: SignalableInterrupt>( mut writer: Writer, desc_index: u16, mem: &GuestMemory, queue: &mut Queue, interrupt: &I, status: virtio_snd_pcm_status, ) -> Result<(), Error>508 fn send_pcm_response_with_writer<I: SignalableInterrupt>(
509     mut writer: Writer,
510     desc_index: u16,
511     mem: &GuestMemory,
512     queue: &mut Queue,
513     interrupt: &I,
514     status: virtio_snd_pcm_status,
515 ) -> Result<(), Error> {
516     // For rx queue only. Fast forward the unused audio data buffer.
517     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
518         writer
519             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
520     }
521     writer.write_obj(status).map_err(Error::WriteResponse)?;
522     queue.add_used(mem, desc_index, writer.bytes_written() as u32);
523     queue.trigger_interrupt(mem, interrupt);
524     Ok(())
525 }
526 
527 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncMutex<bool>, Condvar)>)528 async fn await_reset_signal(reset_signal_option: Option<&(AsyncMutex<bool>, Condvar)>) {
529     match reset_signal_option {
530         Some((lock, cvar)) => {
531             let mut reset = lock.lock().await;
532             while !*reset {
533                 reset = cvar.wait(reset).await;
534             }
535         }
536         None => futures::future::pending().await,
537     };
538 }
539 
send_pcm_response_worker<I: SignalableInterrupt>( mem: &GuestMemory, queue: &Rc<AsyncMutex<Queue>>, interrupt: I, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncMutex<bool>, Condvar)>, ) -> Result<(), Error>540 pub async fn send_pcm_response_worker<I: SignalableInterrupt>(
541     mem: &GuestMemory,
542     queue: &Rc<AsyncMutex<Queue>>,
543     interrupt: I,
544     recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
545     reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
546 ) -> Result<(), Error> {
547     let on_reset = await_reset_signal(reset_signal).fuse();
548     pin_mut!(on_reset);
549 
550     loop {
551         let next_async = recv.next().fuse();
552         pin_mut!(next_async);
553 
554         let res = select! {
555             _ = on_reset => break,
556             res = next_async => res,
557         };
558 
559         if let Some(r) = res {
560             send_pcm_response_with_writer(
561                 r.writer,
562                 r.desc_index,
563                 mem,
564                 &mut *queue.lock().await,
565                 &interrupt,
566                 r.status,
567             )?;
568 
569             // Resume pcm_worker
570             if let Some(done) = r.done {
571                 done.send(()).map_err(Error::OneshotSend)?;
572             }
573         } else {
574             debug!("PcmResponse channel is closed.");
575             break;
576         }
577     }
578     Ok(())
579 }
580 
581 /// Handle messages from the tx or the rx queue. One invocation is needed for
582 /// each queue.
handle_pcm_queue( mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: &Rc<AsyncMutex<Queue>>, queue_event: &EventAsync, reset_signal: Option<&(AsyncMutex<bool>, Condvar)>, ) -> Result<(), Error>583 pub async fn handle_pcm_queue(
584     mem: &GuestMemory,
585     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
586     mut response_sender: mpsc::UnboundedSender<PcmResponse>,
587     queue: &Rc<AsyncMutex<Queue>>,
588     queue_event: &EventAsync,
589     reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
590 ) -> Result<(), Error> {
591     let on_reset = await_reset_signal(reset_signal).fuse();
592     pin_mut!(on_reset);
593 
594     loop {
595         // Manual queue.next_async() to avoid holding the mutex
596         let next_async = async {
597             loop {
598                 // Check if there are more descriptors available.
599                 if let Some(chain) = queue.lock().await.pop(mem) {
600                     return Ok(chain);
601                 }
602                 queue_event.next_val().await?;
603             }
604         }
605         .fuse();
606         pin_mut!(next_async);
607 
608         let desc_chain = select! {
609             _ = on_reset => break,
610             res = next_async => res.map_err(Error::Async)?,
611         };
612 
613         let mut reader =
614             Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
615 
616         let pcm_xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(Error::ReadMessage)?;
617         let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
618 
619         let streams = streams.read_lock().await;
620         let stream_info = match streams.get(stream_id) {
621             Some(stream_info) => stream_info.read_lock().await,
622             None => {
623                 error!(
624                     "stream_id ({}) >= num_streams ({})",
625                     stream_id,
626                     streams.len()
627                 );
628                 defer_pcm_response_to_worker(
629                     desc_chain,
630                     mem,
631                     virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
632                     &mut response_sender,
633                 )
634                 .await?;
635                 continue;
636             }
637         };
638 
639         match stream_info.sender.as_ref() {
640             Some(mut s) => {
641                 s.send(desc_chain).await.map_err(Error::MpscSend)?;
642                 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
643                     // If sender channel is still intact but worker status is quit,
644                     // the worker quitted unexpectedly. Return error to request a reset.
645                     return Err(Error::PCMWorkerQuittedUnexpectedly);
646                 }
647             }
648             None => {
649                 if !stream_info.just_reset {
650                     error!(
651                         "stream {} is not ready. state: {}",
652                         stream_id,
653                         get_virtio_snd_r_pcm_cmd_name(stream_info.state)
654                     );
655                 }
656                 defer_pcm_response_to_worker(
657                     desc_chain,
658                     mem,
659                     virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
660                     &mut response_sender,
661                 )
662                 .await?;
663             }
664         };
665     }
666     Ok(())
667 }
668 
669 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue<I: SignalableInterrupt>( ex: &Executor, mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, snd_data: &SndData, queue: &mut Queue, queue_event: &mut EventAsync, interrupt: I, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, reset_signal: Option<&(AsyncMutex<bool>, Condvar)>, ) -> Result<(), Error>670 pub async fn handle_ctrl_queue<I: SignalableInterrupt>(
671     ex: &Executor,
672     mem: &GuestMemory,
673     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
674     snd_data: &SndData,
675     queue: &mut Queue,
676     queue_event: &mut EventAsync,
677     interrupt: I,
678     tx_send: mpsc::UnboundedSender<PcmResponse>,
679     rx_send: mpsc::UnboundedSender<PcmResponse>,
680     reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
681 ) -> Result<(), Error> {
682     let on_reset = await_reset_signal(reset_signal).fuse();
683     pin_mut!(on_reset);
684 
685     loop {
686         let desc_chain = {
687             let next_async = queue.next_async(mem, queue_event).fuse();
688             pin_mut!(next_async);
689 
690             select! {
691                 _ = on_reset => break,
692                 res = next_async => res.map_err(Error::Async)?,
693             }
694         };
695 
696         let index = desc_chain.index;
697 
698         let mut reader =
699             Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
700         let mut writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
701         // Don't advance the reader
702         let code = reader
703             .clone()
704             .read_obj::<virtio_snd_hdr>()
705             .map_err(Error::ReadMessage)?
706             .code
707             .into();
708 
709         let handle_ctrl_msg = async {
710             return match code {
711                 VIRTIO_SND_R_JACK_INFO => {
712                     let query_info: virtio_snd_query_info =
713                         reader.read_obj().map_err(Error::ReadMessage)?;
714                     let start_id: usize = u32::from(query_info.start_id) as usize;
715                     let count: usize = u32::from(query_info.count) as usize;
716                     if start_id + count > snd_data.jack_info.len() {
717                         error!(
718                             "start_id({}) + count({}) must be smaller than \
719                             the number of jacks ({})",
720                             start_id,
721                             count,
722                             snd_data.jack_info.len()
723                         );
724                         return writer
725                             .write_obj(VIRTIO_SND_S_BAD_MSG)
726                             .map_err(Error::WriteResponse);
727                     }
728                     // The response consists of the virtio_snd_hdr structure (contains the request
729                     // status code), followed by the device-writable information structures of the
730                     // item. Each information structure begins with the following common header
731                     writer
732                         .write_obj(VIRTIO_SND_S_OK)
733                         .map_err(Error::WriteResponse)?;
734                     for i in start_id..(start_id + count) {
735                         writer
736                             .write_all(snd_data.jack_info[i].as_bytes())
737                             .map_err(Error::WriteResponse)?;
738                     }
739                     Ok(())
740                 }
741                 VIRTIO_SND_R_PCM_INFO => {
742                     let query_info: virtio_snd_query_info =
743                         reader.read_obj().map_err(Error::ReadMessage)?;
744                     let start_id: usize = u32::from(query_info.start_id) as usize;
745                     let count: usize = u32::from(query_info.count) as usize;
746                     if start_id + count > snd_data.pcm_info.len() {
747                         error!(
748                             "start_id({}) + count({}) must be smaller than \
749                             the number of streams ({})",
750                             start_id,
751                             count,
752                             snd_data.pcm_info.len()
753                         );
754                         return writer
755                             .write_obj(VIRTIO_SND_S_BAD_MSG)
756                             .map_err(Error::WriteResponse);
757                     }
758                     // The response consists of the virtio_snd_hdr structure (contains the request
759                     // status code), followed by the device-writable information structures of the
760                     // item. Each information structure begins with the following common header
761                     writer
762                         .write_obj(VIRTIO_SND_S_OK)
763                         .map_err(Error::WriteResponse)?;
764                     for i in start_id..(start_id + count) {
765                         writer
766                             .write_all(snd_data.pcm_info[i].as_bytes())
767                             .map_err(Error::WriteResponse)?;
768                     }
769                     Ok(())
770                 }
771                 VIRTIO_SND_R_CHMAP_INFO => {
772                     let query_info: virtio_snd_query_info =
773                         reader.read_obj().map_err(Error::ReadMessage)?;
774                     let start_id: usize = u32::from(query_info.start_id) as usize;
775                     let count: usize = u32::from(query_info.count) as usize;
776                     if start_id + count > snd_data.chmap_info.len() {
777                         error!(
778                             "start_id({}) + count({}) must be smaller than \
779                             the number of chmaps ({})",
780                             start_id,
781                             count,
782                             snd_data.chmap_info.len()
783                         );
784                         return writer
785                             .write_obj(VIRTIO_SND_S_BAD_MSG)
786                             .map_err(Error::WriteResponse);
787                     }
788                     // The response consists of the virtio_snd_hdr structure (contains the request
789                     // status code), followed by the device-writable information structures of the
790                     // item. Each information structure begins with the following common header
791                     writer
792                         .write_obj(VIRTIO_SND_S_OK)
793                         .map_err(Error::WriteResponse)?;
794                     for i in start_id..(start_id + count) {
795                         writer
796                             .write_all(snd_data.chmap_info[i].as_bytes())
797                             .map_err(Error::WriteResponse)?;
798                     }
799                     Ok(())
800                 }
801                 VIRTIO_SND_R_JACK_REMAP => {
802                     unreachable!("remap is unsupported");
803                 }
804                 VIRTIO_SND_R_PCM_SET_PARAMS => {
805                     // Raise VIRTIO_SND_S_BAD_MSG or IO error?
806                     let set_params: virtio_snd_pcm_set_params =
807                         reader.read_obj().map_err(Error::ReadMessage)?;
808                     let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
809                     let buffer_bytes: u32 = set_params.buffer_bytes.into();
810                     let period_bytes: u32 = set_params.period_bytes.into();
811 
812                     let dir = match snd_data.pcm_info.get(stream_id) {
813                         Some(pcm_info) => {
814                             if set_params.channels < pcm_info.channels_min
815                                 || set_params.channels > pcm_info.channels_max
816                             {
817                                 error!(
818                                     "Number of channels ({}) must be between {} and {}",
819                                     set_params.channels,
820                                     pcm_info.channels_min,
821                                     pcm_info.channels_max
822                                 );
823                                 return writer
824                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
825                                     .map_err(Error::WriteResponse);
826                             }
827                             if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
828                                 error!("PCM format {} is not supported.", set_params.format);
829                                 return writer
830                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
831                                     .map_err(Error::WriteResponse);
832                             }
833                             if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
834                                 error!("PCM frame rate {} is not supported.", set_params.rate);
835                                 return writer
836                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
837                                     .map_err(Error::WriteResponse);
838                             }
839 
840                             pcm_info.direction
841                         }
842                         None => {
843                             error!(
844                                 "stream_id {} < streams {}",
845                                 stream_id,
846                                 snd_data.pcm_info.len()
847                             );
848                             return writer
849                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
850                                 .map_err(Error::WriteResponse);
851                         }
852                     };
853 
854                     if set_params.features != 0 {
855                         error!("No feature is supported");
856                         return writer
857                             .write_obj(VIRTIO_SND_S_NOT_SUPP)
858                             .map_err(Error::WriteResponse);
859                     }
860 
861                     if buffer_bytes % period_bytes != 0 {
862                         error!(
863                             "buffer_bytes({}) must be dividable by period_bytes({})",
864                             buffer_bytes, period_bytes
865                         );
866                         return writer
867                             .write_obj(VIRTIO_SND_S_BAD_MSG)
868                             .map_err(Error::WriteResponse);
869                     }
870 
871                     process_pcm_ctrl(
872                         ex,
873                         &mem.clone(),
874                         &tx_send,
875                         &rx_send,
876                         streams,
877                         VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
878                         &mut writer,
879                         stream_id,
880                     )
881                     .await
882                 }
883                 VIRTIO_SND_R_PCM_PREPARE
884                 | VIRTIO_SND_R_PCM_START
885                 | VIRTIO_SND_R_PCM_STOP
886                 | VIRTIO_SND_R_PCM_RELEASE => {
887                     let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
888                     let stream_id: usize = u32::from(hdr.stream_id) as usize;
889                     let cmd = match VirtioSndPcmCmd::try_from(code) {
890                         Ok(cmd) => cmd,
891                         Err(err) => {
892                             error!("Error converting code to command: {}", err);
893                             return writer
894                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
895                                 .map_err(Error::WriteResponse);
896                         }
897                     };
898                     process_pcm_ctrl(
899                         ex,
900                         &mem.clone(),
901                         &tx_send,
902                         &rx_send,
903                         streams,
904                         cmd,
905                         &mut writer,
906                         stream_id,
907                     )
908                     .await
909                     .and(Ok(()))?;
910                     Ok(())
911                 }
912                 c => {
913                     error!("Unrecognized code: {}", c);
914                     return writer
915                         .write_obj(VIRTIO_SND_S_BAD_MSG)
916                         .map_err(Error::WriteResponse);
917                 }
918             };
919         };
920 
921         handle_ctrl_msg.await?;
922         queue.add_used(mem, index, writer.bytes_written() as u32);
923         queue.trigger_interrupt(mem, &interrupt);
924     }
925     Ok(())
926 }
927 
928 /// Send events to the audio driver.
handle_event_queue<I: SignalableInterrupt>( mem: &GuestMemory, mut queue: Queue, mut queue_event: EventAsync, interrupt: I, ) -> Result<(), Error>929 pub async fn handle_event_queue<I: SignalableInterrupt>(
930     mem: &GuestMemory,
931     mut queue: Queue,
932     mut queue_event: EventAsync,
933     interrupt: I,
934 ) -> Result<(), Error> {
935     loop {
936         let desc_chain = queue
937             .next_async(mem, &mut queue_event)
938             .await
939             .map_err(Error::Async)?;
940 
941         // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
942         let index = desc_chain.index;
943         queue.add_used(mem, index, 0);
944         queue.trigger_interrupt(mem, &interrupt);
945     }
946 }
947