• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 use std::sync::atomic::AtomicBool;
11 use std::sync::atomic::Ordering;
12 use std::time::Duration;
13 
14 use async_trait::async_trait;
15 use audio_streams::capture::AsyncCaptureBuffer;
16 use audio_streams::AsyncPlaybackBuffer;
17 use audio_streams::BoxError;
18 use base::debug;
19 use base::error;
20 use base::info;
21 use cros_async::sync::Condvar;
22 use cros_async::sync::RwLock as AsyncRwLock;
23 use cros_async::AsyncTube;
24 use cros_async::EventAsync;
25 use cros_async::Executor;
26 use cros_async::TimerAsync;
27 use futures::channel::mpsc;
28 use futures::channel::oneshot;
29 use futures::pin_mut;
30 use futures::select;
31 use futures::FutureExt;
32 use futures::SinkExt;
33 use futures::StreamExt;
34 use thiserror::Error as ThisError;
35 use vm_control::SndControlCommand;
36 use vm_control::VmResponse;
37 use zerocopy::IntoBytes;
38 
39 use super::Error;
40 use super::SndData;
41 use super::WorkerStatus;
42 use crate::virtio::snd::common::*;
43 use crate::virtio::snd::common_backend::stream_info::SetParams;
44 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
45 use crate::virtio::snd::common_backend::DirectionalStream;
46 use crate::virtio::snd::common_backend::PcmResponse;
47 use crate::virtio::snd::constants::*;
48 use crate::virtio::snd::layout::*;
49 use crate::virtio::DescriptorChain;
50 use crate::virtio::Queue;
51 use crate::virtio::Reader;
52 use crate::virtio::Writer;
53 
54 /// Trait to wrap system specific helpers for reading from the start point capture buffer.
55 #[async_trait(?Send)]
56 pub trait CaptureBufferReader {
get_next_capture_period( &mut self, ex: &Executor, ) -> Result<AsyncCaptureBuffer, BoxError>57     async fn get_next_capture_period(
58         &mut self,
59         ex: &Executor,
60     ) -> Result<AsyncCaptureBuffer, BoxError>;
61 }
62 
63 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
64 #[async_trait(?Send)]
65 pub trait PlaybackBufferWriter {
new(guest_period_bytes: usize) -> Self where Self: Sized66     fn new(guest_period_bytes: usize) -> Self
67     where
68         Self: Sized;
69 
70     /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize71     fn endpoint_period_bytes(&self) -> usize;
72 
73     /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>74     fn copy_to_buffer(
75         &mut self,
76         dst_buf: &mut AsyncPlaybackBuffer<'_>,
77         reader: &mut Reader,
78     ) -> Result<usize, Error> {
79         dst_buf.copy_from(reader).map_err(Error::Io)
80     }
81 }
82 
83 #[derive(Debug)]
84 enum VirtioSndPcmCmd {
85     SetParams { set_params: SetParams },
86     Prepare,
87     Start,
88     Stop,
89     Release,
90 }
91 
92 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result93     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94         let cmd_code = match self {
95             VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
96             VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
97             VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
98             VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
99             VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
100         };
101         f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
102     }
103 }
104 
105 #[derive(ThisError, Debug)]
106 enum VirtioSndPcmCmdError {
107     #[error("SetParams requires additional parameters")]
108     SetParams,
109     #[error("Invalid virtio snd command code")]
110     InvalidCode,
111 }
112 
113 impl TryFrom<u32> for VirtioSndPcmCmd {
114     type Error = VirtioSndPcmCmdError;
115 
try_from(code: u32) -> Result<Self, Self::Error>116     fn try_from(code: u32) -> Result<Self, Self::Error> {
117         match code {
118             VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
119             VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
120             VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
121             VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
122             VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
123             _ => Err(VirtioSndPcmCmdError::InvalidCode),
124         }
125     }
126 }
127 
128 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd129     fn with_set_params_and_direction(
130         set_params: virtio_snd_pcm_set_params,
131         dir: u8,
132     ) -> VirtioSndPcmCmd {
133         let buffer_bytes: u32 = set_params.buffer_bytes.into();
134         let period_bytes: u32 = set_params.period_bytes.into();
135         VirtioSndPcmCmd::SetParams {
136             set_params: SetParams {
137                 channels: set_params.channels,
138                 format: from_virtio_sample_format(set_params.format).unwrap(),
139                 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
140                 buffer_bytes: buffer_bytes as usize,
141                 period_bytes: period_bytes as usize,
142                 dir,
143             },
144         }
145     }
146 }
147 
148 // Returns true if the operation is successful. Returns error if there is
149 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, card_index: usize, ) -> Result<(), Error>150 async fn process_pcm_ctrl(
151     ex: &Executor,
152     tx_send: &mpsc::UnboundedSender<PcmResponse>,
153     rx_send: &mpsc::UnboundedSender<PcmResponse>,
154     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
155     cmd: VirtioSndPcmCmd,
156     writer: &mut Writer,
157     stream_id: usize,
158     card_index: usize,
159 ) -> Result<(), Error> {
160     let streams = streams.read_lock().await;
161     let mut stream = match streams.get(stream_id) {
162         Some(stream_info) => stream_info.lock().await,
163         None => {
164             error!(
165                 "[Card {}] Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
166                 card_index, stream_id, cmd
167             );
168             return writer
169                 .write_obj(VIRTIO_SND_S_BAD_MSG)
170                 .map_err(Error::WriteResponse);
171         }
172     };
173 
174     debug!("[Card {}] {} for stream id={}", card_index, cmd, stream_id);
175 
176     let result = match cmd {
177         VirtioSndPcmCmd::SetParams { set_params } => {
178             let result = stream.set_params(set_params).await;
179             if result.is_ok() {
180                 debug!(
181                     "[Card {}] VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
182                     card_index, stream_id, *stream
183                 );
184             }
185             result
186         }
187         VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
188         VirtioSndPcmCmd::Start => stream.start().await,
189         VirtioSndPcmCmd::Stop => stream.stop().await,
190         VirtioSndPcmCmd::Release => stream.release().await,
191     };
192     match result {
193         Ok(_) => writer
194             .write_obj(VIRTIO_SND_S_OK)
195             .map_err(Error::WriteResponse),
196         Err(Error::OperationNotSupported) => {
197             error!(
198                 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
199                 card_index, cmd, stream_id
200             );
201 
202             writer
203                 .write_obj(VIRTIO_SND_S_NOT_SUPP)
204                 .map_err(Error::WriteResponse)
205         }
206         Err(e) => {
207             // Runtime/internal error would be more appropriate, but there's
208             // no such error type
209             error!(
210                 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
211                 card_index, cmd, stream_id, e
212             );
213             writer
214                 .write_obj(VIRTIO_SND_S_IO_ERR)
215                 .map_err(Error::WriteResponse)
216         }
217     }
218 }
219 
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<&mut Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>220 async fn write_data(
221     mut dst_buf: AsyncPlaybackBuffer<'_>,
222     reader: Option<&mut Reader>,
223     buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
224 ) -> Result<u32, Error> {
225     let transferred = match reader {
226         Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
227         None => dst_buf
228             .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
229             .map_err(Error::Io)?,
230     };
231 
232     if transferred != buffer_writer.endpoint_period_bytes() {
233         error!(
234             "Bytes written {} != period_bytes {}",
235             transferred,
236             buffer_writer.endpoint_period_bytes()
237         );
238         Err(Error::InvalidBufferSize)
239     } else {
240         dst_buf.commit().await;
241         Ok(dst_buf.latency_bytes())
242     }
243 }
244 
read_data( mut src_buf: AsyncCaptureBuffer<'_>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>245 async fn read_data(
246     mut src_buf: AsyncCaptureBuffer<'_>,
247     writer: Option<&mut Writer>,
248     period_bytes: usize,
249 ) -> Result<u32, Error> {
250     let transferred = match writer {
251         Some(writer) => src_buf.copy_to(writer),
252         None => src_buf.copy_to(&mut io::sink()),
253     }
254     .map_err(Error::Io)?;
255     if transferred != period_bytes {
256         error!(
257             "Bytes written {} != period_bytes {}",
258             transferred, period_bytes
259         );
260         Err(Error::InvalidBufferSize)
261     } else {
262         src_buf.commit().await;
263         Ok(src_buf.latency_bytes())
264     }
265 }
266 
267 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self268     fn from(res: Result<u32, Error>) -> Self {
269         match res {
270             Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
271             Err(e) => {
272                 error!("PCM I/O message failed: {}", e);
273                 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
274             }
275         }
276     }
277 }
278 
279 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>280 async fn drain_desc_receiver(
281     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
282     sender: &mut mpsc::UnboundedSender<PcmResponse>,
283 ) -> Result<(), Error> {
284     let mut o_desc_chain = desc_receiver.next().await;
285     while let Some(desc_chain) = o_desc_chain {
286         // From the virtio-snd spec:
287         // The device MUST complete all pending I/O messages for the specified stream ID.
288         let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
289         // Fetch next DescriptorChain to see if the current one is the last one.
290         o_desc_chain = desc_receiver.next().await;
291         let (done, future) = if o_desc_chain.is_none() {
292             let (done, future) = oneshot::channel();
293             (Some(done), Some(future))
294         } else {
295             (None, None)
296         };
297         sender
298             .send(PcmResponse {
299                 desc_chain,
300                 status,
301                 done,
302             })
303             .await
304             .map_err(Error::MpscSend)?;
305 
306         if let Some(f) = future {
307             // From the virtio-snd spec:
308             // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
309             // while there are pending I/O messages for the specified stream ID.
310             f.await.map_err(Error::DoneNotTriggered)?;
311         };
312     }
313     Ok(())
314 }
315 
316 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
317 /// queue, and forward them to CRAS. One pcm worker per stream.
318 ///
319 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
320 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncRwLock<WorkerStatus>>, mut sender: mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, muted: Rc<AtomicBool>, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>321 pub async fn start_pcm_worker(
322     ex: Executor,
323     dstream: DirectionalStream,
324     mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
325     status_mutex: Rc<AsyncRwLock<WorkerStatus>>,
326     mut sender: mpsc::UnboundedSender<PcmResponse>,
327     period_dur: Duration,
328     card_index: usize,
329     muted: Rc<AtomicBool>,
330     release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
331 ) -> Result<(), Error> {
332     let res = pcm_worker_loop(
333         ex,
334         dstream,
335         &mut desc_receiver,
336         &status_mutex,
337         &mut sender,
338         period_dur,
339         card_index,
340         muted,
341         release_signal,
342     )
343     .await;
344     *status_mutex.lock().await = WorkerStatus::Quit;
345     if res.is_err() {
346         error!(
347             "[Card {}] pcm_worker error: {:#?}. Draining desc_receiver",
348             card_index,
349             res.as_ref().err()
350         );
351         // On error, guaranteed that desc_receiver has not been drained, so drain it here.
352         // Note that drain blocks until the stream is release.
353         drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
354     }
355     res
356 }
357 
pcm_worker_loop( ex: Executor, dstream: DirectionalStream, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: &Rc<AsyncRwLock<WorkerStatus>>, sender: &mut mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, muted: Rc<AtomicBool>, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>358 async fn pcm_worker_loop(
359     ex: Executor,
360     dstream: DirectionalStream,
361     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
362     status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,
363     sender: &mut mpsc::UnboundedSender<PcmResponse>,
364     period_dur: Duration,
365     card_index: usize,
366     muted: Rc<AtomicBool>,
367     release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
368 ) -> Result<(), Error> {
369     let on_release = async {
370         await_reset_signal(Some(&*release_signal)).await;
371         // After receiving release signal, wait for up to 2 periods,
372         // giving it a chance to respond to the last buffer.
373         if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {
374             error!(
375                 "[Card {}] Error on sleep after receiving reset signal: {}",
376                 card_index, e
377             )
378         }
379     }
380     .fuse();
381     pin_mut!(on_release);
382 
383     match dstream {
384         DirectionalStream::Output(mut sys_direction_output) => loop {
385             #[cfg(windows)]
386             let (mut stream, mut buffer_writer_lock) = (
387                 sys_direction_output
388                     .async_playback_buffer_stream
389                     .lock()
390                     .await,
391                 sys_direction_output.buffer_writer.lock().await,
392             );
393             #[cfg(windows)]
394             let buffer_writer = &mut buffer_writer_lock;
395             #[cfg(any(target_os = "android", target_os = "linux"))]
396             let (stream, buffer_writer) = (
397                 &mut sys_direction_output.async_playback_buffer_stream,
398                 &mut sys_direction_output.buffer_writer,
399             );
400 
401             let next_buf = stream.next_playback_buffer(&ex).fuse();
402             pin_mut!(next_buf);
403 
404             let dst_buf = select! {
405                 _ = on_release => {
406                     drain_desc_receiver(desc_receiver, sender).await?;
407                     break Ok(());
408                 },
409                 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
410             };
411             let worker_status = status_mutex.lock().await;
412             match *worker_status {
413                 WorkerStatus::Quit => {
414                     drain_desc_receiver(desc_receiver, sender).await?;
415                     if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
416                         error!(
417                             "[Card {}] Error on write_data after worker quit: {}",
418                             card_index, e
419                         )
420                     }
421                     break Ok(());
422                 }
423                 WorkerStatus::Pause => {
424                     write_data(dst_buf, None, buffer_writer).await?;
425                 }
426                 WorkerStatus::Running => match desc_receiver.try_next() {
427                     Err(e) => {
428                         error!(
429                             "[Card {}] Underrun. No new DescriptorChain while running: {}",
430                             card_index, e
431                         );
432                         write_data(dst_buf, None, buffer_writer).await?;
433                     }
434                     Ok(None) => {
435                         error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
436                         write_data(dst_buf, None, buffer_writer).await?;
437                         return Err(Error::InvalidPCMWorkerState);
438                     }
439                     Ok(Some(mut desc_chain)) => {
440                         let reader = if muted.load(Ordering::Relaxed) {
441                             None
442                         } else {
443                             // stream_id was already read in handle_pcm_queue
444                             Some(&mut desc_chain.reader)
445                         };
446                         let status = write_data(dst_buf, reader, buffer_writer).await.into();
447                         sender
448                             .send(PcmResponse {
449                                 desc_chain,
450                                 status,
451                                 done: None,
452                             })
453                             .await
454                             .map_err(Error::MpscSend)?;
455                     }
456                 },
457             }
458         },
459         DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
460             let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
461             pin_mut!(next_buf);
462 
463             let src_buf = select! {
464                 _ = on_release => {
465                     drain_desc_receiver(desc_receiver, sender).await?;
466                     break Ok(());
467                 },
468                 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
469             };
470 
471             let worker_status = status_mutex.lock().await;
472             match *worker_status {
473                 WorkerStatus::Quit => {
474                     drain_desc_receiver(desc_receiver, sender).await?;
475                     if let Err(e) = read_data(src_buf, None, period_bytes).await {
476                         error!(
477                             "[Card {}] Error on read_data after worker quit: {}",
478                             card_index, e
479                         )
480                     }
481                     break Ok(());
482                 }
483                 WorkerStatus::Pause => {
484                     read_data(src_buf, None, period_bytes).await?;
485                 }
486                 WorkerStatus::Running => match desc_receiver.try_next() {
487                     Err(e) => {
488                         error!(
489                             "[Card {}] Overrun. No new DescriptorChain while running: {}",
490                             card_index, e
491                         );
492                         read_data(src_buf, None, period_bytes).await?;
493                     }
494                     Ok(None) => {
495                         error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
496                         read_data(src_buf, None, period_bytes).await?;
497                         return Err(Error::InvalidPCMWorkerState);
498                     }
499                     Ok(Some(mut desc_chain)) => {
500                         let writer = if muted.load(Ordering::Relaxed) {
501                             None
502                         } else {
503                             Some(&mut desc_chain.writer)
504                         };
505                         let status = read_data(src_buf, writer, period_bytes).await.into();
506                         sender
507                             .send(PcmResponse {
508                                 desc_chain,
509                                 status,
510                                 done: None,
511                             })
512                             .await
513                             .map_err(Error::MpscSend)?;
514                     }
515                 },
516             }
517         },
518     }
519 }
520 
521 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>522 async fn defer_pcm_response_to_worker(
523     desc_chain: DescriptorChain,
524     status: virtio_snd_pcm_status,
525     response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
526 ) -> Result<(), Error> {
527     response_sender
528         .send(PcmResponse {
529             desc_chain,
530             status,
531             done: None,
532         })
533         .await
534         .map_err(Error::MpscSend)
535 }
536 
send_pcm_response( mut desc_chain: DescriptorChain, queue: &mut Queue, status: virtio_snd_pcm_status, ) -> Result<(), Error>537 fn send_pcm_response(
538     mut desc_chain: DescriptorChain,
539     queue: &mut Queue,
540     status: virtio_snd_pcm_status,
541 ) -> Result<(), Error> {
542     let writer = &mut desc_chain.writer;
543 
544     // For rx queue only. Fast forward the unused audio data buffer.
545     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
546         writer
547             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
548     }
549     writer.write_obj(status).map_err(Error::WriteResponse)?;
550     let len = writer.bytes_written() as u32;
551     queue.add_used(desc_chain, len);
552     queue.trigger_interrupt();
553     Ok(())
554 }
555 
556 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>)557 async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {
558     match reset_signal_option {
559         Some((lock, cvar)) => {
560             let mut reset = lock.lock().await;
561             while !*reset {
562                 reset = cvar.wait(reset).await;
563             }
564         }
565         None => futures::future::pending().await,
566     };
567 }
568 
send_pcm_response_worker( queue: Rc<AsyncRwLock<Queue>>, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>569 pub async fn send_pcm_response_worker(
570     queue: Rc<AsyncRwLock<Queue>>,
571     recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
572     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
573 ) -> Result<(), Error> {
574     let on_reset = await_reset_signal(reset_signal).fuse();
575     pin_mut!(on_reset);
576 
577     loop {
578         let next_async = recv.next().fuse();
579         pin_mut!(next_async);
580 
581         let res = select! {
582             _ = on_reset => break,
583             res = next_async => res,
584         };
585 
586         if let Some(r) = res {
587             send_pcm_response(r.desc_chain, &mut *queue.lock().await, r.status)?;
588 
589             // Resume pcm_worker
590             if let Some(done) = r.done {
591                 done.send(()).map_err(Error::OneshotSend)?;
592             }
593         } else {
594             debug!("PcmResponse channel is closed.");
595             break;
596         }
597     }
598     Ok(())
599 }
600 
601 /// Handle messages from the control tube. This one is not related to virtio spec.
handle_ctrl_tube( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, control_tube: &AsyncTube, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>602 pub async fn handle_ctrl_tube(
603     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
604     control_tube: &AsyncTube,
605     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
606 ) -> Result<(), Error> {
607     let on_reset = await_reset_signal(reset_signal).fuse();
608     pin_mut!(on_reset);
609 
610     loop {
611         let next_async = control_tube.next().fuse();
612         pin_mut!(next_async);
613 
614         let cmd = select! {
615             _ = on_reset => break,
616             res = next_async => res,
617         };
618 
619         match cmd {
620             Ok(cmd) => {
621                 let resp = match cmd {
622                     SndControlCommand::MuteAll(muted) => {
623                         let streams = streams.read_lock().await;
624                         for stream in streams.iter() {
625                             let stream_info = stream.lock().await;
626                             stream_info.muted.store(muted, Ordering::Relaxed);
627                             info!("Stream muted = {:?}", muted);
628                         }
629                         VmResponse::Ok
630                     }
631                 };
632                 control_tube
633                     .send(resp)
634                     .await
635                     .map_err(Error::ControlTubeError)?;
636             }
637             Err(e) => {
638                 error!("Failed to read the command: {}", e);
639                 return Err(Error::ControlTubeError(e));
640             }
641         }
642     }
643 
644     Ok(())
645 }
646 
647 /// Handle messages from the tx or the rx queue. One invocation is needed for
648 /// each queue.
handle_pcm_queue( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: Rc<AsyncRwLock<Queue>>, queue_event: &EventAsync, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>649 pub async fn handle_pcm_queue(
650     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
651     mut response_sender: mpsc::UnboundedSender<PcmResponse>,
652     queue: Rc<AsyncRwLock<Queue>>,
653     queue_event: &EventAsync,
654     card_index: usize,
655     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
656 ) -> Result<(), Error> {
657     let on_reset = await_reset_signal(reset_signal).fuse();
658     pin_mut!(on_reset);
659 
660     loop {
661         // Manual queue.next_async() to avoid holding the mutex
662         let next_async = async {
663             loop {
664                 // Check if there are more descriptors available.
665                 if let Some(chain) = queue.lock().await.pop() {
666                     return Ok(chain);
667                 }
668                 queue_event.next_val().await?;
669             }
670         }
671         .fuse();
672         pin_mut!(next_async);
673 
674         let mut desc_chain = select! {
675             _ = on_reset => break,
676             res = next_async => res.map_err(Error::Async)?,
677         };
678 
679         let pcm_xfer: virtio_snd_pcm_xfer =
680             desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
681         let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
682 
683         let streams = streams.read_lock().await;
684         let stream_info = match streams.get(stream_id) {
685             Some(stream_info) => stream_info.read_lock().await,
686             None => {
687                 error!(
688                     "[Card {}] stream_id ({}) >= num_streams ({})",
689                     card_index,
690                     stream_id,
691                     streams.len()
692                 );
693                 defer_pcm_response_to_worker(
694                     desc_chain,
695                     virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
696                     &mut response_sender,
697                 )
698                 .await?;
699                 continue;
700             }
701         };
702 
703         match stream_info.sender.as_ref() {
704             Some(mut s) => {
705                 s.send(desc_chain).await.map_err(Error::MpscSend)?;
706                 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
707                     // If sender channel is still intact but worker status is quit,
708                     // the worker quitted unexpectedly. Return error to request a reset.
709                     return Err(Error::PCMWorkerQuittedUnexpectedly);
710                 }
711             }
712             None => {
713                 if !stream_info.just_reset {
714                     error!(
715                         "[Card {}] stream {} is not ready. state: {}",
716                         card_index,
717                         stream_id,
718                         get_virtio_snd_r_pcm_cmd_name(stream_info.state)
719                     );
720                 }
721                 defer_pcm_response_to_worker(
722                     desc_chain,
723                     virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
724                     &mut response_sender,
725                 )
726                 .await?;
727             }
728         };
729     }
730     Ok(())
731 }
732 
733 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue( ex: &Executor, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, snd_data: &SndData, queue: Rc<AsyncRwLock<Queue>>, queue_event: &mut EventAsync, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>734 pub async fn handle_ctrl_queue(
735     ex: &Executor,
736     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
737     snd_data: &SndData,
738     queue: Rc<AsyncRwLock<Queue>>,
739     queue_event: &mut EventAsync,
740     tx_send: mpsc::UnboundedSender<PcmResponse>,
741     rx_send: mpsc::UnboundedSender<PcmResponse>,
742     card_index: usize,
743     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
744 ) -> Result<(), Error> {
745     let on_reset = await_reset_signal(reset_signal).fuse();
746     pin_mut!(on_reset);
747 
748     let mut queue = queue.lock().await;
749     loop {
750         let mut desc_chain = {
751             let next_async = queue.next_async(queue_event).fuse();
752             pin_mut!(next_async);
753 
754             select! {
755                 _ = on_reset => break,
756                 res = next_async => res.map_err(Error::Async)?,
757             }
758         };
759 
760         let reader = &mut desc_chain.reader;
761         let writer = &mut desc_chain.writer;
762         // Don't advance the reader
763         let code = reader
764             .peek_obj::<virtio_snd_hdr>()
765             .map_err(Error::ReadMessage)?
766             .code
767             .into();
768 
769         let handle_ctrl_msg = async {
770             match code {
771                 VIRTIO_SND_R_JACK_INFO => {
772                     let query_info: virtio_snd_query_info =
773                         reader.read_obj().map_err(Error::ReadMessage)?;
774                     let start_id: usize = u32::from(query_info.start_id) as usize;
775                     let count: usize = u32::from(query_info.count) as usize;
776                     if start_id + count > snd_data.jack_info.len() {
777                         error!(
778                             "[Card {}] start_id({}) + count({}) must be smaller than \
779                             the number of jacks ({})",
780                             card_index,
781                             start_id,
782                             count,
783                             snd_data.jack_info.len()
784                         );
785                         return writer
786                             .write_obj(VIRTIO_SND_S_BAD_MSG)
787                             .map_err(Error::WriteResponse);
788                     }
789                     // The response consists of the virtio_snd_hdr structure (contains the request
790                     // status code), followed by the device-writable information structures of the
791                     // item. Each information structure begins with the following common header
792                     writer
793                         .write_obj(VIRTIO_SND_S_OK)
794                         .map_err(Error::WriteResponse)?;
795                     for i in start_id..(start_id + count) {
796                         writer
797                             .write_all(snd_data.jack_info[i].as_bytes())
798                             .map_err(Error::WriteResponse)?;
799                     }
800                     Ok(())
801                 }
802                 VIRTIO_SND_R_PCM_INFO => {
803                     let query_info: virtio_snd_query_info =
804                         reader.read_obj().map_err(Error::ReadMessage)?;
805                     let start_id: usize = u32::from(query_info.start_id) as usize;
806                     let count: usize = u32::from(query_info.count) as usize;
807                     if start_id + count > snd_data.pcm_info.len() {
808                         error!(
809                             "[Card {}] start_id({}) + count({}) must be smaller than \
810                             the number of streams ({})",
811                             card_index,
812                             start_id,
813                             count,
814                             snd_data.pcm_info.len()
815                         );
816                         return writer
817                             .write_obj(VIRTIO_SND_S_BAD_MSG)
818                             .map_err(Error::WriteResponse);
819                     }
820                     // The response consists of the virtio_snd_hdr structure (contains the request
821                     // status code), followed by the device-writable information structures of the
822                     // item. Each information structure begins with the following common header
823                     writer
824                         .write_obj(VIRTIO_SND_S_OK)
825                         .map_err(Error::WriteResponse)?;
826                     for i in start_id..(start_id + count) {
827                         writer
828                             .write_all(snd_data.pcm_info[i].as_bytes())
829                             .map_err(Error::WriteResponse)?;
830                     }
831                     Ok(())
832                 }
833                 VIRTIO_SND_R_CHMAP_INFO => {
834                     let query_info: virtio_snd_query_info =
835                         reader.read_obj().map_err(Error::ReadMessage)?;
836                     let start_id: usize = u32::from(query_info.start_id) as usize;
837                     let count: usize = u32::from(query_info.count) as usize;
838                     if start_id + count > snd_data.chmap_info.len() {
839                         error!(
840                             "[Card {}] start_id({}) + count({}) must be smaller than \
841                             the number of chmaps ({})",
842                             card_index,
843                             start_id,
844                             count,
845                             snd_data.chmap_info.len()
846                         );
847                         return writer
848                             .write_obj(VIRTIO_SND_S_BAD_MSG)
849                             .map_err(Error::WriteResponse);
850                     }
851                     // The response consists of the virtio_snd_hdr structure (contains the request
852                     // status code), followed by the device-writable information structures of the
853                     // item. Each information structure begins with the following common header
854                     writer
855                         .write_obj(VIRTIO_SND_S_OK)
856                         .map_err(Error::WriteResponse)?;
857                     for i in start_id..(start_id + count) {
858                         writer
859                             .write_all(snd_data.chmap_info[i].as_bytes())
860                             .map_err(Error::WriteResponse)?;
861                     }
862                     Ok(())
863                 }
864                 VIRTIO_SND_R_JACK_REMAP => {
865                     unreachable!("remap is unsupported");
866                 }
867                 VIRTIO_SND_R_PCM_SET_PARAMS => {
868                     // Raise VIRTIO_SND_S_BAD_MSG or IO error?
869                     let set_params: virtio_snd_pcm_set_params =
870                         reader.read_obj().map_err(Error::ReadMessage)?;
871                     let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
872                     let buffer_bytes: u32 = set_params.buffer_bytes.into();
873                     let period_bytes: u32 = set_params.period_bytes.into();
874 
875                     let dir = match snd_data.pcm_info.get(stream_id) {
876                         Some(pcm_info) => {
877                             if set_params.channels < pcm_info.channels_min
878                                 || set_params.channels > pcm_info.channels_max
879                             {
880                                 error!(
881                                     "[Card {}] Number of channels ({}) must be between {} and {}",
882                                     card_index,
883                                     set_params.channels,
884                                     pcm_info.channels_min,
885                                     pcm_info.channels_max
886                                 );
887                                 return writer
888                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
889                                     .map_err(Error::WriteResponse);
890                             }
891                             if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
892                                 error!(
893                                     "[Card {}] PCM format {} is not supported.",
894                                     card_index, set_params.format
895                                 );
896                                 return writer
897                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
898                                     .map_err(Error::WriteResponse);
899                             }
900                             if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
901                                 error!(
902                                     "[Card {}] PCM frame rate {} is not supported.",
903                                     card_index, set_params.rate
904                                 );
905                                 return writer
906                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
907                                     .map_err(Error::WriteResponse);
908                             }
909 
910                             pcm_info.direction
911                         }
912                         None => {
913                             error!(
914                                 "[Card {}] stream_id {} < streams {}",
915                                 card_index,
916                                 stream_id,
917                                 snd_data.pcm_info.len()
918                             );
919                             return writer
920                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
921                                 .map_err(Error::WriteResponse);
922                         }
923                     };
924 
925                     if set_params.features != 0 {
926                         error!("[Card {}] No feature is supported", card_index);
927                         return writer
928                             .write_obj(VIRTIO_SND_S_NOT_SUPP)
929                             .map_err(Error::WriteResponse);
930                     }
931 
932                     if buffer_bytes % period_bytes != 0 {
933                         error!(
934                             "[Card {}] buffer_bytes({}) must be dividable by period_bytes({})",
935                             card_index, buffer_bytes, period_bytes
936                         );
937                         return writer
938                             .write_obj(VIRTIO_SND_S_BAD_MSG)
939                             .map_err(Error::WriteResponse);
940                     }
941 
942                     process_pcm_ctrl(
943                         ex,
944                         &tx_send,
945                         &rx_send,
946                         streams,
947                         VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
948                         writer,
949                         stream_id,
950                         card_index,
951                     )
952                     .await
953                 }
954                 VIRTIO_SND_R_PCM_PREPARE
955                 | VIRTIO_SND_R_PCM_START
956                 | VIRTIO_SND_R_PCM_STOP
957                 | VIRTIO_SND_R_PCM_RELEASE => {
958                     let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
959                     let stream_id: usize = u32::from(hdr.stream_id) as usize;
960                     let cmd = match VirtioSndPcmCmd::try_from(code) {
961                         Ok(cmd) => cmd,
962                         Err(err) => {
963                             error!(
964                                 "[Card {}] Error converting code to command: {}",
965                                 card_index, err
966                             );
967                             return writer
968                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
969                                 .map_err(Error::WriteResponse);
970                         }
971                     };
972                     process_pcm_ctrl(
973                         ex, &tx_send, &rx_send, streams, cmd, writer, stream_id, card_index,
974                     )
975                     .await
976                     .and(Ok(()))?;
977                     Ok(())
978                 }
979                 c => {
980                     error!("[Card {}] Unrecognized code: {}", card_index, c);
981                     writer
982                         .write_obj(VIRTIO_SND_S_BAD_MSG)
983                         .map_err(Error::WriteResponse)
984                 }
985             }
986         };
987 
988         handle_ctrl_msg.await?;
989         let len = writer.bytes_written() as u32;
990         queue.add_used(desc_chain, len);
991         queue.trigger_interrupt();
992     }
993     Ok(())
994 }
995 
996 /// Send events to the audio driver.
handle_event_queue( mut queue: Queue, mut queue_event: EventAsync, ) -> Result<(), Error>997 pub async fn handle_event_queue(
998     mut queue: Queue,
999     mut queue_event: EventAsync,
1000 ) -> Result<(), Error> {
1001     loop {
1002         let desc_chain = queue
1003             .next_async(&mut queue_event)
1004             .await
1005             .map_err(Error::Async)?;
1006 
1007         // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
1008         queue.add_used(desc_chain, 0);
1009         queue.trigger_interrupt();
1010     }
1011 }
1012 
1013 #[cfg(test)]
1014 mod tests {
1015     use std::sync::Arc;
1016 
1017     use audio_streams::NoopStreamSourceGenerator;
1018     use base::Tube;
1019 
1020     use super::*;
1021     use crate::virtio::snd::common_backend::notify_reset_signal;
1022 
1023     #[test]
test_handle_ctrl_tube_reset_signal()1024     fn test_handle_ctrl_tube_reset_signal() {
1025         let ex = Executor::new().expect("Failed to create an executor");
1026         let result = ex.run_until(async {
1027             let streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>> = Default::default();
1028             let (t0, _t1) = Tube::pair().expect("Failed to create tube pairs");
1029             let t0 = AsyncTube::new(&ex, t0).expect("Failed to create async tube");
1030             let reset_signal = (AsyncRwLock::new(false), Condvar::new());
1031 
1032             let handle_future = handle_ctrl_tube(&streams, &t0, Some(&reset_signal));
1033             let notify_future = notify_reset_signal(&reset_signal);
1034             let (result, _) = futures::join!(handle_future, notify_future);
1035 
1036             assert!(
1037                 result.is_ok(),
1038                 "handle_ctrl_tube returns an error after reset signal"
1039             );
1040         });
1041 
1042         assert!(result.is_ok(), "ex.run_until returns an error");
1043     }
1044 
new_stream() -> StreamInfo1045     fn new_stream() -> StreamInfo {
1046         let card_index = 0;
1047         StreamInfo::builder(
1048             Arc::new(Box::new(NoopStreamSourceGenerator::new())),
1049             card_index,
1050         )
1051         .build()
1052     }
1053 
1054     #[test]
test_handle_ctrl_tube_receive_mute_cmd()1055     fn test_handle_ctrl_tube_receive_mute_cmd() {
1056         let ex = Executor::new().expect("Failed to create an executor");
1057         let result = ex.run_until(async {
1058             let streams: Vec<AsyncRwLock<StreamInfo>> = vec![AsyncRwLock::new(new_stream())];
1059             let streams = Rc::new(AsyncRwLock::new(streams));
1060 
1061             let (t0, t1) = Tube::pair().expect("Failed to create tube pairs");
1062             let t0 = AsyncTube::new(&ex, t0).expect("Failed to create an async tube");
1063             let t1 = AsyncTube::new(&ex, t1).expect("Failed to create an async tube");
1064             let reset_signal = (AsyncRwLock::new(false), Condvar::new());
1065 
1066             let handle_future = handle_ctrl_tube(&streams, &t0, Some(&reset_signal));
1067             let tube_future = async {
1068                 let _ = t1.send(&SndControlCommand::MuteAll(true)).await;
1069                 let recv_result = t1.next::<VmResponse>().await;
1070                 notify_reset_signal(&reset_signal).await;
1071                 recv_result
1072             };
1073             let (handle_result, tube_result) = futures::join!(handle_future, tube_future);
1074 
1075             assert!(
1076                 handle_result.is_ok(),
1077                 "handle_ctrl_tube returns an error after reset signal"
1078             );
1079             assert!(tube_result.is_ok(), "Failed to receive data from the tube");
1080             assert!(
1081                 matches!(tube_result.unwrap(), VmResponse::Ok),
1082                 "tube_result is not Ok",
1083             );
1084 
1085             let streams = streams.read_lock().await;
1086             let stream = streams.first().unwrap().lock().await;
1087             assert!(stream.muted.load(Ordering::Relaxed), "Stream is not muted");
1088         });
1089 
1090         assert!(result.is_ok(), "ex.run_until returns an error");
1091     }
1092 }
1093