1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 use std::sync::atomic::AtomicBool;
11 use std::sync::atomic::Ordering;
12 use std::time::Duration;
13
14 use async_trait::async_trait;
15 use audio_streams::capture::AsyncCaptureBuffer;
16 use audio_streams::AsyncPlaybackBuffer;
17 use audio_streams::BoxError;
18 use base::debug;
19 use base::error;
20 use base::info;
21 use cros_async::sync::Condvar;
22 use cros_async::sync::RwLock as AsyncRwLock;
23 use cros_async::AsyncTube;
24 use cros_async::EventAsync;
25 use cros_async::Executor;
26 use cros_async::TimerAsync;
27 use futures::channel::mpsc;
28 use futures::channel::oneshot;
29 use futures::pin_mut;
30 use futures::select;
31 use futures::FutureExt;
32 use futures::SinkExt;
33 use futures::StreamExt;
34 use thiserror::Error as ThisError;
35 use vm_control::SndControlCommand;
36 use vm_control::VmResponse;
37 use zerocopy::IntoBytes;
38
39 use super::Error;
40 use super::SndData;
41 use super::WorkerStatus;
42 use crate::virtio::snd::common::*;
43 use crate::virtio::snd::common_backend::stream_info::SetParams;
44 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
45 use crate::virtio::snd::common_backend::DirectionalStream;
46 use crate::virtio::snd::common_backend::PcmResponse;
47 use crate::virtio::snd::constants::*;
48 use crate::virtio::snd::layout::*;
49 use crate::virtio::DescriptorChain;
50 use crate::virtio::Queue;
51 use crate::virtio::Reader;
52 use crate::virtio::Writer;
53
/// Trait to wrap system specific helpers for reading from the start point capture buffer.
///
/// Implementations are used by the capture (rx) side of the PCM worker, which calls
/// `get_next_capture_period` once per loop iteration to obtain the buffer it copies from.
#[async_trait(?Send)]
pub trait CaptureBufferReader {
    /// Asynchronously fetches the next capture buffer from the endpoint.
    ///
    /// `ex` is the executor handed to the implementation (presumably used to drive its
    /// asynchronous I/O -- confirm against the platform implementations). Failures are
    /// reported as a boxed error.
    async fn get_next_capture_period(
        &mut self,
        ex: &Executor,
    ) -> Result<AsyncCaptureBuffer, BoxError>;
}
62
63 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
64 #[async_trait(?Send)]
65 pub trait PlaybackBufferWriter {
new(guest_period_bytes: usize) -> Self where Self: Sized66 fn new(guest_period_bytes: usize) -> Self
67 where
68 Self: Sized;
69
70 /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize71 fn endpoint_period_bytes(&self) -> usize;
72
73 /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>74 fn copy_to_buffer(
75 &mut self,
76 dst_buf: &mut AsyncPlaybackBuffer<'_>,
77 reader: &mut Reader,
78 ) -> Result<usize, Error> {
79 dst_buf.copy_from(reader).map_err(Error::Io)
80 }
81 }
82
/// PCM stream control commands handled by the control queue.
///
/// Each variant corresponds to one `VIRTIO_SND_R_PCM_*` request code (see the `Display` and
/// `TryFrom<u32>` implementations for the mapping).
#[derive(Debug)]
enum VirtioSndPcmCmd {
    /// `VIRTIO_SND_R_PCM_SET_PARAMS`, carrying the already converted stream parameters.
    SetParams { set_params: SetParams },
    /// `VIRTIO_SND_R_PCM_PREPARE`.
    Prepare,
    /// `VIRTIO_SND_R_PCM_START`.
    Start,
    /// `VIRTIO_SND_R_PCM_STOP`.
    Stop,
    /// `VIRTIO_SND_R_PCM_RELEASE`.
    Release,
}
91
92 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 let cmd_code = match self {
95 VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
96 VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
97 VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
98 VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
99 VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
100 };
101 f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
102 }
103 }
104
/// Errors produced when converting a raw request code into a `VirtioSndPcmCmd`.
#[derive(ThisError, Debug)]
enum VirtioSndPcmCmdError {
    /// The code was `VIRTIO_SND_R_PCM_SET_PARAMS`, which cannot be built from the code alone
    /// because it needs the parameters carried in the request.
    #[error("SetParams requires additional parameters")]
    SetParams,
    /// The code is not one of the PCM control request codes.
    #[error("Invalid virtio snd command code")]
    InvalidCode,
}
112
113 impl TryFrom<u32> for VirtioSndPcmCmd {
114 type Error = VirtioSndPcmCmdError;
115
try_from(code: u32) -> Result<Self, Self::Error>116 fn try_from(code: u32) -> Result<Self, Self::Error> {
117 match code {
118 VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
119 VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
120 VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
121 VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
122 VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
123 _ => Err(VirtioSndPcmCmdError::InvalidCode),
124 }
125 }
126 }
127
128 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd129 fn with_set_params_and_direction(
130 set_params: virtio_snd_pcm_set_params,
131 dir: u8,
132 ) -> VirtioSndPcmCmd {
133 let buffer_bytes: u32 = set_params.buffer_bytes.into();
134 let period_bytes: u32 = set_params.period_bytes.into();
135 VirtioSndPcmCmd::SetParams {
136 set_params: SetParams {
137 channels: set_params.channels,
138 format: from_virtio_sample_format(set_params.format).unwrap(),
139 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
140 buffer_bytes: buffer_bytes as usize,
141 period_bytes: period_bytes as usize,
142 dir,
143 },
144 }
145 }
146 }
147
148 // Returns true if the operation is successful. Returns error if there is
149 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, card_index: usize, ) -> Result<(), Error>150 async fn process_pcm_ctrl(
151 ex: &Executor,
152 tx_send: &mpsc::UnboundedSender<PcmResponse>,
153 rx_send: &mpsc::UnboundedSender<PcmResponse>,
154 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
155 cmd: VirtioSndPcmCmd,
156 writer: &mut Writer,
157 stream_id: usize,
158 card_index: usize,
159 ) -> Result<(), Error> {
160 let streams = streams.read_lock().await;
161 let mut stream = match streams.get(stream_id) {
162 Some(stream_info) => stream_info.lock().await,
163 None => {
164 error!(
165 "[Card {}] Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
166 card_index, stream_id, cmd
167 );
168 return writer
169 .write_obj(VIRTIO_SND_S_BAD_MSG)
170 .map_err(Error::WriteResponse);
171 }
172 };
173
174 debug!("[Card {}] {} for stream id={}", card_index, cmd, stream_id);
175
176 let result = match cmd {
177 VirtioSndPcmCmd::SetParams { set_params } => {
178 let result = stream.set_params(set_params).await;
179 if result.is_ok() {
180 debug!(
181 "[Card {}] VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
182 card_index, stream_id, *stream
183 );
184 }
185 result
186 }
187 VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
188 VirtioSndPcmCmd::Start => stream.start().await,
189 VirtioSndPcmCmd::Stop => stream.stop().await,
190 VirtioSndPcmCmd::Release => stream.release().await,
191 };
192 match result {
193 Ok(_) => writer
194 .write_obj(VIRTIO_SND_S_OK)
195 .map_err(Error::WriteResponse),
196 Err(Error::OperationNotSupported) => {
197 error!(
198 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
199 card_index, cmd, stream_id
200 );
201
202 writer
203 .write_obj(VIRTIO_SND_S_NOT_SUPP)
204 .map_err(Error::WriteResponse)
205 }
206 Err(e) => {
207 // Runtime/internal error would be more appropriate, but there's
208 // no such error type
209 error!(
210 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
211 card_index, cmd, stream_id, e
212 );
213 writer
214 .write_obj(VIRTIO_SND_S_IO_ERR)
215 .map_err(Error::WriteResponse)
216 }
217 }
218 }
219
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<&mut Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>220 async fn write_data(
221 mut dst_buf: AsyncPlaybackBuffer<'_>,
222 reader: Option<&mut Reader>,
223 buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
224 ) -> Result<u32, Error> {
225 let transferred = match reader {
226 Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
227 None => dst_buf
228 .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
229 .map_err(Error::Io)?,
230 };
231
232 if transferred != buffer_writer.endpoint_period_bytes() {
233 error!(
234 "Bytes written {} != period_bytes {}",
235 transferred,
236 buffer_writer.endpoint_period_bytes()
237 );
238 Err(Error::InvalidBufferSize)
239 } else {
240 dst_buf.commit().await;
241 Ok(dst_buf.latency_bytes())
242 }
243 }
244
read_data( mut src_buf: AsyncCaptureBuffer<'_>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>245 async fn read_data(
246 mut src_buf: AsyncCaptureBuffer<'_>,
247 writer: Option<&mut Writer>,
248 period_bytes: usize,
249 ) -> Result<u32, Error> {
250 let transferred = match writer {
251 Some(writer) => src_buf.copy_to(writer),
252 None => src_buf.copy_to(&mut io::sink()),
253 }
254 .map_err(Error::Io)?;
255 if transferred != period_bytes {
256 error!(
257 "Bytes written {} != period_bytes {}",
258 transferred, period_bytes
259 );
260 Err(Error::InvalidBufferSize)
261 } else {
262 src_buf.commit().await;
263 Ok(src_buf.latency_bytes())
264 }
265 }
266
267 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self268 fn from(res: Result<u32, Error>) -> Self {
269 match res {
270 Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
271 Err(e) => {
272 error!("PCM I/O message failed: {}", e);
273 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
274 }
275 }
276 }
277 }
278
279 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>280 async fn drain_desc_receiver(
281 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
282 sender: &mut mpsc::UnboundedSender<PcmResponse>,
283 ) -> Result<(), Error> {
284 let mut o_desc_chain = desc_receiver.next().await;
285 while let Some(desc_chain) = o_desc_chain {
286 // From the virtio-snd spec:
287 // The device MUST complete all pending I/O messages for the specified stream ID.
288 let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
289 // Fetch next DescriptorChain to see if the current one is the last one.
290 o_desc_chain = desc_receiver.next().await;
291 let (done, future) = if o_desc_chain.is_none() {
292 let (done, future) = oneshot::channel();
293 (Some(done), Some(future))
294 } else {
295 (None, None)
296 };
297 sender
298 .send(PcmResponse {
299 desc_chain,
300 status,
301 done,
302 })
303 .await
304 .map_err(Error::MpscSend)?;
305
306 if let Some(f) = future {
307 // From the virtio-snd spec:
308 // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
309 // while there are pending I/O messages for the specified stream ID.
310 f.await.map_err(Error::DoneNotTriggered)?;
311 };
312 }
313 Ok(())
314 }
315
316 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
317 /// queue, and forward them to CRAS. One pcm worker per stream.
318 ///
319 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
320 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncRwLock<WorkerStatus>>, mut sender: mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, muted: Rc<AtomicBool>, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>321 pub async fn start_pcm_worker(
322 ex: Executor,
323 dstream: DirectionalStream,
324 mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
325 status_mutex: Rc<AsyncRwLock<WorkerStatus>>,
326 mut sender: mpsc::UnboundedSender<PcmResponse>,
327 period_dur: Duration,
328 card_index: usize,
329 muted: Rc<AtomicBool>,
330 release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
331 ) -> Result<(), Error> {
332 let res = pcm_worker_loop(
333 ex,
334 dstream,
335 &mut desc_receiver,
336 &status_mutex,
337 &mut sender,
338 period_dur,
339 card_index,
340 muted,
341 release_signal,
342 )
343 .await;
344 *status_mutex.lock().await = WorkerStatus::Quit;
345 if res.is_err() {
346 error!(
347 "[Card {}] pcm_worker error: {:#?}. Draining desc_receiver",
348 card_index,
349 res.as_ref().err()
350 );
351 // On error, guaranteed that desc_receiver has not been drained, so drain it here.
352 // Note that drain blocks until the stream is release.
353 drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
354 }
355 res
356 }
357
/// Body of a PCM worker: moves audio between guest descriptor chains and the endpoint, one
/// endpoint buffer per loop iteration, until the stream is released or the worker is told to
/// quit.
///
/// Each iteration waits for the next endpoint buffer while racing against `on_release` (the
/// release signal followed by a sleep of two periods). What happens to the buffer depends on
/// the worker status:
///
/// * `Quit`: pending descriptor chains are drained, the buffer is filled with zeros (output)
///   or discarded (input), and the loop ends with `Ok(())`.
/// * `Pause`: the buffer is filled with zeros (output) or discarded (input).
/// * `Running`: the next descriptor chain, if any, is transferred and its status is sent to
///   `sender`. When no chain is queued the buffer is filled with zeros / discarded instead.
///   When `muted` is set the guest data is not used (output) or not written (input), but the
///   chain is still completed.
///
/// Returns an error if fetching a buffer, transferring data outside of the `Running`/`Quit`
/// paths, or sending a response fails, or if the descriptor channel is closed while the status
/// is still `Running`.
async fn pcm_worker_loop(
    ex: Executor,
    dstream: DirectionalStream,
    desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
    status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,
    sender: &mut mpsc::UnboundedSender<PcmResponse>,
    period_dur: Duration,
    card_index: usize,
    muted: Rc<AtomicBool>,
    release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
) -> Result<(), Error> {
    // Fused and pinned once, outside the loops, so the same future is polled on every
    // iteration and its progress (signal received, sleep started) is kept between iterations.
    let on_release = async {
        await_reset_signal(Some(&*release_signal)).await;
        // After receiving release signal, wait for up to 2 periods,
        // giving it a chance to respond to the last buffer.
        if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {
            error!(
                "[Card {}] Error on sleep after receiving reset signal: {}",
                card_index, e
            )
        }
    }
    .fuse();
    pin_mut!(on_release);

    match dstream {
        DirectionalStream::Output(mut sys_direction_output) => loop {
            // On Windows the stream and the buffer writer are behind locks that are taken for
            // the duration of one iteration; elsewhere they are borrowed directly.
            #[cfg(windows)]
            let (mut stream, mut buffer_writer_lock) = (
                sys_direction_output
                    .async_playback_buffer_stream
                    .lock()
                    .await,
                sys_direction_output.buffer_writer.lock().await,
            );
            #[cfg(windows)]
            let buffer_writer = &mut buffer_writer_lock;
            #[cfg(any(target_os = "android", target_os = "linux"))]
            let (stream, buffer_writer) = (
                &mut sys_direction_output.async_playback_buffer_stream,
                &mut sys_direction_output.buffer_writer,
            );

            let next_buf = stream.next_playback_buffer(&ex).fuse();
            pin_mut!(next_buf);

            // Wait for the next endpoint buffer, unless the stream gets released first.
            let dst_buf = select! {
                _ = on_release => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    break Ok(());
                },
                buf = next_buf => buf.map_err(Error::FetchBuffer)?,
            };
            // The status guard stays alive until the end of this iteration.
            let worker_status = status_mutex.lock().await;
            match *worker_status {
                WorkerStatus::Quit => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    // Still fill the buffer that was already fetched; a failure here is only
                    // logged because the worker is exiting anyway.
                    if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
                        error!(
                            "[Card {}] Error on write_data after worker quit: {}",
                            card_index, e
                        )
                    }
                    break Ok(());
                }
                WorkerStatus::Pause => {
                    // No reader: the endpoint buffer is filled with zeros.
                    write_data(dst_buf, None, buffer_writer).await?;
                }
                WorkerStatus::Running => match desc_receiver.try_next() {
                    // No descriptor chain is queued right now.
                    Err(e) => {
                        error!(
                            "[Card {}] Underrun. No new DescriptorChain while running: {}",
                            card_index, e
                        );
                        write_data(dst_buf, None, buffer_writer).await?;
                    }
                    // The channel has been closed.
                    Ok(None) => {
                        error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
                        write_data(dst_buf, None, buffer_writer).await?;
                        return Err(Error::InvalidPCMWorkerState);
                    }
                    Ok(Some(mut desc_chain)) => {
                        // When muted, the guest data is ignored and zeros are played instead.
                        let reader = if muted.load(Ordering::Relaxed) {
                            None
                        } else {
                            // stream_id was already read in handle_pcm_queue
                            Some(&mut desc_chain.reader)
                        };
                        // A transfer failure is turned into an error status for the guest
                        // rather than stopping the worker.
                        let status = write_data(dst_buf, reader, buffer_writer).await.into();
                        sender
                            .send(PcmResponse {
                                desc_chain,
                                status,
                                done: None,
                            })
                            .await
                            .map_err(Error::MpscSend)?;
                    }
                },
            }
        },
        DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
            let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
            pin_mut!(next_buf);

            // Wait for the next captured period, unless the stream gets released first.
            let src_buf = select! {
                _ = on_release => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    break Ok(());
                },
                buf = next_buf => buf.map_err(Error::FetchBuffer)?,
            };

            // The status guard stays alive until the end of this iteration.
            let worker_status = status_mutex.lock().await;
            match *worker_status {
                WorkerStatus::Quit => {
                    drain_desc_receiver(desc_receiver, sender).await?;
                    // Still consume the buffer that was already fetched; a failure here is
                    // only logged because the worker is exiting anyway.
                    if let Err(e) = read_data(src_buf, None, period_bytes).await {
                        error!(
                            "[Card {}] Error on read_data after worker quit: {}",
                            card_index, e
                        )
                    }
                    break Ok(());
                }
                WorkerStatus::Pause => {
                    // No writer: the captured data is discarded.
                    read_data(src_buf, None, period_bytes).await?;
                }
                WorkerStatus::Running => match desc_receiver.try_next() {
                    // No descriptor chain is queued right now.
                    Err(e) => {
                        error!(
                            "[Card {}] Overrun. No new DescriptorChain while running: {}",
                            card_index, e
                        );
                        read_data(src_buf, None, period_bytes).await?;
                    }
                    // The channel has been closed.
                    Ok(None) => {
                        error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
                        read_data(src_buf, None, period_bytes).await?;
                        return Err(Error::InvalidPCMWorkerState);
                    }
                    Ok(Some(mut desc_chain)) => {
                        // When muted, the captured data is discarded instead of being copied
                        // to the guest buffer.
                        let writer = if muted.load(Ordering::Relaxed) {
                            None
                        } else {
                            Some(&mut desc_chain.writer)
                        };
                        // A transfer failure is turned into an error status for the guest
                        // rather than stopping the worker.
                        let status = read_data(src_buf, writer, period_bytes).await.into();
                        sender
                            .send(PcmResponse {
                                desc_chain,
                                status,
                                done: None,
                            })
                            .await
                            .map_err(Error::MpscSend)?;
                    }
                },
            }
        },
    }
}
520
521 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>522 async fn defer_pcm_response_to_worker(
523 desc_chain: DescriptorChain,
524 status: virtio_snd_pcm_status,
525 response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
526 ) -> Result<(), Error> {
527 response_sender
528 .send(PcmResponse {
529 desc_chain,
530 status,
531 done: None,
532 })
533 .await
534 .map_err(Error::MpscSend)
535 }
536
send_pcm_response( mut desc_chain: DescriptorChain, queue: &mut Queue, status: virtio_snd_pcm_status, ) -> Result<(), Error>537 fn send_pcm_response(
538 mut desc_chain: DescriptorChain,
539 queue: &mut Queue,
540 status: virtio_snd_pcm_status,
541 ) -> Result<(), Error> {
542 let writer = &mut desc_chain.writer;
543
544 // For rx queue only. Fast forward the unused audio data buffer.
545 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
546 writer
547 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
548 }
549 writer.write_obj(status).map_err(Error::WriteResponse)?;
550 let len = writer.bytes_written() as u32;
551 queue.add_used(desc_chain, len);
552 queue.trigger_interrupt();
553 Ok(())
554 }
555
556 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>)557 async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {
558 match reset_signal_option {
559 Some((lock, cvar)) => {
560 let mut reset = lock.lock().await;
561 while !*reset {
562 reset = cvar.wait(reset).await;
563 }
564 }
565 None => futures::future::pending().await,
566 };
567 }
568
send_pcm_response_worker( queue: Rc<AsyncRwLock<Queue>>, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>569 pub async fn send_pcm_response_worker(
570 queue: Rc<AsyncRwLock<Queue>>,
571 recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
572 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
573 ) -> Result<(), Error> {
574 let on_reset = await_reset_signal(reset_signal).fuse();
575 pin_mut!(on_reset);
576
577 loop {
578 let next_async = recv.next().fuse();
579 pin_mut!(next_async);
580
581 let res = select! {
582 _ = on_reset => break,
583 res = next_async => res,
584 };
585
586 if let Some(r) = res {
587 send_pcm_response(r.desc_chain, &mut *queue.lock().await, r.status)?;
588
589 // Resume pcm_worker
590 if let Some(done) = r.done {
591 done.send(()).map_err(Error::OneshotSend)?;
592 }
593 } else {
594 debug!("PcmResponse channel is closed.");
595 break;
596 }
597 }
598 Ok(())
599 }
600
601 /// Handle messages from the control tube. This one is not related to virtio spec.
handle_ctrl_tube( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, control_tube: &AsyncTube, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>602 pub async fn handle_ctrl_tube(
603 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
604 control_tube: &AsyncTube,
605 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
606 ) -> Result<(), Error> {
607 let on_reset = await_reset_signal(reset_signal).fuse();
608 pin_mut!(on_reset);
609
610 loop {
611 let next_async = control_tube.next().fuse();
612 pin_mut!(next_async);
613
614 let cmd = select! {
615 _ = on_reset => break,
616 res = next_async => res,
617 };
618
619 match cmd {
620 Ok(cmd) => {
621 let resp = match cmd {
622 SndControlCommand::MuteAll(muted) => {
623 let streams = streams.read_lock().await;
624 for stream in streams.iter() {
625 let stream_info = stream.lock().await;
626 stream_info.muted.store(muted, Ordering::Relaxed);
627 info!("Stream muted = {:?}", muted);
628 }
629 VmResponse::Ok
630 }
631 };
632 control_tube
633 .send(resp)
634 .await
635 .map_err(Error::ControlTubeError)?;
636 }
637 Err(e) => {
638 error!("Failed to read the command: {}", e);
639 return Err(Error::ControlTubeError(e));
640 }
641 }
642 }
643
644 Ok(())
645 }
646
647 /// Handle messages from the tx or the rx queue. One invocation is needed for
648 /// each queue.
handle_pcm_queue( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: Rc<AsyncRwLock<Queue>>, queue_event: &EventAsync, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>649 pub async fn handle_pcm_queue(
650 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
651 mut response_sender: mpsc::UnboundedSender<PcmResponse>,
652 queue: Rc<AsyncRwLock<Queue>>,
653 queue_event: &EventAsync,
654 card_index: usize,
655 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
656 ) -> Result<(), Error> {
657 let on_reset = await_reset_signal(reset_signal).fuse();
658 pin_mut!(on_reset);
659
660 loop {
661 // Manual queue.next_async() to avoid holding the mutex
662 let next_async = async {
663 loop {
664 // Check if there are more descriptors available.
665 if let Some(chain) = queue.lock().await.pop() {
666 return Ok(chain);
667 }
668 queue_event.next_val().await?;
669 }
670 }
671 .fuse();
672 pin_mut!(next_async);
673
674 let mut desc_chain = select! {
675 _ = on_reset => break,
676 res = next_async => res.map_err(Error::Async)?,
677 };
678
679 let pcm_xfer: virtio_snd_pcm_xfer =
680 desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
681 let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
682
683 let streams = streams.read_lock().await;
684 let stream_info = match streams.get(stream_id) {
685 Some(stream_info) => stream_info.read_lock().await,
686 None => {
687 error!(
688 "[Card {}] stream_id ({}) >= num_streams ({})",
689 card_index,
690 stream_id,
691 streams.len()
692 );
693 defer_pcm_response_to_worker(
694 desc_chain,
695 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
696 &mut response_sender,
697 )
698 .await?;
699 continue;
700 }
701 };
702
703 match stream_info.sender.as_ref() {
704 Some(mut s) => {
705 s.send(desc_chain).await.map_err(Error::MpscSend)?;
706 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
707 // If sender channel is still intact but worker status is quit,
708 // the worker quitted unexpectedly. Return error to request a reset.
709 return Err(Error::PCMWorkerQuittedUnexpectedly);
710 }
711 }
712 None => {
713 if !stream_info.just_reset {
714 error!(
715 "[Card {}] stream {} is not ready. state: {}",
716 card_index,
717 stream_id,
718 get_virtio_snd_r_pcm_cmd_name(stream_info.state)
719 );
720 }
721 defer_pcm_response_to_worker(
722 desc_chain,
723 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
724 &mut response_sender,
725 )
726 .await?;
727 }
728 };
729 }
730 Ok(())
731 }
732
733 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue( ex: &Executor, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, snd_data: &SndData, queue: Rc<AsyncRwLock<Queue>>, queue_event: &mut EventAsync, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>734 pub async fn handle_ctrl_queue(
735 ex: &Executor,
736 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
737 snd_data: &SndData,
738 queue: Rc<AsyncRwLock<Queue>>,
739 queue_event: &mut EventAsync,
740 tx_send: mpsc::UnboundedSender<PcmResponse>,
741 rx_send: mpsc::UnboundedSender<PcmResponse>,
742 card_index: usize,
743 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
744 ) -> Result<(), Error> {
745 let on_reset = await_reset_signal(reset_signal).fuse();
746 pin_mut!(on_reset);
747
748 let mut queue = queue.lock().await;
749 loop {
750 let mut desc_chain = {
751 let next_async = queue.next_async(queue_event).fuse();
752 pin_mut!(next_async);
753
754 select! {
755 _ = on_reset => break,
756 res = next_async => res.map_err(Error::Async)?,
757 }
758 };
759
760 let reader = &mut desc_chain.reader;
761 let writer = &mut desc_chain.writer;
762 // Don't advance the reader
763 let code = reader
764 .peek_obj::<virtio_snd_hdr>()
765 .map_err(Error::ReadMessage)?
766 .code
767 .into();
768
769 let handle_ctrl_msg = async {
770 match code {
771 VIRTIO_SND_R_JACK_INFO => {
772 let query_info: virtio_snd_query_info =
773 reader.read_obj().map_err(Error::ReadMessage)?;
774 let start_id: usize = u32::from(query_info.start_id) as usize;
775 let count: usize = u32::from(query_info.count) as usize;
776 if start_id + count > snd_data.jack_info.len() {
777 error!(
778 "[Card {}] start_id({}) + count({}) must be smaller than \
779 the number of jacks ({})",
780 card_index,
781 start_id,
782 count,
783 snd_data.jack_info.len()
784 );
785 return writer
786 .write_obj(VIRTIO_SND_S_BAD_MSG)
787 .map_err(Error::WriteResponse);
788 }
789 // The response consists of the virtio_snd_hdr structure (contains the request
790 // status code), followed by the device-writable information structures of the
791 // item. Each information structure begins with the following common header
792 writer
793 .write_obj(VIRTIO_SND_S_OK)
794 .map_err(Error::WriteResponse)?;
795 for i in start_id..(start_id + count) {
796 writer
797 .write_all(snd_data.jack_info[i].as_bytes())
798 .map_err(Error::WriteResponse)?;
799 }
800 Ok(())
801 }
802 VIRTIO_SND_R_PCM_INFO => {
803 let query_info: virtio_snd_query_info =
804 reader.read_obj().map_err(Error::ReadMessage)?;
805 let start_id: usize = u32::from(query_info.start_id) as usize;
806 let count: usize = u32::from(query_info.count) as usize;
807 if start_id + count > snd_data.pcm_info.len() {
808 error!(
809 "[Card {}] start_id({}) + count({}) must be smaller than \
810 the number of streams ({})",
811 card_index,
812 start_id,
813 count,
814 snd_data.pcm_info.len()
815 );
816 return writer
817 .write_obj(VIRTIO_SND_S_BAD_MSG)
818 .map_err(Error::WriteResponse);
819 }
820 // The response consists of the virtio_snd_hdr structure (contains the request
821 // status code), followed by the device-writable information structures of the
822 // item. Each information structure begins with the following common header
823 writer
824 .write_obj(VIRTIO_SND_S_OK)
825 .map_err(Error::WriteResponse)?;
826 for i in start_id..(start_id + count) {
827 writer
828 .write_all(snd_data.pcm_info[i].as_bytes())
829 .map_err(Error::WriteResponse)?;
830 }
831 Ok(())
832 }
833 VIRTIO_SND_R_CHMAP_INFO => {
834 let query_info: virtio_snd_query_info =
835 reader.read_obj().map_err(Error::ReadMessage)?;
836 let start_id: usize = u32::from(query_info.start_id) as usize;
837 let count: usize = u32::from(query_info.count) as usize;
838 if start_id + count > snd_data.chmap_info.len() {
839 error!(
840 "[Card {}] start_id({}) + count({}) must be smaller than \
841 the number of chmaps ({})",
842 card_index,
843 start_id,
844 count,
845 snd_data.chmap_info.len()
846 );
847 return writer
848 .write_obj(VIRTIO_SND_S_BAD_MSG)
849 .map_err(Error::WriteResponse);
850 }
851 // The response consists of the virtio_snd_hdr structure (contains the request
852 // status code), followed by the device-writable information structures of the
853 // item. Each information structure begins with the following common header
854 writer
855 .write_obj(VIRTIO_SND_S_OK)
856 .map_err(Error::WriteResponse)?;
857 for i in start_id..(start_id + count) {
858 writer
859 .write_all(snd_data.chmap_info[i].as_bytes())
860 .map_err(Error::WriteResponse)?;
861 }
862 Ok(())
863 }
864 VIRTIO_SND_R_JACK_REMAP => {
865 unreachable!("remap is unsupported");
866 }
867 VIRTIO_SND_R_PCM_SET_PARAMS => {
868 // Raise VIRTIO_SND_S_BAD_MSG or IO error?
869 let set_params: virtio_snd_pcm_set_params =
870 reader.read_obj().map_err(Error::ReadMessage)?;
871 let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
872 let buffer_bytes: u32 = set_params.buffer_bytes.into();
873 let period_bytes: u32 = set_params.period_bytes.into();
874
875 let dir = match snd_data.pcm_info.get(stream_id) {
876 Some(pcm_info) => {
877 if set_params.channels < pcm_info.channels_min
878 || set_params.channels > pcm_info.channels_max
879 {
880 error!(
881 "[Card {}] Number of channels ({}) must be between {} and {}",
882 card_index,
883 set_params.channels,
884 pcm_info.channels_min,
885 pcm_info.channels_max
886 );
887 return writer
888 .write_obj(VIRTIO_SND_S_NOT_SUPP)
889 .map_err(Error::WriteResponse);
890 }
891 if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
892 error!(
893 "[Card {}] PCM format {} is not supported.",
894 card_index, set_params.format
895 );
896 return writer
897 .write_obj(VIRTIO_SND_S_NOT_SUPP)
898 .map_err(Error::WriteResponse);
899 }
900 if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
901 error!(
902 "[Card {}] PCM frame rate {} is not supported.",
903 card_index, set_params.rate
904 );
905 return writer
906 .write_obj(VIRTIO_SND_S_NOT_SUPP)
907 .map_err(Error::WriteResponse);
908 }
909
910 pcm_info.direction
911 }
912 None => {
913 error!(
914 "[Card {}] stream_id {} < streams {}",
915 card_index,
916 stream_id,
917 snd_data.pcm_info.len()
918 );
919 return writer
920 .write_obj(VIRTIO_SND_S_BAD_MSG)
921 .map_err(Error::WriteResponse);
922 }
923 };
924
925 if set_params.features != 0 {
926 error!("[Card {}] No feature is supported", card_index);
927 return writer
928 .write_obj(VIRTIO_SND_S_NOT_SUPP)
929 .map_err(Error::WriteResponse);
930 }
931
932 if buffer_bytes % period_bytes != 0 {
933 error!(
934 "[Card {}] buffer_bytes({}) must be dividable by period_bytes({})",
935 card_index, buffer_bytes, period_bytes
936 );
937 return writer
938 .write_obj(VIRTIO_SND_S_BAD_MSG)
939 .map_err(Error::WriteResponse);
940 }
941
942 process_pcm_ctrl(
943 ex,
944 &tx_send,
945 &rx_send,
946 streams,
947 VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
948 writer,
949 stream_id,
950 card_index,
951 )
952 .await
953 }
954 VIRTIO_SND_R_PCM_PREPARE
955 | VIRTIO_SND_R_PCM_START
956 | VIRTIO_SND_R_PCM_STOP
957 | VIRTIO_SND_R_PCM_RELEASE => {
958 let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
959 let stream_id: usize = u32::from(hdr.stream_id) as usize;
960 let cmd = match VirtioSndPcmCmd::try_from(code) {
961 Ok(cmd) => cmd,
962 Err(err) => {
963 error!(
964 "[Card {}] Error converting code to command: {}",
965 card_index, err
966 );
967 return writer
968 .write_obj(VIRTIO_SND_S_BAD_MSG)
969 .map_err(Error::WriteResponse);
970 }
971 };
972 process_pcm_ctrl(
973 ex, &tx_send, &rx_send, streams, cmd, writer, stream_id, card_index,
974 )
975 .await
976 .and(Ok(()))?;
977 Ok(())
978 }
979 c => {
980 error!("[Card {}] Unrecognized code: {}", card_index, c);
981 writer
982 .write_obj(VIRTIO_SND_S_BAD_MSG)
983 .map_err(Error::WriteResponse)
984 }
985 }
986 };
987
988 handle_ctrl_msg.await?;
989 let len = writer.bytes_written() as u32;
990 queue.add_used(desc_chain, len);
991 queue.trigger_interrupt();
992 }
993 Ok(())
994 }
995
996 /// Send events to the audio driver.
handle_event_queue( mut queue: Queue, mut queue_event: EventAsync, ) -> Result<(), Error>997 pub async fn handle_event_queue(
998 mut queue: Queue,
999 mut queue_event: EventAsync,
1000 ) -> Result<(), Error> {
1001 loop {
1002 let desc_chain = queue
1003 .next_async(&mut queue_event)
1004 .await
1005 .map_err(Error::Async)?;
1006
1007 // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
1008 queue.add_used(desc_chain, 0);
1009 queue.trigger_interrupt();
1010 }
1011 }
1012
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use audio_streams::NoopStreamSourceGenerator;
    use base::Tube;

    use super::*;
    use crate::virtio::snd::common_backend::notify_reset_signal;

    /// Builds a `StreamInfo` for card 0 backed by a `NoopStreamSourceGenerator`.
    fn new_stream() -> StreamInfo {
        // The generator expression is passed inline so it can take whatever type the
        // builder's parameter expects.
        StreamInfo::builder(Arc::new(Box::new(NoopStreamSourceGenerator::new())), 0).build()
    }

    /// `handle_ctrl_tube` must finish with `Ok` once the reset signal is notified, even when
    /// nothing was ever sent over the tube.
    #[test]
    fn test_handle_ctrl_tube_reset_signal() {
        let ex = Executor::new().expect("Failed to create an executor");

        let run_result = ex.run_until(async {
            let streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>> =
                Rc::new(AsyncRwLock::new(Vec::new()));

            // The peer end is bound to a named variable so it stays open for the whole test.
            let (device_end, _peer_end) = Tube::pair().expect("Failed to create tube pairs");
            let device_tube =
                AsyncTube::new(&ex, device_end).expect("Failed to create async tube");
            let reset_signal = (AsyncRwLock::new(false), Condvar::new());

            let handler = handle_ctrl_tube(&streams, &device_tube, Some(&reset_signal));
            let notifier = notify_reset_signal(&reset_signal);
            let (handler_result, _) = futures::join!(handler, notifier);

            assert!(
                handler_result.is_ok(),
                "handle_ctrl_tube returns an error after reset signal"
            );
        });

        assert!(run_result.is_ok(), "ex.run_until returns an error");
    }

    /// A `MuteAll(true)` command sent over the control tube must be answered with
    /// `VmResponse::Ok` and must leave the stream's `muted` flag set.
    #[test]
    fn test_handle_ctrl_tube_receive_mute_cmd() {
        let ex = Executor::new().expect("Failed to create an executor");

        let run_result = ex.run_until(async {
            let streams = Rc::new(AsyncRwLock::new(vec![AsyncRwLock::new(new_stream())]));

            let (device_end, host_end) = Tube::pair().expect("Failed to create tube pairs");
            let device_tube =
                AsyncTube::new(&ex, device_end).expect("Failed to create an async tube");
            let host_tube =
                AsyncTube::new(&ex, host_end).expect("Failed to create an async tube");
            let reset_signal = (AsyncRwLock::new(false), Condvar::new());

            let handler = handle_ctrl_tube(&streams, &device_tube, Some(&reset_signal));
            // Host side: send the command, wait for the reply, and only then raise the reset
            // signal so the handler future can complete.
            let host_side = async {
                let _ = host_tube.send(&SndControlCommand::MuteAll(true)).await;
                let response = host_tube.next::<VmResponse>().await;
                notify_reset_signal(&reset_signal).await;
                response
            };
            let (handler_result, response) = futures::join!(handler, host_side);

            assert!(
                handler_result.is_ok(),
                "handle_ctrl_tube returns an error after reset signal"
            );
            assert!(response.is_ok(), "Failed to receive data from the tube");
            assert!(
                matches!(response.unwrap(), VmResponse::Ok),
                "tube_result is not Ok",
            );

            let stream_list = streams.read_lock().await;
            let first_stream = stream_list.first().unwrap().lock().await;
            assert!(
                first_stream.muted.load(Ordering::Relaxed),
                "Stream is not muted"
            );
        });

        assert!(run_result.is_ok(), "ex.run_until returns an error");
    }
}
1093