• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use futures::{
6     channel::{mpsc, oneshot},
7     SinkExt, StreamExt,
8 };
9 use std::io::{self, Read, Write};
10 use std::rc::Rc;
11 
12 use audio_streams::{capture::AsyncCaptureBuffer, AsyncPlaybackBuffer};
13 use base::{debug, error};
14 use cros_async::{sync::Mutex as AsyncMutex, EventAsync, Executor};
15 use data_model::{DataInit, Le32};
16 use vm_memory::GuestMemory;
17 
18 use crate::virtio::cras_backend::{Parameters, PcmResponse};
19 use crate::virtio::snd::common::*;
20 use crate::virtio::snd::constants::*;
21 use crate::virtio::snd::layout::*;
22 use crate::virtio::{DescriptorChain, Queue, Reader, SignalableInterrupt, Writer};
23 
24 use super::{DirectionalStream, Error, SndData, StreamInfo, WorkerStatus};
25 
26 // Returns true if the operation is successful. Returns error if there is
27 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, mem: &GuestMemory, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>, params: &Parameters, cmd_code: u32, writer: &mut Writer, stream_id: usize, ) -> Result<(), Error>28 async fn process_pcm_ctrl(
29     ex: &Executor,
30     mem: &GuestMemory,
31     tx_send: &mpsc::UnboundedSender<PcmResponse>,
32     rx_send: &mpsc::UnboundedSender<PcmResponse>,
33     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>,
34     params: &Parameters,
35     cmd_code: u32,
36     writer: &mut Writer,
37     stream_id: usize,
38 ) -> Result<(), Error> {
39     let streams = streams.read_lock().await;
40     let mut stream = match streams.get(stream_id) {
41         Some(stream_info) => stream_info.lock().await,
42         None => {
43             error!(
44                 "Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
45                 stream_id,
46                 get_virtio_snd_r_pcm_cmd_name(cmd_code)
47             );
48             return writer
49                 .write_obj(VIRTIO_SND_S_BAD_MSG)
50                 .map_err(Error::WriteResponse);
51         }
52     };
53 
54     debug!(
55         "{} for stream id={}",
56         get_virtio_snd_r_pcm_cmd_name(cmd_code),
57         stream_id
58     );
59 
60     let result = match cmd_code {
61         VIRTIO_SND_R_PCM_PREPARE => {
62             stream
63                 .prepare(ex, mem.clone(), tx_send, rx_send, params)
64                 .await
65         }
66         VIRTIO_SND_R_PCM_START => stream.start().await,
67         VIRTIO_SND_R_PCM_STOP => stream.stop().await,
68         VIRTIO_SND_R_PCM_RELEASE => stream.release().await,
69         _ => unreachable!(),
70     };
71     match result {
72         Ok(_) => {
73             return writer
74                 .write_obj(VIRTIO_SND_S_OK)
75                 .map_err(Error::WriteResponse);
76         }
77         Err(Error::OperationNotSupported) => {
78             error!(
79                 "{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
80                 get_virtio_snd_r_pcm_cmd_name(cmd_code),
81                 stream_id
82             );
83 
84             return writer
85                 .write_obj(VIRTIO_SND_S_NOT_SUPP)
86                 .map_err(Error::WriteResponse);
87         }
88         Err(e) => {
89             // Runtime/internal error would be more appropriate, but there's
90             // no such error type
91             error!(
92                 "{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
93                 get_virtio_snd_r_pcm_cmd_name(cmd_code),
94                 stream_id,
95                 e
96             );
97             return writer
98                 .write_obj(VIRTIO_SND_S_IO_ERR)
99                 .map_err(Error::WriteResponse);
100         }
101     };
102 }
103 
write_data<'a>( mut dst_buf: AsyncPlaybackBuffer<'a>, reader: Option<Reader>, period_bytes: usize, ) -> Result<(), Error>104 async fn write_data<'a>(
105     mut dst_buf: AsyncPlaybackBuffer<'a>,
106     reader: Option<Reader>,
107     period_bytes: usize,
108 ) -> Result<(), Error> {
109     let transferred = match reader {
110         Some(mut reader) => dst_buf.copy_from(&mut reader),
111         None => dst_buf.copy_from(&mut io::repeat(0).take(period_bytes as u64)),
112     }
113     .map_err(Error::Io)?;
114     if transferred as usize != period_bytes {
115         error!(
116             "Bytes written {} != period_bytes {}",
117             transferred, period_bytes
118         );
119         Err(Error::InvalidBufferSize)
120     } else {
121         dst_buf.commit().await;
122         Ok(())
123     }
124 }
125 
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<(), Error>126 async fn read_data<'a>(
127     mut src_buf: AsyncCaptureBuffer<'a>,
128     writer: Option<&mut Writer>,
129     period_bytes: usize,
130 ) -> Result<(), Error> {
131     let transferred = match writer {
132         Some(writer) => src_buf.copy_to(writer),
133         None => src_buf.copy_to(&mut io::sink()),
134     }
135     .map_err(Error::Io)?;
136     if transferred as usize != period_bytes {
137         error!(
138             "Bytes written {} != period_bytes {}",
139             transferred, period_bytes
140         );
141         Err(Error::InvalidBufferSize)
142     } else {
143         src_buf.commit().await;
144         Ok(())
145     }
146 }
147 
148 impl From<Result<(), Error>> for virtio_snd_pcm_status {
from(res: Result<(), Error>) -> Self149     fn from(res: Result<(), Error>) -> Self {
150         let status = match res {
151             Ok(()) => VIRTIO_SND_S_OK,
152             Err(e) => {
153                 error!("PCM I/O message failed: {}", e);
154                 VIRTIO_SND_S_IO_ERR
155             }
156         };
157 
158         // TODO(woodychow): Extend audio_streams API, and fetch latency_bytes from
159         // `next_playback_buffer` or `next_capture_buffer`"
160         Self {
161             status: Le32::from(status),
162             latency_bytes: Le32::from(0),
163         }
164     }
165 }
166 
167 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, mem: &GuestMemory, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>168 async fn drain_desc_receiver(
169     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
170     mem: &GuestMemory,
171     sender: &mut mpsc::UnboundedSender<PcmResponse>,
172 ) -> Result<(), Error> {
173     let mut o_desc_chain = desc_receiver.next().await;
174     while let Some(desc_chain) = o_desc_chain {
175         // From the virtio-snd spec:
176         // The device MUST complete all pending I/O messages for the specified stream ID.
177         let desc_index = desc_chain.index;
178         let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
179         let status = virtio_snd_pcm_status {
180             status: Le32::from(VIRTIO_SND_S_OK),
181             latency_bytes: Le32::from(0),
182         };
183         // Fetch next DescriptorChain to see if the current one is the last one.
184         o_desc_chain = desc_receiver.next().await;
185         let (done, future) = if o_desc_chain.is_none() {
186             let (done, future) = oneshot::channel();
187             (Some(done), Some(future))
188         } else {
189             (None, None)
190         };
191         sender
192             .send(PcmResponse {
193                 desc_index,
194                 status,
195                 writer,
196                 done,
197             })
198             .await
199             .map_err(Error::MpscSend)?;
200 
201         if let Some(f) = future {
202             // From the virtio-snd spec:
203             // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
204             // while there are pending I/O messages for the specified stream ID.
205             f.await.map_err(Error::DoneNotTriggered)?;
206         };
207     }
208     Ok(())
209 }
210 
211 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
212 /// queue, and forward them to CRAS. One pcm worker per stream.
213 ///
214 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
215 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncMutex<WorkerStatus>>, mem: GuestMemory, mut sender: mpsc::UnboundedSender<PcmResponse>, period_bytes: usize, ) -> Result<(), Error>216 pub async fn start_pcm_worker(
217     ex: Executor,
218     dstream: DirectionalStream,
219     mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
220     status_mutex: Rc<AsyncMutex<WorkerStatus>>,
221     mem: GuestMemory,
222     mut sender: mpsc::UnboundedSender<PcmResponse>,
223     period_bytes: usize,
224 ) -> Result<(), Error> {
225     match dstream {
226         DirectionalStream::Output(mut stream) => {
227             loop {
228                 let dst_buf = stream
229                     .next_playback_buffer(&ex)
230                     .await
231                     .map_err(Error::FetchBuffer)?;
232                 let worker_status = status_mutex.lock().await;
233                 match *worker_status {
234                     WorkerStatus::Quit => {
235                         drain_desc_receiver(&mut desc_receiver, &mem, &mut sender).await?;
236                         write_data(dst_buf, None, period_bytes).await?;
237                         break Ok(());
238                     }
239                     WorkerStatus::Pause => {
240                         write_data(dst_buf, None, period_bytes).await?;
241                     }
242                     WorkerStatus::Running => match desc_receiver.try_next() {
243                         Err(e) => {
244                             error!("Underrun. No new DescriptorChain while running: {}", e);
245                             write_data(dst_buf, None, period_bytes).await?;
246                         }
247                         Ok(None) => {
248                             error!("Unreachable. status should be Quit when the channel is closed");
249                             write_data(dst_buf, None, period_bytes).await?;
250                             return Err(Error::InvalidPCMWorkerState);
251                         }
252                         Ok(Some(desc_chain)) => {
253                             let desc_index = desc_chain.index;
254                             let mut reader = Reader::new(mem.clone(), desc_chain.clone())
255                                 .map_err(Error::DescriptorChain)?;
256                             // stream_id was already read in handle_pcm_queue
257                             reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
258                             let writer = Writer::new(mem.clone(), desc_chain)
259                                 .map_err(Error::DescriptorChain)?;
260 
261                             sender
262                                 .send(PcmResponse {
263                                     desc_index,
264                                     status: write_data(dst_buf, Some(reader), period_bytes)
265                                         .await
266                                         .into(),
267                                     writer,
268                                     done: None,
269                                 })
270                                 .await
271                                 .map_err(Error::MpscSend)?;
272                         }
273                     },
274                 }
275             }
276         }
277         DirectionalStream::Input(mut stream) => {
278             loop {
279                 let src_buf = stream
280                     .next_capture_buffer(&ex)
281                     .await
282                     .map_err(Error::FetchBuffer)?;
283 
284                 let worker_status = status_mutex.lock().await;
285                 match *worker_status {
286                     WorkerStatus::Quit => {
287                         drain_desc_receiver(&mut desc_receiver, &mem, &mut sender).await?;
288                         read_data(src_buf, None, period_bytes).await?;
289                         break Ok(());
290                     }
291                     WorkerStatus::Pause => {
292                         read_data(src_buf, None, period_bytes).await?;
293                     }
294                     WorkerStatus::Running => match desc_receiver.try_next() {
295                         Err(e) => {
296                             error!("Overrun. No new DescriptorChain while running: {}", e);
297                             read_data(src_buf, None, period_bytes).await?;
298                         }
299                         Ok(None) => {
300                             error!("Unreachable. status should be Quit when the channel is closed");
301                             read_data(src_buf, None, period_bytes).await?;
302                             return Err(Error::InvalidPCMWorkerState);
303                         }
304                         Ok(Some(desc_chain)) => {
305                             let desc_index = desc_chain.index;
306                             let mut reader = Reader::new(mem.clone(), desc_chain.clone())
307                                 .map_err(Error::DescriptorChain)?;
308                             // stream_id was already read in handle_pcm_queue
309                             reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
310                             let mut writer = Writer::new(mem.clone(), desc_chain)
311                                 .map_err(Error::DescriptorChain)?;
312 
313                             sender
314                                 .send(PcmResponse {
315                                     desc_index,
316                                     status: read_data(src_buf, Some(&mut writer), period_bytes)
317                                         .await
318                                         .into(),
319                                     writer,
320                                     done: None,
321                                 })
322                                 .await
323                                 .map_err(Error::MpscSend)?;
324                         }
325                     },
326                 }
327             }
328         }
329     }
330 }
331 
332 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, mem: &GuestMemory, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>333 async fn defer_pcm_response_to_worker(
334     desc_chain: DescriptorChain,
335     mem: &GuestMemory,
336     status: virtio_snd_pcm_status,
337     response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
338 ) -> Result<(), Error> {
339     let desc_index = desc_chain.index;
340     let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
341     response_sender
342         .send(PcmResponse {
343             desc_index,
344             status,
345             writer,
346             done: None,
347         })
348         .await
349         .map_err(Error::MpscSend)
350 }
351 
send_pcm_response_with_writer<I: SignalableInterrupt>( mut writer: Writer, desc_index: u16, mem: &GuestMemory, queue: &mut Queue, interrupt: &I, status: virtio_snd_pcm_status, ) -> Result<(), Error>352 fn send_pcm_response_with_writer<I: SignalableInterrupt>(
353     mut writer: Writer,
354     desc_index: u16,
355     mem: &GuestMemory,
356     queue: &mut Queue,
357     interrupt: &I,
358     status: virtio_snd_pcm_status,
359 ) -> Result<(), Error> {
360     // For rx queue only. Fast forward the unused audio data buffer.
361     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
362         writer
363             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
364     }
365     writer.write_obj(status).map_err(Error::WriteResponse)?;
366     queue.add_used(mem, desc_index, writer.bytes_written() as u32);
367     queue.trigger_interrupt(mem, interrupt);
368     Ok(())
369 }
370 
send_pcm_response_worker<I: SignalableInterrupt>( mem: &GuestMemory, queue: &Rc<AsyncMutex<Queue>>, interrupt: &I, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, ) -> Result<(), Error>371 pub async fn send_pcm_response_worker<I: SignalableInterrupt>(
372     mem: &GuestMemory,
373     queue: &Rc<AsyncMutex<Queue>>,
374     interrupt: &I,
375     recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
376 ) -> Result<(), Error> {
377     loop {
378         if let Some(r) = recv.next().await {
379             send_pcm_response_with_writer(
380                 r.writer,
381                 r.desc_index,
382                 &mem,
383                 &mut *queue.lock().await,
384                 interrupt,
385                 r.status,
386             )?;
387 
388             // Resume pcm_worker
389             if let Some(done) = r.done {
390                 done.send(()).map_err(Error::OneshotSend)?;
391             }
392         } else {
393             debug!("PcmResponse channel is closed.");
394             break;
395         }
396     }
397     Ok(())
398 }
399 
400 /// Handle messages from the tx or the rx queue. One invocation is needed for
401 /// each queue.
handle_pcm_queue<'a>( mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'a>>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: &Rc<AsyncMutex<Queue>>, queue_event: EventAsync, ) -> Result<(), Error>402 pub async fn handle_pcm_queue<'a>(
403     mem: &GuestMemory,
404     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'a>>>>>,
405     mut response_sender: mpsc::UnboundedSender<PcmResponse>,
406     queue: &Rc<AsyncMutex<Queue>>,
407     queue_event: EventAsync,
408 ) -> Result<(), Error> {
409     loop {
410         // Manual queue.next_async() to avoid holding the mutex
411         let next_async = async {
412             loop {
413                 // Check if there are more descriptors available.
414                 if let Some(chain) = queue.lock().await.pop(mem) {
415                     return Ok(chain);
416                 }
417                 queue_event.next_val().await?;
418             }
419         };
420 
421         let desc_chain = next_async.await.map_err(Error::Async)?;
422         let mut reader =
423             Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
424 
425         let pcm_xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(Error::ReadMessage)?;
426         let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
427 
428         let streams = streams.read_lock().await;
429         let stream_info = match streams.get(stream_id) {
430             Some(stream_info) => stream_info.read_lock().await,
431             None => {
432                 error!(
433                     "stream_id ({}) >= num_streams ({})",
434                     stream_id,
435                     streams.len()
436                 );
437                 defer_pcm_response_to_worker(
438                     desc_chain,
439                     mem,
440                     virtio_snd_pcm_status {
441                         status: Le32::from(VIRTIO_SND_S_IO_ERR),
442                         latency_bytes: Le32::from(0),
443                     },
444                     &mut response_sender,
445                 )
446                 .await?;
447                 continue;
448             }
449         };
450 
451         match stream_info.sender.as_ref() {
452             Some(mut s) => {
453                 s.send(desc_chain).await.map_err(Error::MpscSend)?;
454             }
455             None => {
456                 error!(
457                     "stream {} is not ready. state: {}",
458                     stream_id,
459                     get_virtio_snd_r_pcm_cmd_name(stream_info.state)
460                 );
461                 defer_pcm_response_to_worker(
462                     desc_chain,
463                     mem,
464                     virtio_snd_pcm_status {
465                         status: Le32::from(VIRTIO_SND_S_IO_ERR),
466                         latency_bytes: Le32::from(0),
467                     },
468                     &mut response_sender,
469                 )
470                 .await?;
471             }
472         };
473     }
474 }
475 
476 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue<I: SignalableInterrupt>( ex: &Executor, mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>, snd_data: &SndData, mut queue: Queue, mut queue_event: EventAsync, interrupt: &I, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, params: &Parameters, ) -> Result<(), Error>477 pub async fn handle_ctrl_queue<I: SignalableInterrupt>(
478     ex: &Executor,
479     mem: &GuestMemory,
480     streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>,
481     snd_data: &SndData,
482     mut queue: Queue,
483     mut queue_event: EventAsync,
484     interrupt: &I,
485     tx_send: mpsc::UnboundedSender<PcmResponse>,
486     rx_send: mpsc::UnboundedSender<PcmResponse>,
487     params: &Parameters,
488 ) -> Result<(), Error> {
489     loop {
490         let desc_chain = queue
491             .next_async(mem, &mut queue_event)
492             .await
493             .map_err(Error::Async)?;
494 
495         let index = desc_chain.index;
496 
497         let mut reader =
498             Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
499         let mut writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
500         // Don't advance the reader
501         let code = reader
502             .clone()
503             .read_obj::<virtio_snd_hdr>()
504             .map_err(Error::ReadMessage)?
505             .code
506             .into();
507 
508         let handle_ctrl_msg = async {
509             return match code {
510                 VIRTIO_SND_R_JACK_INFO => {
511                     let query_info: virtio_snd_query_info =
512                         reader.read_obj().map_err(Error::ReadMessage)?;
513                     let start_id: usize = u32::from(query_info.start_id) as usize;
514                     let count: usize = u32::from(query_info.count) as usize;
515                     if start_id + count > snd_data.jack_info.len() {
516                         error!(
517                             "start_id({}) + count({}) must be smaller than the number of jacks ({})",
518                             start_id,
519                             count,
520                             snd_data.jack_info.len()
521                         );
522                         return writer
523                             .write_obj(VIRTIO_SND_S_BAD_MSG)
524                             .map_err(Error::WriteResponse);
525                     }
526                     // The response consists of the virtio_snd_hdr structure (contains the request
527                     // status code), followed by the device-writable information structures of the
528                     // item. Each information structure begins with the following common header
529                     writer
530                         .write_obj(VIRTIO_SND_S_OK)
531                         .map_err(Error::WriteResponse)?;
532                     for i in start_id..(start_id + count) {
533                         writer
534                             .write_all(snd_data.jack_info[i].as_slice())
535                             .map_err(Error::WriteResponse)?;
536                     }
537                     Ok(())
538                 }
539                 VIRTIO_SND_R_PCM_INFO => {
540                     let query_info: virtio_snd_query_info =
541                         reader.read_obj().map_err(Error::ReadMessage)?;
542                     let start_id: usize = u32::from(query_info.start_id) as usize;
543                     let count: usize = u32::from(query_info.count) as usize;
544                     if start_id + count > snd_data.pcm_info.len() {
545                         error!(
546                             "start_id({}) + count({}) must be smaller than the number of streams ({})",
547                             start_id,
548                             count,
549                             snd_data.pcm_info.len()
550                         );
551                         return writer
552                             .write_obj(VIRTIO_SND_S_BAD_MSG)
553                             .map_err(Error::WriteResponse);
554                     }
555                     // The response consists of the virtio_snd_hdr structure (contains the request
556                     // status code), followed by the device-writable information structures of the
557                     // item. Each information structure begins with the following common header
558                     writer
559                         .write_obj(VIRTIO_SND_S_OK)
560                         .map_err(Error::WriteResponse)?;
561                     for i in start_id..(start_id + count) {
562                         writer
563                             .write_all(snd_data.pcm_info[i].as_slice())
564                             .map_err(Error::WriteResponse)?;
565                     }
566                     Ok(())
567                 }
568                 VIRTIO_SND_R_CHMAP_INFO => {
569                     let query_info: virtio_snd_query_info =
570                         reader.read_obj().map_err(Error::ReadMessage)?;
571                     let start_id: usize = u32::from(query_info.start_id) as usize;
572                     let count: usize = u32::from(query_info.count) as usize;
573                     if start_id + count > snd_data.chmap_info.len() {
574                         error!(
575                             "start_id({}) + count({}) must be smaller than the number of chmaps ({})",
576                             start_id,
577                             count,
578                             snd_data.pcm_info.len()
579                         );
580                         return writer
581                             .write_obj(VIRTIO_SND_S_BAD_MSG)
582                             .map_err(Error::WriteResponse);
583                     }
584                     // The response consists of the virtio_snd_hdr structure (contains the request
585                     // status code), followed by the device-writable information structures of the
586                     // item. Each information structure begins with the following common header
587                     writer
588                         .write_obj(VIRTIO_SND_S_OK)
589                         .map_err(Error::WriteResponse)?;
590                     for i in start_id..(start_id + count) {
591                         writer
592                             .write_all(snd_data.chmap_info[i].as_slice())
593                             .map_err(Error::WriteResponse)?;
594                     }
595                     Ok(())
596                 }
597                 VIRTIO_SND_R_JACK_REMAP => {
598                     unreachable!("remap is unsupported");
599                 }
600                 VIRTIO_SND_R_PCM_SET_PARAMS => {
601                     // Raise VIRTIO_SND_S_BAD_MSG or IO error?
602                     let set_params: virtio_snd_pcm_set_params =
603                         reader.read_obj().map_err(Error::ReadMessage)?;
604                     let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
605                     let buffer_bytes: u32 = set_params.buffer_bytes.into();
606                     let period_bytes: u32 = set_params.period_bytes.into();
607 
608                     let dir = match snd_data.pcm_info.get(stream_id) {
609                         Some(pcm_info) => {
610                             if set_params.channels < pcm_info.channels_min
611                                 || set_params.channels > pcm_info.channels_max
612                             {
613                                 error!(
614                                     "Number of channels ({}) must be between {} and {}",
615                                     set_params.channels,
616                                     pcm_info.channels_min,
617                                     pcm_info.channels_max
618                                 );
619                                 return writer
620                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
621                                     .map_err(Error::WriteResponse);
622                             }
623                             if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
624                                 error!("PCM format {} is not supported.", set_params.format);
625                                 return writer
626                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
627                                     .map_err(Error::WriteResponse);
628                             }
629                             if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
630                                 error!("PCM frame rate {} is not supported.", set_params.rate);
631                                 return writer
632                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
633                                     .map_err(Error::WriteResponse);
634                             }
635 
636                             pcm_info.direction
637                         }
638                         None => {
639                             error!(
640                                 "stream_id {} < streams {}",
641                                 stream_id,
642                                 snd_data.pcm_info.len()
643                             );
644                             return writer
645                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
646                                 .map_err(Error::WriteResponse);
647                         }
648                     };
649 
650                     if set_params.features != 0 {
651                         error!("No feature is supported");
652                         return writer
653                             .write_obj(VIRTIO_SND_S_NOT_SUPP)
654                             .map_err(Error::WriteResponse);
655                     }
656 
657                     if buffer_bytes % period_bytes != 0 {
658                         error!(
659                             "buffer_bytes({}) must be dividable by period_bytes({})",
660                             buffer_bytes, period_bytes
661                         );
662                         return writer
663                             .write_obj(VIRTIO_SND_S_BAD_MSG)
664                             .map_err(Error::WriteResponse);
665                     }
666 
667                     let streams = streams.read_lock().await;
668                     let mut stream_info = match streams.get(stream_id) {
669                         Some(stream_info) => stream_info.lock().await,
670                         None => {
671                             error!("stream_id {} < streams {}", stream_id, streams.len());
672                             return writer
673                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
674                                 .map_err(Error::WriteResponse);
675                         }
676                     };
677 
678                     if stream_info.state != 0
679                         && stream_info.state != VIRTIO_SND_R_PCM_SET_PARAMS
680                         && stream_info.state != VIRTIO_SND_R_PCM_PREPARE
681                         && stream_info.state != VIRTIO_SND_R_PCM_RELEASE
682                     {
683                         error!(
684                             "Invalid PCM state transition from {} to {}",
685                             get_virtio_snd_r_pcm_cmd_name(stream_info.state),
686                             get_virtio_snd_r_pcm_cmd_name(VIRTIO_SND_R_PCM_SET_PARAMS)
687                         );
688                         return writer
689                             .write_obj(VIRTIO_SND_S_NOT_SUPP)
690                             .map_err(Error::WriteResponse);
691                     }
692 
693                     // Only required for PREPARE -> SET_PARAMS
694                     stream_info.release_worker().await?;
695 
696                     stream_info.channels = set_params.channels;
697                     stream_info.format = from_virtio_sample_format(set_params.format).unwrap();
698                     stream_info.frame_rate = from_virtio_frame_rate(set_params.rate).unwrap();
699                     stream_info.buffer_bytes = buffer_bytes as usize;
700                     stream_info.period_bytes = period_bytes as usize;
701                     stream_info.direction = dir;
702                     stream_info.state = VIRTIO_SND_R_PCM_SET_PARAMS;
703 
704                     debug!(
705                         "VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
706                         stream_id, *stream_info
707                     );
708 
709                     writer
710                         .write_obj(VIRTIO_SND_S_OK)
711                         .map_err(Error::WriteResponse)
712                 }
713                 VIRTIO_SND_R_PCM_PREPARE
714                 | VIRTIO_SND_R_PCM_START
715                 | VIRTIO_SND_R_PCM_STOP
716                 | VIRTIO_SND_R_PCM_RELEASE => {
717                     let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
718                     let stream_id: usize = u32::from(hdr.stream_id) as usize;
719                     process_pcm_ctrl(
720                         ex,
721                         &mem.clone(),
722                         &tx_send,
723                         &rx_send,
724                         streams,
725                         params,
726                         code,
727                         &mut writer,
728                         stream_id,
729                     )
730                     .await
731                     .and(Ok(()))?;
732                     Ok(())
733                 }
734                 c => {
735                     error!("Unrecognized code: {}", c);
736                     return writer
737                         .write_obj(VIRTIO_SND_S_BAD_MSG)
738                         .map_err(Error::WriteResponse);
739                 }
740             };
741         };
742 
743         handle_ctrl_msg.await?;
744         queue.add_used(mem, index, writer.bytes_written() as u32);
745         queue.trigger_interrupt(&mem, interrupt);
746     }
747 }
748 
749 /// Send events to the audio driver.
handle_event_queue<I: SignalableInterrupt>( mem: &GuestMemory, mut queue: Queue, mut queue_event: EventAsync, interrupt: &I, ) -> Result<(), Error>750 pub async fn handle_event_queue<I: SignalableInterrupt>(
751     mem: &GuestMemory,
752     mut queue: Queue,
753     mut queue_event: EventAsync,
754     interrupt: &I,
755 ) -> Result<(), Error> {
756     loop {
757         let desc_chain = queue
758             .next_async(mem, &mut queue_event)
759             .await
760             .map_err(Error::Async)?;
761 
762         // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
763         let index = desc_chain.index;
764         queue.add_used(mem, index, 0);
765         queue.trigger_interrupt(&mem, interrupt);
766     }
767 }
768