• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 use std::time::Duration;
11 
12 use async_trait::async_trait;
13 use audio_streams::capture::AsyncCaptureBuffer;
14 use audio_streams::AsyncPlaybackBuffer;
15 use audio_streams::BoxError;
16 use base::debug;
17 use base::error;
18 use cros_async::sync::Condvar;
19 use cros_async::sync::RwLock as AsyncRwLock;
20 use cros_async::EventAsync;
21 use cros_async::Executor;
22 use cros_async::TimerAsync;
23 use futures::channel::mpsc;
24 use futures::channel::oneshot;
25 use futures::pin_mut;
26 use futures::select;
27 use futures::FutureExt;
28 use futures::SinkExt;
29 use futures::StreamExt;
30 use thiserror::Error as ThisError;
31 use zerocopy::AsBytes;
32 
33 use super::Error;
34 use super::SndData;
35 use super::WorkerStatus;
36 use crate::virtio::snd::common::*;
37 use crate::virtio::snd::common_backend::stream_info::SetParams;
38 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
39 use crate::virtio::snd::common_backend::DirectionalStream;
40 use crate::virtio::snd::common_backend::PcmResponse;
41 use crate::virtio::snd::constants::*;
42 use crate::virtio::snd::layout::*;
43 use crate::virtio::DescriptorChain;
44 use crate::virtio::Interrupt;
45 use crate::virtio::Queue;
46 use crate::virtio::Reader;
47 use crate::virtio::Writer;
48 
/// Trait to wrap system specific helpers for reading from the start point capture buffer.
///
/// Declared with `#[async_trait(?Send)]`, so the futures produced by implementors are not
/// required to be `Send`.
#[async_trait(?Send)]
pub trait CaptureBufferReader {
    /// Asynchronously obtains the next capture buffer to read audio data from.
    ///
    /// `ex` is the executor handed to the implementation. On failure a [`BoxError`] is returned.
    async fn get_next_capture_period(
        &mut self,
        ex: &Executor,
    ) -> Result<AsyncCaptureBuffer, BoxError>;
}
57 
/// Trait to wrap system specific helpers for writing to endpoint playback buffers.
///
/// Declared with `#[async_trait(?Send)]`, so the futures produced by implementors are not
/// required to be `Send`.
#[async_trait(?Send)]
pub trait PlaybackBufferWriter {
    /// Creates a new writer.
    ///
    /// NOTE(review): `guest_period_bytes` is presumably the guest-side period size in bytes;
    /// confirm against the implementors.
    fn new(guest_period_bytes: usize) -> Self
    where
        Self: Sized;

    /// Returns the period of the endpoint device.
    fn endpoint_period_bytes(&self) -> usize;

    /// Read audio samples from the tx virtqueue.
    ///
    /// Copies data from `reader` into `dst_buf` and returns the number of bytes copied. The
    /// default implementation forwards to `dst_buf.copy_from(reader)` and maps an I/O failure to
    /// `Error::Io`.
    fn copy_to_buffer(
        &mut self,
        dst_buf: &mut AsyncPlaybackBuffer<'_>,
        reader: &mut Reader,
    ) -> Result<usize, Error> {
        dst_buf.copy_from(reader).map_err(Error::Io)
    }
}
77 
/// A PCM control command for a single stream.
///
/// Each variant corresponds to one `VIRTIO_SND_R_PCM_*` request code (see the `Display` and
/// `TryFrom<u32>` implementations).
#[derive(Debug)]
enum VirtioSndPcmCmd {
    /// `VIRTIO_SND_R_PCM_SET_PARAMS`, carrying the parameters to apply to the stream.
    SetParams { set_params: SetParams },
    /// `VIRTIO_SND_R_PCM_PREPARE`.
    Prepare,
    /// `VIRTIO_SND_R_PCM_START`.
    Start,
    /// `VIRTIO_SND_R_PCM_STOP`.
    Stop,
    /// `VIRTIO_SND_R_PCM_RELEASE`.
    Release,
}
86 
87 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result88     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89         let cmd_code = match self {
90             VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
91             VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
92             VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
93             VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
94             VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
95         };
96         f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
97     }
98 }
99 
/// Errors returned when converting a raw request code into a [`VirtioSndPcmCmd`].
#[derive(ThisError, Debug)]
enum VirtioSndPcmCmdError {
    /// The code was `VIRTIO_SND_R_PCM_SET_PARAMS`, which cannot be built from the code alone
    /// because the `SetParams` variant carries a payload.
    #[error("SetParams requires additional parameters")]
    SetParams,
    /// The code does not match any of the PCM request codes handled by the conversion.
    #[error("Invalid virtio snd command code")]
    InvalidCode,
}
107 
108 impl TryFrom<u32> for VirtioSndPcmCmd {
109     type Error = VirtioSndPcmCmdError;
110 
try_from(code: u32) -> Result<Self, Self::Error>111     fn try_from(code: u32) -> Result<Self, Self::Error> {
112         match code {
113             VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
114             VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
115             VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
116             VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
117             VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
118             _ => Err(VirtioSndPcmCmdError::InvalidCode),
119         }
120     }
121 }
122 
123 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd124     fn with_set_params_and_direction(
125         set_params: virtio_snd_pcm_set_params,
126         dir: u8,
127     ) -> VirtioSndPcmCmd {
128         let buffer_bytes: u32 = set_params.buffer_bytes.into();
129         let period_bytes: u32 = set_params.period_bytes.into();
130         VirtioSndPcmCmd::SetParams {
131             set_params: SetParams {
132                 channels: set_params.channels,
133                 format: from_virtio_sample_format(set_params.format).unwrap(),
134                 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
135                 buffer_bytes: buffer_bytes as usize,
136                 period_bytes: period_bytes as usize,
137                 dir,
138             },
139         }
140     }
141 }
142 
143 // Returns true if the operation is successful. Returns error if there is
144 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, ) -> Result<(), Error>145 async fn process_pcm_ctrl(
146     ex: &Executor,
147     tx_send: &mpsc::UnboundedSender<PcmResponse>,
148     rx_send: &mpsc::UnboundedSender<PcmResponse>,
149     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
150     cmd: VirtioSndPcmCmd,
151     writer: &mut Writer,
152     stream_id: usize,
153 ) -> Result<(), Error> {
154     let streams = streams.read_lock().await;
155     let mut stream = match streams.get(stream_id) {
156         Some(stream_info) => stream_info.lock().await,
157         None => {
158             error!(
159                 "Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
160                 stream_id, cmd
161             );
162             return writer
163                 .write_obj(VIRTIO_SND_S_BAD_MSG)
164                 .map_err(Error::WriteResponse);
165         }
166     };
167 
168     debug!("{} for stream id={}", cmd, stream_id);
169 
170     let result = match cmd {
171         VirtioSndPcmCmd::SetParams { set_params } => {
172             let result = stream.set_params(set_params).await;
173             if result.is_ok() {
174                 debug!(
175                     "VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
176                     stream_id, *stream
177                 );
178             }
179             result
180         }
181         VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
182         VirtioSndPcmCmd::Start => stream.start().await,
183         VirtioSndPcmCmd::Stop => stream.stop().await,
184         VirtioSndPcmCmd::Release => stream.release().await,
185     };
186     match result {
187         Ok(_) => writer
188             .write_obj(VIRTIO_SND_S_OK)
189             .map_err(Error::WriteResponse),
190         Err(Error::OperationNotSupported) => {
191             error!(
192                 "{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
193                 cmd, stream_id
194             );
195 
196             writer
197                 .write_obj(VIRTIO_SND_S_NOT_SUPP)
198                 .map_err(Error::WriteResponse)
199         }
200         Err(e) => {
201             // Runtime/internal error would be more appropriate, but there's
202             // no such error type
203             error!(
204                 "{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
205                 cmd, stream_id, e
206             );
207             writer
208                 .write_obj(VIRTIO_SND_S_IO_ERR)
209                 .map_err(Error::WriteResponse)
210         }
211     }
212 }
213 
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<&mut Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>214 async fn write_data(
215     mut dst_buf: AsyncPlaybackBuffer<'_>,
216     reader: Option<&mut Reader>,
217     buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
218 ) -> Result<u32, Error> {
219     let transferred = match reader {
220         Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
221         None => dst_buf
222             .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
223             .map_err(Error::Io)?,
224     };
225 
226     if transferred != buffer_writer.endpoint_period_bytes() {
227         error!(
228             "Bytes written {} != period_bytes {}",
229             transferred,
230             buffer_writer.endpoint_period_bytes()
231         );
232         Err(Error::InvalidBufferSize)
233     } else {
234         dst_buf.commit().await;
235         Ok(dst_buf.latency_bytes())
236     }
237 }
238 
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>239 async fn read_data<'a>(
240     mut src_buf: AsyncCaptureBuffer<'a>,
241     writer: Option<&mut Writer>,
242     period_bytes: usize,
243 ) -> Result<u32, Error> {
244     let transferred = match writer {
245         Some(writer) => src_buf.copy_to(writer),
246         None => src_buf.copy_to(&mut io::sink()),
247     }
248     .map_err(Error::Io)?;
249     if transferred != period_bytes {
250         error!(
251             "Bytes written {} != period_bytes {}",
252             transferred, period_bytes
253         );
254         Err(Error::InvalidBufferSize)
255     } else {
256         src_buf.commit().await;
257         Ok(src_buf.latency_bytes())
258     }
259 }
260 
261 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self262     fn from(res: Result<u32, Error>) -> Self {
263         match res {
264             Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
265             Err(e) => {
266                 error!("PCM I/O message failed: {}", e);
267                 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
268             }
269         }
270     }
271 }
272 
273 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>274 async fn drain_desc_receiver(
275     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
276     sender: &mut mpsc::UnboundedSender<PcmResponse>,
277 ) -> Result<(), Error> {
278     let mut o_desc_chain = desc_receiver.next().await;
279     while let Some(desc_chain) = o_desc_chain {
280         // From the virtio-snd spec:
281         // The device MUST complete all pending I/O messages for the specified stream ID.
282         let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
283         // Fetch next DescriptorChain to see if the current one is the last one.
284         o_desc_chain = desc_receiver.next().await;
285         let (done, future) = if o_desc_chain.is_none() {
286             let (done, future) = oneshot::channel();
287             (Some(done), Some(future))
288         } else {
289             (None, None)
290         };
291         sender
292             .send(PcmResponse {
293                 desc_chain,
294                 status,
295                 done,
296             })
297             .await
298             .map_err(Error::MpscSend)?;
299 
300         if let Some(f) = future {
301             // From the virtio-snd spec:
302             // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
303             // while there are pending I/O messages for the specified stream ID.
304             f.await.map_err(Error::DoneNotTriggered)?;
305         };
306     }
307     Ok(())
308 }
309 
310 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
311 /// queue, and forward them to CRAS. One pcm worker per stream.
312 ///
313 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
314 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncRwLock<WorkerStatus>>, mut sender: mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>315 pub async fn start_pcm_worker(
316     ex: Executor,
317     dstream: DirectionalStream,
318     mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
319     status_mutex: Rc<AsyncRwLock<WorkerStatus>>,
320     mut sender: mpsc::UnboundedSender<PcmResponse>,
321     period_dur: Duration,
322     release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
323 ) -> Result<(), Error> {
324     let res = pcm_worker_loop(
325         ex,
326         dstream,
327         &mut desc_receiver,
328         &status_mutex,
329         &mut sender,
330         period_dur,
331         release_signal,
332     )
333     .await;
334     *status_mutex.lock().await = WorkerStatus::Quit;
335     if res.is_err() {
336         error!(
337             "pcm_worker error: {:#?}. Draining desc_receiver",
338             res.as_ref().err()
339         );
340         // On error, guaranteed that desc_receiver has not been drained, so drain it here.
341         // Note that drain blocks until the stream is release.
342         drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
343     }
344     res
345 }
346 
// Main loop of a pcm worker, for either an output (playback) or an input (capture) stream.
//
// Each iteration waits for the next endpoint buffer, racing against the release signal, then acts
// according to the current `WorkerStatus`:
//   * Quit:    drain `desc_receiver`, process the buffer without guest data, and return Ok.
//   * Pause:   process the buffer without guest data.
//   * Running: take one DescriptorChain from `desc_receiver` if available and transfer audio
//              to/from it, sending the resulting status through `sender`; if none is available
//              the buffer is processed without guest data.
// "Without guest data" means `write_data`/`read_data` are called with `None`.
//
// Returns Ok(()) when the loop exits through the release signal or the Quit status, and an error
// if fetching a buffer, transferring data, or sending a response fails.
async fn pcm_worker_loop(
    ex: Executor,
    dstream: DirectionalStream,
    desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
    status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,
    sender: &mut mpsc::UnboundedSender<PcmResponse>,
    period_dur: Duration,
    release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
) -> Result<(), Error> {
    // Future that completes after the release signal fires plus a grace period. It is fused and
    // pinned once, outside the loops, so it can be polled by `select!` on every iteration.
    let on_release = async {
        await_reset_signal(Some(&*release_signal)).await;
        // After receiving release signal, wait for up to 2 periods,
        // giving it a chance to respond to the last buffer.
        if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {
            error!("Error on sleep after receiving reset signal: {}", e)
        }
    }
    .fuse();
    pin_mut!(on_release);

    match dstream {
        DirectionalStream::Output(mut sys_direction_output) => loop {
            // On Windows the stream and the buffer writer are obtained through `lock().await`;
            // on Linux/Android they are borrowed directly from `sys_direction_output`.
            #[cfg(windows)]
            let (mut stream, mut buffer_writer_lock) = (
                sys_direction_output
                    .async_playback_buffer_stream
                    .lock()
                    .await,
                sys_direction_output.buffer_writer.lock().await,
            );
            #[cfg(windows)]
            let buffer_writer = &mut buffer_writer_lock;
            #[cfg(any(target_os = "android", target_os = "linux"))]
            let (stream, buffer_writer) = (
                &mut sys_direction_output.async_playback_buffer_stream,
                &mut sys_direction_output.buffer_writer,
            );

            let next_buf = stream.next_playback_buffer(&ex).fuse();
            pin_mut!(next_buf);

            // Wait for the next playback buffer, unless the stream is released first, in which
            // case pending descriptors are drained and the worker exits successfully.
            let dst_buf = select! {
                _ = on_release => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    break Ok(());
                },
                buf = next_buf => buf.map_err(Error::FetchBuffer)?,
            };
            // The status guard stays alive for the rest of this iteration.
            let worker_status = status_mutex.lock().await;
            match *worker_status {
                WorkerStatus::Quit => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    // Failing to fill the final buffer is only logged; the worker still exits
                    // with Ok.
                    if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
                        error!("Error on write_data after worker quit: {}", e)
                    }
                    break Ok(());
                }
                WorkerStatus::Pause => {
                    write_data(dst_buf, None, buffer_writer).await?;
                }
                // Non-blocking receive: the endpoint buffer is already in hand, so do not wait
                // for the guest.
                WorkerStatus::Running => match desc_receiver.try_next() {
                    Err(e) => {
                        error!("Underrun. No new DescriptorChain while running: {}", e);
                        write_data(dst_buf, None, buffer_writer).await?;
                    }
                    Ok(None) => {
                        error!("Unreachable. status should be Quit when the channel is closed");
                        write_data(dst_buf, None, buffer_writer).await?;
                        return Err(Error::InvalidPCMWorkerState);
                    }
                    Ok(Some(mut desc_chain)) => {
                        // stream_id was already read in handle_pcm_queue
                        // A failed transfer is folded into the status (see the
                        // `From<Result<u32, Error>>` impl) instead of aborting the worker.
                        let status =
                            write_data(dst_buf, Some(&mut desc_chain.reader), buffer_writer)
                                .await
                                .into();
                        sender
                            .send(PcmResponse {
                                desc_chain,
                                status,
                                done: None,
                            })
                            .await
                            .map_err(Error::MpscSend)?;
                    }
                },
            }
        },
        DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
            let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
            pin_mut!(next_buf);

            // Wait for the next capture buffer, unless the stream is released first, in which
            // case pending descriptors are drained and the worker exits successfully.
            let src_buf = select! {
                _ = on_release => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    break Ok(());
                },
                buf = next_buf => buf.map_err(Error::FetchBuffer)?,
            };

            // The status guard stays alive for the rest of this iteration.
            let worker_status = status_mutex.lock().await;
            match *worker_status {
                WorkerStatus::Quit => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    // Failing to consume the final buffer is only logged; the worker still exits
                    // with Ok.
                    if let Err(e) = read_data(src_buf, None, period_bytes).await {
                        error!("Error on read_data after worker quit: {}", e)
                    }
                    break Ok(());
                }
                WorkerStatus::Pause => {
                    read_data(src_buf, None, period_bytes).await?;
                }
                // Non-blocking receive: the captured buffer is already in hand, so do not wait
                // for the guest.
                WorkerStatus::Running => match desc_receiver.try_next() {
                    Err(e) => {
                        error!("Overrun. No new DescriptorChain while running: {}", e);
                        read_data(src_buf, None, period_bytes).await?;
                    }
                    Ok(None) => {
                        error!("Unreachable. status should be Quit when the channel is closed");
                        read_data(src_buf, None, period_bytes).await?;
                        return Err(Error::InvalidPCMWorkerState);
                    }
                    Ok(Some(mut desc_chain)) => {
                        // A failed transfer is folded into the status (see the
                        // `From<Result<u32, Error>>` impl) instead of aborting the worker.
                        let status = read_data(src_buf, Some(&mut desc_chain.writer), period_bytes)
                            .await
                            .into();
                        sender
                            .send(PcmResponse {
                                desc_chain,
                                status,
                                done: None,
                            })
                            .await
                            .map_err(Error::MpscSend)?;
                    }
                },
            }
        },
    }
}
487 
488 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>489 async fn defer_pcm_response_to_worker(
490     desc_chain: DescriptorChain,
491     status: virtio_snd_pcm_status,
492     response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
493 ) -> Result<(), Error> {
494     response_sender
495         .send(PcmResponse {
496             desc_chain,
497             status,
498             done: None,
499         })
500         .await
501         .map_err(Error::MpscSend)
502 }
503 
send_pcm_response( mut desc_chain: DescriptorChain, queue: &mut Queue, interrupt: &Interrupt, status: virtio_snd_pcm_status, ) -> Result<(), Error>504 fn send_pcm_response(
505     mut desc_chain: DescriptorChain,
506     queue: &mut Queue,
507     interrupt: &Interrupt,
508     status: virtio_snd_pcm_status,
509 ) -> Result<(), Error> {
510     let writer = &mut desc_chain.writer;
511 
512     // For rx queue only. Fast forward the unused audio data buffer.
513     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
514         writer
515             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
516     }
517     writer.write_obj(status).map_err(Error::WriteResponse)?;
518     let len = writer.bytes_written() as u32;
519     queue.add_used(desc_chain, len);
520     queue.trigger_interrupt(interrupt);
521     Ok(())
522 }
523 
524 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>)525 async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {
526     match reset_signal_option {
527         Some((lock, cvar)) => {
528             let mut reset = lock.lock().await;
529             while !*reset {
530                 reset = cvar.wait(reset).await;
531             }
532         }
533         None => futures::future::pending().await,
534     };
535 }
536 
send_pcm_response_worker( queue: Rc<AsyncRwLock<Queue>>, interrupt: Interrupt, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>537 pub async fn send_pcm_response_worker(
538     queue: Rc<AsyncRwLock<Queue>>,
539     interrupt: Interrupt,
540     recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
541     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
542 ) -> Result<(), Error> {
543     let on_reset = await_reset_signal(reset_signal).fuse();
544     pin_mut!(on_reset);
545 
546     loop {
547         let next_async = recv.next().fuse();
548         pin_mut!(next_async);
549 
550         let res = select! {
551             _ = on_reset => break,
552             res = next_async => res,
553         };
554 
555         if let Some(r) = res {
556             send_pcm_response(r.desc_chain, &mut *queue.lock().await, &interrupt, r.status)?;
557 
558             // Resume pcm_worker
559             if let Some(done) = r.done {
560                 done.send(()).map_err(Error::OneshotSend)?;
561             }
562         } else {
563             debug!("PcmResponse channel is closed.");
564             break;
565         }
566     }
567     Ok(())
568 }
569 
/// Handle messages from the tx or the rx queue. One invocation is needed for
/// each queue.
///
/// For each descriptor chain popped from `queue`, the `virtio_snd_pcm_xfer` header is read to
/// find the target stream. The chain is then forwarded to that stream's sender if one is set.
/// Otherwise (unknown stream id, or no sender on the stream) an `IoErr` status is sent through
/// `response_sender`.
///
/// Returns `Ok(())` once the reset signal fires. Returns an error if reading from the queue or
/// the header fails, if a channel send fails, or if a stream's worker status is `Quit` while its
/// sender is still set.
pub async fn handle_pcm_queue(
    streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
    mut response_sender: mpsc::UnboundedSender<PcmResponse>,
    queue: Rc<AsyncRwLock<Queue>>,
    queue_event: &EventAsync,
    reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
) -> Result<(), Error> {
    // Fused and pinned once so the same future can be polled by `select!` on every iteration.
    let on_reset = await_reset_signal(reset_signal).fuse();
    pin_mut!(on_reset);

    loop {
        // Manual queue.next_async() to avoid holding the mutex
        let next_async = async {
            loop {
                // Check if there are more descriptors available.
                // The queue lock is a temporary of this `if let`; it is not held while waiting
                // on `queue_event` below.
                if let Some(chain) = queue.lock().await.pop() {
                    return Ok(chain);
                }
                queue_event.next_val().await?;
            }
        }
        .fuse();
        pin_mut!(next_async);

        let mut desc_chain = select! {
            _ = on_reset => break,
            res = next_async => res.map_err(Error::Async)?,
        };

        // Reading the header advances the reader past it.
        let pcm_xfer: virtio_snd_pcm_xfer =
            desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
        let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;

        let streams = streams.read_lock().await;
        let stream_info = match streams.get(stream_id) {
            Some(stream_info) => stream_info.read_lock().await,
            None => {
                error!(
                    "stream_id ({}) >= num_streams ({})",
                    stream_id,
                    streams.len()
                );
                // Unknown stream: answer with an I/O error and move on to the next message.
                defer_pcm_response_to_worker(
                    desc_chain,
                    virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
                    &mut response_sender,
                )
                .await?;
                continue;
            }
        };

        match stream_info.sender.as_ref() {
            // A sender is set: hand the descriptor chain over through it.
            Some(mut s) => {
                s.send(desc_chain).await.map_err(Error::MpscSend)?;
                if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
                    // If sender channel is still intact but worker status is quit,
                    // the worker quitted unexpectedly. Return error to request a reset.
                    return Err(Error::PCMWorkerQuittedUnexpectedly);
                }
            }
            // No sender is set: the message cannot be served, so answer with an I/O error.
            None => {
                // The log is skipped when `just_reset` is set.
                if !stream_info.just_reset {
                    error!(
                        "stream {} is not ready. state: {}",
                        stream_id,
                        get_virtio_snd_r_pcm_cmd_name(stream_info.state)
                    );
                }
                defer_pcm_response_to_worker(
                    desc_chain,
                    virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
                    &mut response_sender,
                )
                .await?;
            }
        };
    }
    Ok(())
}
652 
653 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue( ex: &Executor, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, snd_data: &SndData, queue: Rc<AsyncRwLock<Queue>>, queue_event: &mut EventAsync, interrupt: Interrupt, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>654 pub async fn handle_ctrl_queue(
655     ex: &Executor,
656     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
657     snd_data: &SndData,
658     queue: Rc<AsyncRwLock<Queue>>,
659     queue_event: &mut EventAsync,
660     interrupt: Interrupt,
661     tx_send: mpsc::UnboundedSender<PcmResponse>,
662     rx_send: mpsc::UnboundedSender<PcmResponse>,
663     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
664 ) -> Result<(), Error> {
665     let on_reset = await_reset_signal(reset_signal).fuse();
666     pin_mut!(on_reset);
667 
668     let mut queue = queue.lock().await;
669     loop {
670         let mut desc_chain = {
671             let next_async = queue.next_async(queue_event).fuse();
672             pin_mut!(next_async);
673 
674             select! {
675                 _ = on_reset => break,
676                 res = next_async => res.map_err(Error::Async)?,
677             }
678         };
679 
680         let reader = &mut desc_chain.reader;
681         let writer = &mut desc_chain.writer;
682         // Don't advance the reader
683         let code = reader
684             .peek_obj::<virtio_snd_hdr>()
685             .map_err(Error::ReadMessage)?
686             .code
687             .into();
688 
689         let handle_ctrl_msg = async {
690             return match code {
691                 VIRTIO_SND_R_JACK_INFO => {
692                     let query_info: virtio_snd_query_info =
693                         reader.read_obj().map_err(Error::ReadMessage)?;
694                     let start_id: usize = u32::from(query_info.start_id) as usize;
695                     let count: usize = u32::from(query_info.count) as usize;
696                     if start_id + count > snd_data.jack_info.len() {
697                         error!(
698                             "start_id({}) + count({}) must be smaller than \
699                             the number of jacks ({})",
700                             start_id,
701                             count,
702                             snd_data.jack_info.len()
703                         );
704                         return writer
705                             .write_obj(VIRTIO_SND_S_BAD_MSG)
706                             .map_err(Error::WriteResponse);
707                     }
708                     // The response consists of the virtio_snd_hdr structure (contains the request
709                     // status code), followed by the device-writable information structures of the
710                     // item. Each information structure begins with the following common header
711                     writer
712                         .write_obj(VIRTIO_SND_S_OK)
713                         .map_err(Error::WriteResponse)?;
714                     for i in start_id..(start_id + count) {
715                         writer
716                             .write_all(snd_data.jack_info[i].as_bytes())
717                             .map_err(Error::WriteResponse)?;
718                     }
719                     Ok(())
720                 }
721                 VIRTIO_SND_R_PCM_INFO => {
722                     let query_info: virtio_snd_query_info =
723                         reader.read_obj().map_err(Error::ReadMessage)?;
724                     let start_id: usize = u32::from(query_info.start_id) as usize;
725                     let count: usize = u32::from(query_info.count) as usize;
726                     if start_id + count > snd_data.pcm_info.len() {
727                         error!(
728                             "start_id({}) + count({}) must be smaller than \
729                             the number of streams ({})",
730                             start_id,
731                             count,
732                             snd_data.pcm_info.len()
733                         );
734                         return writer
735                             .write_obj(VIRTIO_SND_S_BAD_MSG)
736                             .map_err(Error::WriteResponse);
737                     }
738                     // The response consists of the virtio_snd_hdr structure (contains the request
739                     // status code), followed by the device-writable information structures of the
740                     // item. Each information structure begins with the following common header
741                     writer
742                         .write_obj(VIRTIO_SND_S_OK)
743                         .map_err(Error::WriteResponse)?;
744                     for i in start_id..(start_id + count) {
745                         writer
746                             .write_all(snd_data.pcm_info[i].as_bytes())
747                             .map_err(Error::WriteResponse)?;
748                     }
749                     Ok(())
750                 }
751                 VIRTIO_SND_R_CHMAP_INFO => {
752                     let query_info: virtio_snd_query_info =
753                         reader.read_obj().map_err(Error::ReadMessage)?;
754                     let start_id: usize = u32::from(query_info.start_id) as usize;
755                     let count: usize = u32::from(query_info.count) as usize;
756                     if start_id + count > snd_data.chmap_info.len() {
757                         error!(
758                             "start_id({}) + count({}) must be smaller than \
759                             the number of chmaps ({})",
760                             start_id,
761                             count,
762                             snd_data.chmap_info.len()
763                         );
764                         return writer
765                             .write_obj(VIRTIO_SND_S_BAD_MSG)
766                             .map_err(Error::WriteResponse);
767                     }
768                     // The response consists of the virtio_snd_hdr structure (contains the request
769                     // status code), followed by the device-writable information structures of the
770                     // item. Each information structure begins with the following common header
771                     writer
772                         .write_obj(VIRTIO_SND_S_OK)
773                         .map_err(Error::WriteResponse)?;
774                     for i in start_id..(start_id + count) {
775                         writer
776                             .write_all(snd_data.chmap_info[i].as_bytes())
777                             .map_err(Error::WriteResponse)?;
778                     }
779                     Ok(())
780                 }
781                 VIRTIO_SND_R_JACK_REMAP => {
782                     unreachable!("remap is unsupported");
783                 }
784                 VIRTIO_SND_R_PCM_SET_PARAMS => {
785                     // Raise VIRTIO_SND_S_BAD_MSG or IO error?
786                     let set_params: virtio_snd_pcm_set_params =
787                         reader.read_obj().map_err(Error::ReadMessage)?;
788                     let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
789                     let buffer_bytes: u32 = set_params.buffer_bytes.into();
790                     let period_bytes: u32 = set_params.period_bytes.into();
791 
792                     let dir = match snd_data.pcm_info.get(stream_id) {
793                         Some(pcm_info) => {
794                             if set_params.channels < pcm_info.channels_min
795                                 || set_params.channels > pcm_info.channels_max
796                             {
797                                 error!(
798                                     "Number of channels ({}) must be between {} and {}",
799                                     set_params.channels,
800                                     pcm_info.channels_min,
801                                     pcm_info.channels_max
802                                 );
803                                 return writer
804                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
805                                     .map_err(Error::WriteResponse);
806                             }
807                             if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
808                                 error!("PCM format {} is not supported.", set_params.format);
809                                 return writer
810                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
811                                     .map_err(Error::WriteResponse);
812                             }
813                             if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
814                                 error!("PCM frame rate {} is not supported.", set_params.rate);
815                                 return writer
816                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
817                                     .map_err(Error::WriteResponse);
818                             }
819 
820                             pcm_info.direction
821                         }
822                         None => {
823                             error!(
824                                 "stream_id {} < streams {}",
825                                 stream_id,
826                                 snd_data.pcm_info.len()
827                             );
828                             return writer
829                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
830                                 .map_err(Error::WriteResponse);
831                         }
832                     };
833 
834                     if set_params.features != 0 {
835                         error!("No feature is supported");
836                         return writer
837                             .write_obj(VIRTIO_SND_S_NOT_SUPP)
838                             .map_err(Error::WriteResponse);
839                     }
840 
841                     if buffer_bytes % period_bytes != 0 {
842                         error!(
843                             "buffer_bytes({}) must be dividable by period_bytes({})",
844                             buffer_bytes, period_bytes
845                         );
846                         return writer
847                             .write_obj(VIRTIO_SND_S_BAD_MSG)
848                             .map_err(Error::WriteResponse);
849                     }
850 
851                     process_pcm_ctrl(
852                         ex,
853                         &tx_send,
854                         &rx_send,
855                         streams,
856                         VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
857                         writer,
858                         stream_id,
859                     )
860                     .await
861                 }
862                 VIRTIO_SND_R_PCM_PREPARE
863                 | VIRTIO_SND_R_PCM_START
864                 | VIRTIO_SND_R_PCM_STOP
865                 | VIRTIO_SND_R_PCM_RELEASE => {
866                     let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
867                     let stream_id: usize = u32::from(hdr.stream_id) as usize;
868                     let cmd = match VirtioSndPcmCmd::try_from(code) {
869                         Ok(cmd) => cmd,
870                         Err(err) => {
871                             error!("Error converting code to command: {}", err);
872                             return writer
873                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
874                                 .map_err(Error::WriteResponse);
875                         }
876                     };
877                     process_pcm_ctrl(ex, &tx_send, &rx_send, streams, cmd, writer, stream_id)
878                         .await
879                         .and(Ok(()))?;
880                     Ok(())
881                 }
882                 c => {
883                     error!("Unrecognized code: {}", c);
884                     return writer
885                         .write_obj(VIRTIO_SND_S_BAD_MSG)
886                         .map_err(Error::WriteResponse);
887                 }
888             };
889         };
890 
891         handle_ctrl_msg.await?;
892         let len = writer.bytes_written() as u32;
893         queue.add_used(desc_chain, len);
894         queue.trigger_interrupt(&interrupt);
895     }
896     Ok(())
897 }
898 
899 /// Send events to the audio driver.
handle_event_queue( mut queue: Queue, mut queue_event: EventAsync, interrupt: Interrupt, ) -> Result<(), Error>900 pub async fn handle_event_queue(
901     mut queue: Queue,
902     mut queue_event: EventAsync,
903     interrupt: Interrupt,
904 ) -> Result<(), Error> {
905     loop {
906         let desc_chain = queue
907             .next_async(&mut queue_event)
908             .await
909             .map_err(Error::Async)?;
910 
911         // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
912         queue.add_used(desc_chain, 0);
913         queue.trigger_interrupt(&interrupt);
914     }
915 }
916