1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10
11 use async_trait::async_trait;
12 use audio_streams::capture::AsyncCaptureBuffer;
13 use audio_streams::AsyncPlaybackBuffer;
14 use base::debug;
15 use base::error;
16 use cros_async::sync::Condvar;
17 use cros_async::sync::Mutex as AsyncMutex;
18 use cros_async::EventAsync;
19 use cros_async::Executor;
20 use futures::channel::mpsc;
21 use futures::channel::oneshot;
22 use futures::pin_mut;
23 use futures::select;
24 use futures::FutureExt;
25 use futures::SinkExt;
26 use futures::StreamExt;
27 use thiserror::Error as ThisError;
28 use vm_memory::GuestMemory;
29 #[cfg(windows)]
30 use win_audio::AudioSharedFormat;
31 use zerocopy::AsBytes;
32
33 use super::Error;
34 use super::SndData;
35 use super::WorkerStatus;
36 use crate::virtio::snd::common::*;
37 use crate::virtio::snd::common_backend::stream_info::SetParams;
38 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
39 use crate::virtio::snd::common_backend::DirectionalStream;
40 use crate::virtio::snd::common_backend::PcmResponse;
41 use crate::virtio::snd::constants::*;
42 use crate::virtio::snd::layout::*;
43 use crate::virtio::DescriptorChain;
44 use crate::virtio::Queue;
45 use crate::virtio::Reader;
46 use crate::virtio::SignalableInterrupt;
47 use crate::virtio::Writer;
48
// TODO(b/246601226): Remove once a generic audio_stream solution that can accept
// arbitrarily sized buffers is available.
51 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
52 #[async_trait(?Send)]
53 pub trait PlaybackBufferWriter {
new( guest_period_bytes: usize, #[cfg(windows)] frame_size: usize, #[cfg(windows)] frame_rate: usize, #[cfg(windows)] guest_num_channels: usize, #[cfg(windows)] audio_shared_format: AudioSharedFormat, ) -> Self where Self: Sized54 fn new(
55 guest_period_bytes: usize,
56 #[cfg(windows)] frame_size: usize,
57 #[cfg(windows)] frame_rate: usize,
58 #[cfg(windows)] guest_num_channels: usize,
59 #[cfg(windows)] audio_shared_format: AudioSharedFormat,
60 ) -> Self
61 where
62 Self: Sized;
63
64 /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize65 fn endpoint_period_bytes(&self) -> usize;
66
67 /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>68 fn copy_to_buffer(
69 &mut self,
70 dst_buf: &mut AsyncPlaybackBuffer<'_>,
71 reader: &mut Reader,
72 ) -> Result<usize, Error> {
73 dst_buf.copy_from(reader).map_err(Error::Io)
74 }
75 /// Check to see if an additional read from the tx virtqueue is needed during a playback
76 /// loop. If so, read from the virtqueue.
77 ///
78 /// Prefill will happen, for example, if the endpoint buffer requires a 513 frame period, but
79 /// each tx virtqueue read only produces 480 frames.
80 #[cfg(windows)]
check_and_prefill( &mut self, mem: &GuestMemory, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>81 async fn check_and_prefill(
82 &mut self,
83 mem: &GuestMemory,
84 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
85 sender: &mut mpsc::UnboundedSender<PcmResponse>,
86 ) -> Result<(), Error>;
87 }
88
89 #[derive(Debug)]
90 enum VirtioSndPcmCmd {
91 SetParams { set_params: SetParams },
92 Prepare,
93 Start,
94 Stop,
95 Release,
96 }
97
98 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 let cmd_code = match self {
101 VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
102 VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
103 VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
104 VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
105 VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
106 };
107 f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
108 }
109 }
110
111 #[derive(ThisError, Debug)]
112 enum VirtioSndPcmCmdError {
113 #[error("SetParams requires additional parameters")]
114 SetParams,
115 #[error("Invalid virtio snd command code")]
116 InvalidCode,
117 }
118
119 impl TryFrom<u32> for VirtioSndPcmCmd {
120 type Error = VirtioSndPcmCmdError;
121
try_from(code: u32) -> Result<Self, Self::Error>122 fn try_from(code: u32) -> Result<Self, Self::Error> {
123 match code {
124 VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
125 VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
126 VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
127 VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
128 VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
129 _ => Err(VirtioSndPcmCmdError::InvalidCode),
130 }
131 }
132 }
133
134 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd135 fn with_set_params_and_direction(
136 set_params: virtio_snd_pcm_set_params,
137 dir: u8,
138 ) -> VirtioSndPcmCmd {
139 let buffer_bytes: u32 = set_params.buffer_bytes.into();
140 let period_bytes: u32 = set_params.period_bytes.into();
141 VirtioSndPcmCmd::SetParams {
142 set_params: SetParams {
143 channels: set_params.channels,
144 format: from_virtio_sample_format(set_params.format).unwrap(),
145 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
146 buffer_bytes: buffer_bytes as usize,
147 period_bytes: period_bytes as usize,
148 dir,
149 },
150 }
151 }
152 }
153
154 // Returns true if the operation is successful. Returns error if there is
155 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, mem: &GuestMemory, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, ) -> Result<(), Error>156 async fn process_pcm_ctrl(
157 ex: &Executor,
158 mem: &GuestMemory,
159 tx_send: &mpsc::UnboundedSender<PcmResponse>,
160 rx_send: &mpsc::UnboundedSender<PcmResponse>,
161 streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
162 cmd: VirtioSndPcmCmd,
163 writer: &mut Writer,
164 stream_id: usize,
165 ) -> Result<(), Error> {
166 let streams = streams.read_lock().await;
167 let mut stream = match streams.get(stream_id) {
168 Some(stream_info) => stream_info.lock().await,
169 None => {
170 error!(
171 "Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
172 stream_id, cmd
173 );
174 return writer
175 .write_obj(VIRTIO_SND_S_BAD_MSG)
176 .map_err(Error::WriteResponse);
177 }
178 };
179
180 debug!("{} for stream id={}", cmd, stream_id);
181
182 let result = match cmd {
183 VirtioSndPcmCmd::SetParams { set_params } => {
184 let result = stream.set_params(set_params).await;
185 if result.is_ok() {
186 debug!(
187 "VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
188 stream_id, *stream
189 );
190 }
191 result
192 }
193 VirtioSndPcmCmd::Prepare => stream.prepare(ex, mem.clone(), tx_send, rx_send).await,
194 VirtioSndPcmCmd::Start => stream.start().await,
195 VirtioSndPcmCmd::Stop => stream.stop().await,
196 VirtioSndPcmCmd::Release => stream.release().await,
197 };
198 match result {
199 Ok(_) => writer
200 .write_obj(VIRTIO_SND_S_OK)
201 .map_err(Error::WriteResponse),
202 Err(Error::OperationNotSupported) => {
203 error!(
204 "{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
205 cmd, stream_id
206 );
207
208 writer
209 .write_obj(VIRTIO_SND_S_NOT_SUPP)
210 .map_err(Error::WriteResponse)
211 }
212 Err(e) => {
213 // Runtime/internal error would be more appropriate, but there's
214 // no such error type
215 error!(
216 "{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
217 cmd, stream_id, e
218 );
219 writer
220 .write_obj(VIRTIO_SND_S_IO_ERR)
221 .map_err(Error::WriteResponse)
222 }
223 }
224 }
225
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>226 async fn write_data(
227 mut dst_buf: AsyncPlaybackBuffer<'_>,
228 reader: Option<Reader>,
229 buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
230 ) -> Result<u32, Error> {
231 let transferred = match reader {
232 Some(mut reader) => buffer_writer.copy_to_buffer(&mut dst_buf, &mut reader)?,
233 None => dst_buf
234 .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
235 .map_err(Error::Io)?,
236 };
237
238 if transferred as usize != buffer_writer.endpoint_period_bytes() {
239 error!(
240 "Bytes written {} != period_bytes {}",
241 transferred,
242 buffer_writer.endpoint_period_bytes()
243 );
244 Err(Error::InvalidBufferSize)
245 } else {
246 dst_buf.commit().await;
247 Ok(dst_buf.latency_bytes())
248 }
249 }
250
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>251 async fn read_data<'a>(
252 mut src_buf: AsyncCaptureBuffer<'a>,
253 writer: Option<&mut Writer>,
254 period_bytes: usize,
255 ) -> Result<u32, Error> {
256 let transferred = match writer {
257 Some(writer) => src_buf.copy_to(writer),
258 None => src_buf.copy_to(&mut io::sink()),
259 }
260 .map_err(Error::Io)?;
261 if transferred as usize != period_bytes {
262 error!(
263 "Bytes written {} != period_bytes {}",
264 transferred, period_bytes
265 );
266 Err(Error::InvalidBufferSize)
267 } else {
268 src_buf.commit().await;
269 Ok(src_buf.latency_bytes())
270 }
271 }
272
273 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self274 fn from(res: Result<u32, Error>) -> Self {
275 match res {
276 Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
277 Err(e) => {
278 error!("PCM I/O message failed: {}", e);
279 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
280 }
281 }
282 }
283 }
284
285 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, mem: &GuestMemory, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>286 async fn drain_desc_receiver(
287 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
288 mem: &GuestMemory,
289 sender: &mut mpsc::UnboundedSender<PcmResponse>,
290 ) -> Result<(), Error> {
291 let mut o_desc_chain = desc_receiver.next().await;
292 while let Some(desc_chain) = o_desc_chain {
293 // From the virtio-snd spec:
294 // The device MUST complete all pending I/O messages for the specified stream ID.
295 let desc_index = desc_chain.index;
296 let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
297 let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
298 // Fetch next DescriptorChain to see if the current one is the last one.
299 o_desc_chain = desc_receiver.next().await;
300 let (done, future) = if o_desc_chain.is_none() {
301 let (done, future) = oneshot::channel();
302 (Some(done), Some(future))
303 } else {
304 (None, None)
305 };
306 sender
307 .send(PcmResponse {
308 desc_index,
309 status,
310 writer,
311 done,
312 })
313 .await
314 .map_err(Error::MpscSend)?;
315
316 if let Some(f) = future {
317 // From the virtio-snd spec:
318 // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
319 // while there are pending I/O messages for the specified stream ID.
320 f.await.map_err(Error::DoneNotTriggered)?;
321 };
322 }
323 Ok(())
324 }
325
get_index_with_reader_and_writer( mem: &GuestMemory, desc_chain: DescriptorChain, ) -> Result<(u16, Reader, Writer), Error>326 pub(crate) fn get_index_with_reader_and_writer(
327 mem: &GuestMemory,
328 desc_chain: DescriptorChain,
329 ) -> Result<(u16, Reader, Writer), Error> {
330 let desc_index = desc_chain.index;
331 let mut reader =
332 Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
333 // stream_id was already read in handle_pcm_queue
334 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
335 let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
336 Ok((desc_index, reader, writer))
337 }
338
339 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
340 /// queue, and forward them to CRAS. One pcm worker per stream.
341 ///
342 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
343 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncMutex<WorkerStatus>>, mem: GuestMemory, mut sender: mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>344 pub async fn start_pcm_worker(
345 ex: Executor,
346 dstream: DirectionalStream,
347 mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
348 status_mutex: Rc<AsyncMutex<WorkerStatus>>,
349 mem: GuestMemory,
350 mut sender: mpsc::UnboundedSender<PcmResponse>,
351 ) -> Result<(), Error> {
352 let res = pcm_worker_loop(
353 ex,
354 dstream,
355 &mut desc_receiver,
356 &status_mutex,
357 &mem,
358 &mut sender,
359 )
360 .await;
361 *status_mutex.lock().await = WorkerStatus::Quit;
362 if res.is_err() {
363 error!(
364 "pcm_worker error: {:#?}. Draining desc_receiver",
365 res.as_ref().err()
366 );
367 // On error, guaranteed that desc_receiver has not been drained, so drain it here.
368 // Note that drain blocks until the stream is release.
369 drain_desc_receiver(&mut desc_receiver, &mem, &mut sender).await?;
370 }
371 res
372 }
373
pcm_worker_loop( ex: Executor, dstream: DirectionalStream, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: &Rc<AsyncMutex<WorkerStatus>>, mem: &GuestMemory, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>374 async fn pcm_worker_loop(
375 ex: Executor,
376 dstream: DirectionalStream,
377 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
378 status_mutex: &Rc<AsyncMutex<WorkerStatus>>,
379 mem: &GuestMemory,
380 sender: &mut mpsc::UnboundedSender<PcmResponse>,
381 ) -> Result<(), Error> {
382 match dstream {
383 #[allow(unused_mut)]
384 DirectionalStream::Output(mut stream, mut buffer_writer) => loop {
385 let dst_buf = stream
386 .next_playback_buffer(&ex)
387 .await
388 .map_err(Error::FetchBuffer)?;
389 let worker_status = status_mutex.lock().await;
390 match *worker_status {
391 WorkerStatus::Quit => {
392 drain_desc_receiver(desc_receiver, mem, sender).await?;
393 if let Err(e) = write_data(dst_buf, None, &mut buffer_writer).await {
394 error!("Error on write_data after worker quit: {}", e)
395 }
396 break Ok(());
397 }
398 WorkerStatus::Pause => {
399 write_data(dst_buf, None, &mut buffer_writer).await?;
400 }
401 WorkerStatus::Running => {
402 // TODO(b/246601226): Remove once a generic audio_stream solution that can
403 // accpet arbitrarily size buffers
404 #[cfg(windows)]
405 buffer_writer
406 .check_and_prefill(mem, desc_receiver, sender)
407 .await?;
408
409 match desc_receiver.try_next() {
410 Err(e) => {
411 error!("Underrun. No new DescriptorChain while running: {}", e);
412 write_data(dst_buf, None, &mut buffer_writer).await?;
413 }
414 Ok(None) => {
415 error!("Unreachable. status should be Quit when the channel is closed");
416 write_data(dst_buf, None, &mut buffer_writer).await?;
417 return Err(Error::InvalidPCMWorkerState);
418 }
419 Ok(Some(desc_chain)) => {
420 let (desc_index, reader, writer) =
421 get_index_with_reader_and_writer(mem, desc_chain)?;
422 sender
423 .send(PcmResponse {
424 desc_index,
425 status: write_data(dst_buf, Some(reader), &mut buffer_writer)
426 .await
427 .into(),
428 writer,
429 done: None,
430 })
431 .await
432 .map_err(Error::MpscSend)?;
433 }
434 }
435 }
436 }
437 },
438 DirectionalStream::Input(mut stream, period_bytes) => loop {
439 let src_buf = stream
440 .next_capture_buffer(&ex)
441 .await
442 .map_err(Error::FetchBuffer)?;
443
444 let worker_status = status_mutex.lock().await;
445 match *worker_status {
446 WorkerStatus::Quit => {
447 drain_desc_receiver(desc_receiver, mem, sender).await?;
448 if let Err(e) = read_data(src_buf, None, period_bytes).await {
449 error!("Error on read_data after worker quit: {}", e)
450 }
451 break Ok(());
452 }
453 WorkerStatus::Pause => {
454 read_data(src_buf, None, period_bytes).await?;
455 }
456 WorkerStatus::Running => match desc_receiver.try_next() {
457 Err(e) => {
458 error!("Overrun. No new DescriptorChain while running: {}", e);
459 read_data(src_buf, None, period_bytes).await?;
460 }
461 Ok(None) => {
462 error!("Unreachable. status should be Quit when the channel is closed");
463 read_data(src_buf, None, period_bytes).await?;
464 return Err(Error::InvalidPCMWorkerState);
465 }
466 Ok(Some(desc_chain)) => {
467 let (desc_index, _reader, mut writer) =
468 get_index_with_reader_and_writer(mem, desc_chain)?;
469
470 sender
471 .send(PcmResponse {
472 desc_index,
473 status: read_data(src_buf, Some(&mut writer), period_bytes)
474 .await
475 .into(),
476 writer,
477 done: None,
478 })
479 .await
480 .map_err(Error::MpscSend)?;
481 }
482 },
483 }
484 },
485 }
486 }
487
488 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, mem: &GuestMemory, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>489 async fn defer_pcm_response_to_worker(
490 desc_chain: DescriptorChain,
491 mem: &GuestMemory,
492 status: virtio_snd_pcm_status,
493 response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
494 ) -> Result<(), Error> {
495 let desc_index = desc_chain.index;
496 let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
497 response_sender
498 .send(PcmResponse {
499 desc_index,
500 status,
501 writer,
502 done: None,
503 })
504 .await
505 .map_err(Error::MpscSend)
506 }
507
send_pcm_response_with_writer<I: SignalableInterrupt>( mut writer: Writer, desc_index: u16, mem: &GuestMemory, queue: &mut Queue, interrupt: &I, status: virtio_snd_pcm_status, ) -> Result<(), Error>508 fn send_pcm_response_with_writer<I: SignalableInterrupt>(
509 mut writer: Writer,
510 desc_index: u16,
511 mem: &GuestMemory,
512 queue: &mut Queue,
513 interrupt: &I,
514 status: virtio_snd_pcm_status,
515 ) -> Result<(), Error> {
516 // For rx queue only. Fast forward the unused audio data buffer.
517 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
518 writer
519 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
520 }
521 writer.write_obj(status).map_err(Error::WriteResponse)?;
522 queue.add_used(mem, desc_index, writer.bytes_written() as u32);
523 queue.trigger_interrupt(mem, interrupt);
524 Ok(())
525 }
526
527 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncMutex<bool>, Condvar)>)528 async fn await_reset_signal(reset_signal_option: Option<&(AsyncMutex<bool>, Condvar)>) {
529 match reset_signal_option {
530 Some((lock, cvar)) => {
531 let mut reset = lock.lock().await;
532 while !*reset {
533 reset = cvar.wait(reset).await;
534 }
535 }
536 None => futures::future::pending().await,
537 };
538 }
539
send_pcm_response_worker<I: SignalableInterrupt>( mem: &GuestMemory, queue: &Rc<AsyncMutex<Queue>>, interrupt: I, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncMutex<bool>, Condvar)>, ) -> Result<(), Error>540 pub async fn send_pcm_response_worker<I: SignalableInterrupt>(
541 mem: &GuestMemory,
542 queue: &Rc<AsyncMutex<Queue>>,
543 interrupt: I,
544 recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
545 reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
546 ) -> Result<(), Error> {
547 let on_reset = await_reset_signal(reset_signal).fuse();
548 pin_mut!(on_reset);
549
550 loop {
551 let next_async = recv.next().fuse();
552 pin_mut!(next_async);
553
554 let res = select! {
555 _ = on_reset => break,
556 res = next_async => res,
557 };
558
559 if let Some(r) = res {
560 send_pcm_response_with_writer(
561 r.writer,
562 r.desc_index,
563 mem,
564 &mut *queue.lock().await,
565 &interrupt,
566 r.status,
567 )?;
568
569 // Resume pcm_worker
570 if let Some(done) = r.done {
571 done.send(()).map_err(Error::OneshotSend)?;
572 }
573 } else {
574 debug!("PcmResponse channel is closed.");
575 break;
576 }
577 }
578 Ok(())
579 }
580
581 /// Handle messages from the tx or the rx queue. One invocation is needed for
582 /// each queue.
handle_pcm_queue( mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: &Rc<AsyncMutex<Queue>>, queue_event: &EventAsync, reset_signal: Option<&(AsyncMutex<bool>, Condvar)>, ) -> Result<(), Error>583 pub async fn handle_pcm_queue(
584 mem: &GuestMemory,
585 streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
586 mut response_sender: mpsc::UnboundedSender<PcmResponse>,
587 queue: &Rc<AsyncMutex<Queue>>,
588 queue_event: &EventAsync,
589 reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
590 ) -> Result<(), Error> {
591 let on_reset = await_reset_signal(reset_signal).fuse();
592 pin_mut!(on_reset);
593
594 loop {
595 // Manual queue.next_async() to avoid holding the mutex
596 let next_async = async {
597 loop {
598 // Check if there are more descriptors available.
599 if let Some(chain) = queue.lock().await.pop(mem) {
600 return Ok(chain);
601 }
602 queue_event.next_val().await?;
603 }
604 }
605 .fuse();
606 pin_mut!(next_async);
607
608 let desc_chain = select! {
609 _ = on_reset => break,
610 res = next_async => res.map_err(Error::Async)?,
611 };
612
613 let mut reader =
614 Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
615
616 let pcm_xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(Error::ReadMessage)?;
617 let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
618
619 let streams = streams.read_lock().await;
620 let stream_info = match streams.get(stream_id) {
621 Some(stream_info) => stream_info.read_lock().await,
622 None => {
623 error!(
624 "stream_id ({}) >= num_streams ({})",
625 stream_id,
626 streams.len()
627 );
628 defer_pcm_response_to_worker(
629 desc_chain,
630 mem,
631 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
632 &mut response_sender,
633 )
634 .await?;
635 continue;
636 }
637 };
638
639 match stream_info.sender.as_ref() {
640 Some(mut s) => {
641 s.send(desc_chain).await.map_err(Error::MpscSend)?;
642 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
643 // If sender channel is still intact but worker status is quit,
644 // the worker quitted unexpectedly. Return error to request a reset.
645 return Err(Error::PCMWorkerQuittedUnexpectedly);
646 }
647 }
648 None => {
649 if !stream_info.just_reset {
650 error!(
651 "stream {} is not ready. state: {}",
652 stream_id,
653 get_virtio_snd_r_pcm_cmd_name(stream_info.state)
654 );
655 }
656 defer_pcm_response_to_worker(
657 desc_chain,
658 mem,
659 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
660 &mut response_sender,
661 )
662 .await?;
663 }
664 };
665 }
666 Ok(())
667 }
668
669 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue<I: SignalableInterrupt>( ex: &Executor, mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>, snd_data: &SndData, queue: &mut Queue, queue_event: &mut EventAsync, interrupt: I, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, reset_signal: Option<&(AsyncMutex<bool>, Condvar)>, ) -> Result<(), Error>670 pub async fn handle_ctrl_queue<I: SignalableInterrupt>(
671 ex: &Executor,
672 mem: &GuestMemory,
673 streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
674 snd_data: &SndData,
675 queue: &mut Queue,
676 queue_event: &mut EventAsync,
677 interrupt: I,
678 tx_send: mpsc::UnboundedSender<PcmResponse>,
679 rx_send: mpsc::UnboundedSender<PcmResponse>,
680 reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
681 ) -> Result<(), Error> {
682 let on_reset = await_reset_signal(reset_signal).fuse();
683 pin_mut!(on_reset);
684
685 loop {
686 let desc_chain = {
687 let next_async = queue.next_async(mem, queue_event).fuse();
688 pin_mut!(next_async);
689
690 select! {
691 _ = on_reset => break,
692 res = next_async => res.map_err(Error::Async)?,
693 }
694 };
695
696 let index = desc_chain.index;
697
698 let mut reader =
699 Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
700 let mut writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
701 // Don't advance the reader
702 let code = reader
703 .clone()
704 .read_obj::<virtio_snd_hdr>()
705 .map_err(Error::ReadMessage)?
706 .code
707 .into();
708
709 let handle_ctrl_msg = async {
710 return match code {
711 VIRTIO_SND_R_JACK_INFO => {
712 let query_info: virtio_snd_query_info =
713 reader.read_obj().map_err(Error::ReadMessage)?;
714 let start_id: usize = u32::from(query_info.start_id) as usize;
715 let count: usize = u32::from(query_info.count) as usize;
716 if start_id + count > snd_data.jack_info.len() {
717 error!(
718 "start_id({}) + count({}) must be smaller than \
719 the number of jacks ({})",
720 start_id,
721 count,
722 snd_data.jack_info.len()
723 );
724 return writer
725 .write_obj(VIRTIO_SND_S_BAD_MSG)
726 .map_err(Error::WriteResponse);
727 }
728 // The response consists of the virtio_snd_hdr structure (contains the request
729 // status code), followed by the device-writable information structures of the
730 // item. Each information structure begins with the following common header
731 writer
732 .write_obj(VIRTIO_SND_S_OK)
733 .map_err(Error::WriteResponse)?;
734 for i in start_id..(start_id + count) {
735 writer
736 .write_all(snd_data.jack_info[i].as_bytes())
737 .map_err(Error::WriteResponse)?;
738 }
739 Ok(())
740 }
741 VIRTIO_SND_R_PCM_INFO => {
742 let query_info: virtio_snd_query_info =
743 reader.read_obj().map_err(Error::ReadMessage)?;
744 let start_id: usize = u32::from(query_info.start_id) as usize;
745 let count: usize = u32::from(query_info.count) as usize;
746 if start_id + count > snd_data.pcm_info.len() {
747 error!(
748 "start_id({}) + count({}) must be smaller than \
749 the number of streams ({})",
750 start_id,
751 count,
752 snd_data.pcm_info.len()
753 );
754 return writer
755 .write_obj(VIRTIO_SND_S_BAD_MSG)
756 .map_err(Error::WriteResponse);
757 }
758 // The response consists of the virtio_snd_hdr structure (contains the request
759 // status code), followed by the device-writable information structures of the
760 // item. Each information structure begins with the following common header
761 writer
762 .write_obj(VIRTIO_SND_S_OK)
763 .map_err(Error::WriteResponse)?;
764 for i in start_id..(start_id + count) {
765 writer
766 .write_all(snd_data.pcm_info[i].as_bytes())
767 .map_err(Error::WriteResponse)?;
768 }
769 Ok(())
770 }
771 VIRTIO_SND_R_CHMAP_INFO => {
772 let query_info: virtio_snd_query_info =
773 reader.read_obj().map_err(Error::ReadMessage)?;
774 let start_id: usize = u32::from(query_info.start_id) as usize;
775 let count: usize = u32::from(query_info.count) as usize;
776 if start_id + count > snd_data.chmap_info.len() {
777 error!(
778 "start_id({}) + count({}) must be smaller than \
779 the number of chmaps ({})",
780 start_id,
781 count,
782 snd_data.chmap_info.len()
783 );
784 return writer
785 .write_obj(VIRTIO_SND_S_BAD_MSG)
786 .map_err(Error::WriteResponse);
787 }
788 // The response consists of the virtio_snd_hdr structure (contains the request
789 // status code), followed by the device-writable information structures of the
790 // item. Each information structure begins with the following common header
791 writer
792 .write_obj(VIRTIO_SND_S_OK)
793 .map_err(Error::WriteResponse)?;
794 for i in start_id..(start_id + count) {
795 writer
796 .write_all(snd_data.chmap_info[i].as_bytes())
797 .map_err(Error::WriteResponse)?;
798 }
799 Ok(())
800 }
801 VIRTIO_SND_R_JACK_REMAP => {
802 unreachable!("remap is unsupported");
803 }
804 VIRTIO_SND_R_PCM_SET_PARAMS => {
805 // Raise VIRTIO_SND_S_BAD_MSG or IO error?
806 let set_params: virtio_snd_pcm_set_params =
807 reader.read_obj().map_err(Error::ReadMessage)?;
808 let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
809 let buffer_bytes: u32 = set_params.buffer_bytes.into();
810 let period_bytes: u32 = set_params.period_bytes.into();
811
812 let dir = match snd_data.pcm_info.get(stream_id) {
813 Some(pcm_info) => {
814 if set_params.channels < pcm_info.channels_min
815 || set_params.channels > pcm_info.channels_max
816 {
817 error!(
818 "Number of channels ({}) must be between {} and {}",
819 set_params.channels,
820 pcm_info.channels_min,
821 pcm_info.channels_max
822 );
823 return writer
824 .write_obj(VIRTIO_SND_S_NOT_SUPP)
825 .map_err(Error::WriteResponse);
826 }
827 if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
828 error!("PCM format {} is not supported.", set_params.format);
829 return writer
830 .write_obj(VIRTIO_SND_S_NOT_SUPP)
831 .map_err(Error::WriteResponse);
832 }
833 if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
834 error!("PCM frame rate {} is not supported.", set_params.rate);
835 return writer
836 .write_obj(VIRTIO_SND_S_NOT_SUPP)
837 .map_err(Error::WriteResponse);
838 }
839
840 pcm_info.direction
841 }
842 None => {
843 error!(
844 "stream_id {} < streams {}",
845 stream_id,
846 snd_data.pcm_info.len()
847 );
848 return writer
849 .write_obj(VIRTIO_SND_S_BAD_MSG)
850 .map_err(Error::WriteResponse);
851 }
852 };
853
854 if set_params.features != 0 {
855 error!("No feature is supported");
856 return writer
857 .write_obj(VIRTIO_SND_S_NOT_SUPP)
858 .map_err(Error::WriteResponse);
859 }
860
861 if buffer_bytes % period_bytes != 0 {
862 error!(
863 "buffer_bytes({}) must be dividable by period_bytes({})",
864 buffer_bytes, period_bytes
865 );
866 return writer
867 .write_obj(VIRTIO_SND_S_BAD_MSG)
868 .map_err(Error::WriteResponse);
869 }
870
871 process_pcm_ctrl(
872 ex,
873 &mem.clone(),
874 &tx_send,
875 &rx_send,
876 streams,
877 VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
878 &mut writer,
879 stream_id,
880 )
881 .await
882 }
883 VIRTIO_SND_R_PCM_PREPARE
884 | VIRTIO_SND_R_PCM_START
885 | VIRTIO_SND_R_PCM_STOP
886 | VIRTIO_SND_R_PCM_RELEASE => {
887 let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
888 let stream_id: usize = u32::from(hdr.stream_id) as usize;
889 let cmd = match VirtioSndPcmCmd::try_from(code) {
890 Ok(cmd) => cmd,
891 Err(err) => {
892 error!("Error converting code to command: {}", err);
893 return writer
894 .write_obj(VIRTIO_SND_S_BAD_MSG)
895 .map_err(Error::WriteResponse);
896 }
897 };
898 process_pcm_ctrl(
899 ex,
900 &mem.clone(),
901 &tx_send,
902 &rx_send,
903 streams,
904 cmd,
905 &mut writer,
906 stream_id,
907 )
908 .await
909 .and(Ok(()))?;
910 Ok(())
911 }
912 c => {
913 error!("Unrecognized code: {}", c);
914 return writer
915 .write_obj(VIRTIO_SND_S_BAD_MSG)
916 .map_err(Error::WriteResponse);
917 }
918 };
919 };
920
921 handle_ctrl_msg.await?;
922 queue.add_used(mem, index, writer.bytes_written() as u32);
923 queue.trigger_interrupt(mem, &interrupt);
924 }
925 Ok(())
926 }
927
928 /// Send events to the audio driver.
handle_event_queue<I: SignalableInterrupt>( mem: &GuestMemory, mut queue: Queue, mut queue_event: EventAsync, interrupt: I, ) -> Result<(), Error>929 pub async fn handle_event_queue<I: SignalableInterrupt>(
930 mem: &GuestMemory,
931 mut queue: Queue,
932 mut queue_event: EventAsync,
933 interrupt: I,
934 ) -> Result<(), Error> {
935 loop {
936 let desc_chain = queue
937 .next_async(mem, &mut queue_event)
938 .await
939 .map_err(Error::Async)?;
940
941 // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
942 let index = desc_chain.index;
943 queue.add_used(mem, index, 0);
944 queue.trigger_interrupt(mem, &interrupt);
945 }
946 }
947