1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 use std::time::Duration;
11
12 use async_trait::async_trait;
13 use audio_streams::capture::AsyncCaptureBuffer;
14 use audio_streams::AsyncPlaybackBuffer;
15 use audio_streams::BoxError;
16 use base::debug;
17 use base::error;
18 use cros_async::sync::Condvar;
19 use cros_async::sync::RwLock as AsyncRwLock;
20 use cros_async::EventAsync;
21 use cros_async::Executor;
22 use cros_async::TimerAsync;
23 use futures::channel::mpsc;
24 use futures::channel::oneshot;
25 use futures::pin_mut;
26 use futures::select;
27 use futures::FutureExt;
28 use futures::SinkExt;
29 use futures::StreamExt;
30 use thiserror::Error as ThisError;
31 use zerocopy::AsBytes;
32
33 use super::Error;
34 use super::SndData;
35 use super::WorkerStatus;
36 use crate::virtio::snd::common::*;
37 use crate::virtio::snd::common_backend::stream_info::SetParams;
38 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
39 use crate::virtio::snd::common_backend::DirectionalStream;
40 use crate::virtio::snd::common_backend::PcmResponse;
41 use crate::virtio::snd::constants::*;
42 use crate::virtio::snd::layout::*;
43 use crate::virtio::DescriptorChain;
44 use crate::virtio::Interrupt;
45 use crate::virtio::Queue;
46 use crate::virtio::Reader;
47 use crate::virtio::Writer;
48
49 /// Trait to wrap system specific helpers for reading from the start point capture buffer.
50 #[async_trait(?Send)]
51 pub trait CaptureBufferReader {
get_next_capture_period( &mut self, ex: &Executor, ) -> Result<AsyncCaptureBuffer, BoxError>52 async fn get_next_capture_period(
53 &mut self,
54 ex: &Executor,
55 ) -> Result<AsyncCaptureBuffer, BoxError>;
56 }
57
58 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
59 #[async_trait(?Send)]
60 pub trait PlaybackBufferWriter {
new(guest_period_bytes: usize) -> Self where Self: Sized61 fn new(guest_period_bytes: usize) -> Self
62 where
63 Self: Sized;
64
65 /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize66 fn endpoint_period_bytes(&self) -> usize;
67
68 /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>69 fn copy_to_buffer(
70 &mut self,
71 dst_buf: &mut AsyncPlaybackBuffer<'_>,
72 reader: &mut Reader,
73 ) -> Result<usize, Error> {
74 dst_buf.copy_from(reader).map_err(Error::Io)
75 }
76 }
77
78 #[derive(Debug)]
79 enum VirtioSndPcmCmd {
80 SetParams { set_params: SetParams },
81 Prepare,
82 Start,
83 Stop,
84 Release,
85 }
86
87 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 let cmd_code = match self {
90 VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
91 VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
92 VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
93 VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
94 VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
95 };
96 f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
97 }
98 }
99
100 #[derive(ThisError, Debug)]
101 enum VirtioSndPcmCmdError {
102 #[error("SetParams requires additional parameters")]
103 SetParams,
104 #[error("Invalid virtio snd command code")]
105 InvalidCode,
106 }
107
108 impl TryFrom<u32> for VirtioSndPcmCmd {
109 type Error = VirtioSndPcmCmdError;
110
try_from(code: u32) -> Result<Self, Self::Error>111 fn try_from(code: u32) -> Result<Self, Self::Error> {
112 match code {
113 VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
114 VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
115 VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
116 VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
117 VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
118 _ => Err(VirtioSndPcmCmdError::InvalidCode),
119 }
120 }
121 }
122
123 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd124 fn with_set_params_and_direction(
125 set_params: virtio_snd_pcm_set_params,
126 dir: u8,
127 ) -> VirtioSndPcmCmd {
128 let buffer_bytes: u32 = set_params.buffer_bytes.into();
129 let period_bytes: u32 = set_params.period_bytes.into();
130 VirtioSndPcmCmd::SetParams {
131 set_params: SetParams {
132 channels: set_params.channels,
133 format: from_virtio_sample_format(set_params.format).unwrap(),
134 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
135 buffer_bytes: buffer_bytes as usize,
136 period_bytes: period_bytes as usize,
137 dir,
138 },
139 }
140 }
141 }
142
143 // Returns true if the operation is successful. Returns error if there is
144 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, ) -> Result<(), Error>145 async fn process_pcm_ctrl(
146 ex: &Executor,
147 tx_send: &mpsc::UnboundedSender<PcmResponse>,
148 rx_send: &mpsc::UnboundedSender<PcmResponse>,
149 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
150 cmd: VirtioSndPcmCmd,
151 writer: &mut Writer,
152 stream_id: usize,
153 ) -> Result<(), Error> {
154 let streams = streams.read_lock().await;
155 let mut stream = match streams.get(stream_id) {
156 Some(stream_info) => stream_info.lock().await,
157 None => {
158 error!(
159 "Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
160 stream_id, cmd
161 );
162 return writer
163 .write_obj(VIRTIO_SND_S_BAD_MSG)
164 .map_err(Error::WriteResponse);
165 }
166 };
167
168 debug!("{} for stream id={}", cmd, stream_id);
169
170 let result = match cmd {
171 VirtioSndPcmCmd::SetParams { set_params } => {
172 let result = stream.set_params(set_params).await;
173 if result.is_ok() {
174 debug!(
175 "VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
176 stream_id, *stream
177 );
178 }
179 result
180 }
181 VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
182 VirtioSndPcmCmd::Start => stream.start().await,
183 VirtioSndPcmCmd::Stop => stream.stop().await,
184 VirtioSndPcmCmd::Release => stream.release().await,
185 };
186 match result {
187 Ok(_) => writer
188 .write_obj(VIRTIO_SND_S_OK)
189 .map_err(Error::WriteResponse),
190 Err(Error::OperationNotSupported) => {
191 error!(
192 "{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
193 cmd, stream_id
194 );
195
196 writer
197 .write_obj(VIRTIO_SND_S_NOT_SUPP)
198 .map_err(Error::WriteResponse)
199 }
200 Err(e) => {
201 // Runtime/internal error would be more appropriate, but there's
202 // no such error type
203 error!(
204 "{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
205 cmd, stream_id, e
206 );
207 writer
208 .write_obj(VIRTIO_SND_S_IO_ERR)
209 .map_err(Error::WriteResponse)
210 }
211 }
212 }
213
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<&mut Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>214 async fn write_data(
215 mut dst_buf: AsyncPlaybackBuffer<'_>,
216 reader: Option<&mut Reader>,
217 buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
218 ) -> Result<u32, Error> {
219 let transferred = match reader {
220 Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
221 None => dst_buf
222 .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
223 .map_err(Error::Io)?,
224 };
225
226 if transferred != buffer_writer.endpoint_period_bytes() {
227 error!(
228 "Bytes written {} != period_bytes {}",
229 transferred,
230 buffer_writer.endpoint_period_bytes()
231 );
232 Err(Error::InvalidBufferSize)
233 } else {
234 dst_buf.commit().await;
235 Ok(dst_buf.latency_bytes())
236 }
237 }
238
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>239 async fn read_data<'a>(
240 mut src_buf: AsyncCaptureBuffer<'a>,
241 writer: Option<&mut Writer>,
242 period_bytes: usize,
243 ) -> Result<u32, Error> {
244 let transferred = match writer {
245 Some(writer) => src_buf.copy_to(writer),
246 None => src_buf.copy_to(&mut io::sink()),
247 }
248 .map_err(Error::Io)?;
249 if transferred != period_bytes {
250 error!(
251 "Bytes written {} != period_bytes {}",
252 transferred, period_bytes
253 );
254 Err(Error::InvalidBufferSize)
255 } else {
256 src_buf.commit().await;
257 Ok(src_buf.latency_bytes())
258 }
259 }
260
261 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self262 fn from(res: Result<u32, Error>) -> Self {
263 match res {
264 Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
265 Err(e) => {
266 error!("PCM I/O message failed: {}", e);
267 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
268 }
269 }
270 }
271 }
272
273 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>274 async fn drain_desc_receiver(
275 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
276 sender: &mut mpsc::UnboundedSender<PcmResponse>,
277 ) -> Result<(), Error> {
278 let mut o_desc_chain = desc_receiver.next().await;
279 while let Some(desc_chain) = o_desc_chain {
280 // From the virtio-snd spec:
281 // The device MUST complete all pending I/O messages for the specified stream ID.
282 let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
283 // Fetch next DescriptorChain to see if the current one is the last one.
284 o_desc_chain = desc_receiver.next().await;
285 let (done, future) = if o_desc_chain.is_none() {
286 let (done, future) = oneshot::channel();
287 (Some(done), Some(future))
288 } else {
289 (None, None)
290 };
291 sender
292 .send(PcmResponse {
293 desc_chain,
294 status,
295 done,
296 })
297 .await
298 .map_err(Error::MpscSend)?;
299
300 if let Some(f) = future {
301 // From the virtio-snd spec:
302 // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
303 // while there are pending I/O messages for the specified stream ID.
304 f.await.map_err(Error::DoneNotTriggered)?;
305 };
306 }
307 Ok(())
308 }
309
310 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
311 /// queue, and forward them to CRAS. One pcm worker per stream.
312 ///
313 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
314 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncRwLock<WorkerStatus>>, mut sender: mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>315 pub async fn start_pcm_worker(
316 ex: Executor,
317 dstream: DirectionalStream,
318 mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
319 status_mutex: Rc<AsyncRwLock<WorkerStatus>>,
320 mut sender: mpsc::UnboundedSender<PcmResponse>,
321 period_dur: Duration,
322 release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
323 ) -> Result<(), Error> {
324 let res = pcm_worker_loop(
325 ex,
326 dstream,
327 &mut desc_receiver,
328 &status_mutex,
329 &mut sender,
330 period_dur,
331 release_signal,
332 )
333 .await;
334 *status_mutex.lock().await = WorkerStatus::Quit;
335 if res.is_err() {
336 error!(
337 "pcm_worker error: {:#?}. Draining desc_receiver",
338 res.as_ref().err()
339 );
340 // On error, guaranteed that desc_receiver has not been drained, so drain it here.
341 // Note that drain blocks until the stream is release.
342 drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
343 }
344 res
345 }
346
pcm_worker_loop( ex: Executor, dstream: DirectionalStream, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: &Rc<AsyncRwLock<WorkerStatus>>, sender: &mut mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>347 async fn pcm_worker_loop(
348 ex: Executor,
349 dstream: DirectionalStream,
350 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
351 status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,
352 sender: &mut mpsc::UnboundedSender<PcmResponse>,
353 period_dur: Duration,
354 release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
355 ) -> Result<(), Error> {
356 let on_release = async {
357 await_reset_signal(Some(&*release_signal)).await;
358 // After receiving release signal, wait for up to 2 periods,
359 // giving it a chance to respond to the last buffer.
360 if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {
361 error!("Error on sleep after receiving reset signal: {}", e)
362 }
363 }
364 .fuse();
365 pin_mut!(on_release);
366
367 match dstream {
368 DirectionalStream::Output(mut sys_direction_output) => loop {
369 #[cfg(windows)]
370 let (mut stream, mut buffer_writer_lock) = (
371 sys_direction_output
372 .async_playback_buffer_stream
373 .lock()
374 .await,
375 sys_direction_output.buffer_writer.lock().await,
376 );
377 #[cfg(windows)]
378 let buffer_writer = &mut buffer_writer_lock;
379 #[cfg(any(target_os = "android", target_os = "linux"))]
380 let (stream, buffer_writer) = (
381 &mut sys_direction_output.async_playback_buffer_stream,
382 &mut sys_direction_output.buffer_writer,
383 );
384
385 let next_buf = stream.next_playback_buffer(&ex).fuse();
386 pin_mut!(next_buf);
387
388 let dst_buf = select! {
389 _ = on_release => {
390 drain_desc_receiver(desc_receiver, sender).await?;
391 break Ok(());
392 },
393 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
394 };
395 let worker_status = status_mutex.lock().await;
396 match *worker_status {
397 WorkerStatus::Quit => {
398 drain_desc_receiver(desc_receiver, sender).await?;
399 if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
400 error!("Error on write_data after worker quit: {}", e)
401 }
402 break Ok(());
403 }
404 WorkerStatus::Pause => {
405 write_data(dst_buf, None, buffer_writer).await?;
406 }
407 WorkerStatus::Running => match desc_receiver.try_next() {
408 Err(e) => {
409 error!("Underrun. No new DescriptorChain while running: {}", e);
410 write_data(dst_buf, None, buffer_writer).await?;
411 }
412 Ok(None) => {
413 error!("Unreachable. status should be Quit when the channel is closed");
414 write_data(dst_buf, None, buffer_writer).await?;
415 return Err(Error::InvalidPCMWorkerState);
416 }
417 Ok(Some(mut desc_chain)) => {
418 // stream_id was already read in handle_pcm_queue
419 let status =
420 write_data(dst_buf, Some(&mut desc_chain.reader), buffer_writer)
421 .await
422 .into();
423 sender
424 .send(PcmResponse {
425 desc_chain,
426 status,
427 done: None,
428 })
429 .await
430 .map_err(Error::MpscSend)?;
431 }
432 },
433 }
434 },
435 DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
436 let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
437 pin_mut!(next_buf);
438
439 let src_buf = select! {
440 _ = on_release => {
441 drain_desc_receiver(desc_receiver, sender).await?;
442 break Ok(());
443 },
444 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
445 };
446
447 let worker_status = status_mutex.lock().await;
448 match *worker_status {
449 WorkerStatus::Quit => {
450 drain_desc_receiver(desc_receiver, sender).await?;
451 if let Err(e) = read_data(src_buf, None, period_bytes).await {
452 error!("Error on read_data after worker quit: {}", e)
453 }
454 break Ok(());
455 }
456 WorkerStatus::Pause => {
457 read_data(src_buf, None, period_bytes).await?;
458 }
459 WorkerStatus::Running => match desc_receiver.try_next() {
460 Err(e) => {
461 error!("Overrun. No new DescriptorChain while running: {}", e);
462 read_data(src_buf, None, period_bytes).await?;
463 }
464 Ok(None) => {
465 error!("Unreachable. status should be Quit when the channel is closed");
466 read_data(src_buf, None, period_bytes).await?;
467 return Err(Error::InvalidPCMWorkerState);
468 }
469 Ok(Some(mut desc_chain)) => {
470 let status = read_data(src_buf, Some(&mut desc_chain.writer), period_bytes)
471 .await
472 .into();
473 sender
474 .send(PcmResponse {
475 desc_chain,
476 status,
477 done: None,
478 })
479 .await
480 .map_err(Error::MpscSend)?;
481 }
482 },
483 }
484 },
485 }
486 }
487
488 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>489 async fn defer_pcm_response_to_worker(
490 desc_chain: DescriptorChain,
491 status: virtio_snd_pcm_status,
492 response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
493 ) -> Result<(), Error> {
494 response_sender
495 .send(PcmResponse {
496 desc_chain,
497 status,
498 done: None,
499 })
500 .await
501 .map_err(Error::MpscSend)
502 }
503
send_pcm_response( mut desc_chain: DescriptorChain, queue: &mut Queue, interrupt: &Interrupt, status: virtio_snd_pcm_status, ) -> Result<(), Error>504 fn send_pcm_response(
505 mut desc_chain: DescriptorChain,
506 queue: &mut Queue,
507 interrupt: &Interrupt,
508 status: virtio_snd_pcm_status,
509 ) -> Result<(), Error> {
510 let writer = &mut desc_chain.writer;
511
512 // For rx queue only. Fast forward the unused audio data buffer.
513 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
514 writer
515 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
516 }
517 writer.write_obj(status).map_err(Error::WriteResponse)?;
518 let len = writer.bytes_written() as u32;
519 queue.add_used(desc_chain, len);
520 queue.trigger_interrupt(interrupt);
521 Ok(())
522 }
523
524 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>)525 async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {
526 match reset_signal_option {
527 Some((lock, cvar)) => {
528 let mut reset = lock.lock().await;
529 while !*reset {
530 reset = cvar.wait(reset).await;
531 }
532 }
533 None => futures::future::pending().await,
534 };
535 }
536
send_pcm_response_worker( queue: Rc<AsyncRwLock<Queue>>, interrupt: Interrupt, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>537 pub async fn send_pcm_response_worker(
538 queue: Rc<AsyncRwLock<Queue>>,
539 interrupt: Interrupt,
540 recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
541 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
542 ) -> Result<(), Error> {
543 let on_reset = await_reset_signal(reset_signal).fuse();
544 pin_mut!(on_reset);
545
546 loop {
547 let next_async = recv.next().fuse();
548 pin_mut!(next_async);
549
550 let res = select! {
551 _ = on_reset => break,
552 res = next_async => res,
553 };
554
555 if let Some(r) = res {
556 send_pcm_response(r.desc_chain, &mut *queue.lock().await, &interrupt, r.status)?;
557
558 // Resume pcm_worker
559 if let Some(done) = r.done {
560 done.send(()).map_err(Error::OneshotSend)?;
561 }
562 } else {
563 debug!("PcmResponse channel is closed.");
564 break;
565 }
566 }
567 Ok(())
568 }
569
570 /// Handle messages from the tx or the rx queue. One invocation is needed for
571 /// each queue.
handle_pcm_queue( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: Rc<AsyncRwLock<Queue>>, queue_event: &EventAsync, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>572 pub async fn handle_pcm_queue(
573 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
574 mut response_sender: mpsc::UnboundedSender<PcmResponse>,
575 queue: Rc<AsyncRwLock<Queue>>,
576 queue_event: &EventAsync,
577 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
578 ) -> Result<(), Error> {
579 let on_reset = await_reset_signal(reset_signal).fuse();
580 pin_mut!(on_reset);
581
582 loop {
583 // Manual queue.next_async() to avoid holding the mutex
584 let next_async = async {
585 loop {
586 // Check if there are more descriptors available.
587 if let Some(chain) = queue.lock().await.pop() {
588 return Ok(chain);
589 }
590 queue_event.next_val().await?;
591 }
592 }
593 .fuse();
594 pin_mut!(next_async);
595
596 let mut desc_chain = select! {
597 _ = on_reset => break,
598 res = next_async => res.map_err(Error::Async)?,
599 };
600
601 let pcm_xfer: virtio_snd_pcm_xfer =
602 desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
603 let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
604
605 let streams = streams.read_lock().await;
606 let stream_info = match streams.get(stream_id) {
607 Some(stream_info) => stream_info.read_lock().await,
608 None => {
609 error!(
610 "stream_id ({}) >= num_streams ({})",
611 stream_id,
612 streams.len()
613 );
614 defer_pcm_response_to_worker(
615 desc_chain,
616 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
617 &mut response_sender,
618 )
619 .await?;
620 continue;
621 }
622 };
623
624 match stream_info.sender.as_ref() {
625 Some(mut s) => {
626 s.send(desc_chain).await.map_err(Error::MpscSend)?;
627 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
628 // If sender channel is still intact but worker status is quit,
629 // the worker quitted unexpectedly. Return error to request a reset.
630 return Err(Error::PCMWorkerQuittedUnexpectedly);
631 }
632 }
633 None => {
634 if !stream_info.just_reset {
635 error!(
636 "stream {} is not ready. state: {}",
637 stream_id,
638 get_virtio_snd_r_pcm_cmd_name(stream_info.state)
639 );
640 }
641 defer_pcm_response_to_worker(
642 desc_chain,
643 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
644 &mut response_sender,
645 )
646 .await?;
647 }
648 };
649 }
650 Ok(())
651 }
652
653 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue( ex: &Executor, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, snd_data: &SndData, queue: Rc<AsyncRwLock<Queue>>, queue_event: &mut EventAsync, interrupt: Interrupt, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>654 pub async fn handle_ctrl_queue(
655 ex: &Executor,
656 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
657 snd_data: &SndData,
658 queue: Rc<AsyncRwLock<Queue>>,
659 queue_event: &mut EventAsync,
660 interrupt: Interrupt,
661 tx_send: mpsc::UnboundedSender<PcmResponse>,
662 rx_send: mpsc::UnboundedSender<PcmResponse>,
663 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
664 ) -> Result<(), Error> {
665 let on_reset = await_reset_signal(reset_signal).fuse();
666 pin_mut!(on_reset);
667
668 let mut queue = queue.lock().await;
669 loop {
670 let mut desc_chain = {
671 let next_async = queue.next_async(queue_event).fuse();
672 pin_mut!(next_async);
673
674 select! {
675 _ = on_reset => break,
676 res = next_async => res.map_err(Error::Async)?,
677 }
678 };
679
680 let reader = &mut desc_chain.reader;
681 let writer = &mut desc_chain.writer;
682 // Don't advance the reader
683 let code = reader
684 .peek_obj::<virtio_snd_hdr>()
685 .map_err(Error::ReadMessage)?
686 .code
687 .into();
688
689 let handle_ctrl_msg = async {
690 return match code {
691 VIRTIO_SND_R_JACK_INFO => {
692 let query_info: virtio_snd_query_info =
693 reader.read_obj().map_err(Error::ReadMessage)?;
694 let start_id: usize = u32::from(query_info.start_id) as usize;
695 let count: usize = u32::from(query_info.count) as usize;
696 if start_id + count > snd_data.jack_info.len() {
697 error!(
698 "start_id({}) + count({}) must be smaller than \
699 the number of jacks ({})",
700 start_id,
701 count,
702 snd_data.jack_info.len()
703 );
704 return writer
705 .write_obj(VIRTIO_SND_S_BAD_MSG)
706 .map_err(Error::WriteResponse);
707 }
708 // The response consists of the virtio_snd_hdr structure (contains the request
709 // status code), followed by the device-writable information structures of the
710 // item. Each information structure begins with the following common header
711 writer
712 .write_obj(VIRTIO_SND_S_OK)
713 .map_err(Error::WriteResponse)?;
714 for i in start_id..(start_id + count) {
715 writer
716 .write_all(snd_data.jack_info[i].as_bytes())
717 .map_err(Error::WriteResponse)?;
718 }
719 Ok(())
720 }
721 VIRTIO_SND_R_PCM_INFO => {
722 let query_info: virtio_snd_query_info =
723 reader.read_obj().map_err(Error::ReadMessage)?;
724 let start_id: usize = u32::from(query_info.start_id) as usize;
725 let count: usize = u32::from(query_info.count) as usize;
726 if start_id + count > snd_data.pcm_info.len() {
727 error!(
728 "start_id({}) + count({}) must be smaller than \
729 the number of streams ({})",
730 start_id,
731 count,
732 snd_data.pcm_info.len()
733 );
734 return writer
735 .write_obj(VIRTIO_SND_S_BAD_MSG)
736 .map_err(Error::WriteResponse);
737 }
738 // The response consists of the virtio_snd_hdr structure (contains the request
739 // status code), followed by the device-writable information structures of the
740 // item. Each information structure begins with the following common header
741 writer
742 .write_obj(VIRTIO_SND_S_OK)
743 .map_err(Error::WriteResponse)?;
744 for i in start_id..(start_id + count) {
745 writer
746 .write_all(snd_data.pcm_info[i].as_bytes())
747 .map_err(Error::WriteResponse)?;
748 }
749 Ok(())
750 }
751 VIRTIO_SND_R_CHMAP_INFO => {
752 let query_info: virtio_snd_query_info =
753 reader.read_obj().map_err(Error::ReadMessage)?;
754 let start_id: usize = u32::from(query_info.start_id) as usize;
755 let count: usize = u32::from(query_info.count) as usize;
756 if start_id + count > snd_data.chmap_info.len() {
757 error!(
758 "start_id({}) + count({}) must be smaller than \
759 the number of chmaps ({})",
760 start_id,
761 count,
762 snd_data.chmap_info.len()
763 );
764 return writer
765 .write_obj(VIRTIO_SND_S_BAD_MSG)
766 .map_err(Error::WriteResponse);
767 }
768 // The response consists of the virtio_snd_hdr structure (contains the request
769 // status code), followed by the device-writable information structures of the
770 // item. Each information structure begins with the following common header
771 writer
772 .write_obj(VIRTIO_SND_S_OK)
773 .map_err(Error::WriteResponse)?;
774 for i in start_id..(start_id + count) {
775 writer
776 .write_all(snd_data.chmap_info[i].as_bytes())
777 .map_err(Error::WriteResponse)?;
778 }
779 Ok(())
780 }
781 VIRTIO_SND_R_JACK_REMAP => {
782 unreachable!("remap is unsupported");
783 }
784 VIRTIO_SND_R_PCM_SET_PARAMS => {
785 // Raise VIRTIO_SND_S_BAD_MSG or IO error?
786 let set_params: virtio_snd_pcm_set_params =
787 reader.read_obj().map_err(Error::ReadMessage)?;
788 let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
789 let buffer_bytes: u32 = set_params.buffer_bytes.into();
790 let period_bytes: u32 = set_params.period_bytes.into();
791
792 let dir = match snd_data.pcm_info.get(stream_id) {
793 Some(pcm_info) => {
794 if set_params.channels < pcm_info.channels_min
795 || set_params.channels > pcm_info.channels_max
796 {
797 error!(
798 "Number of channels ({}) must be between {} and {}",
799 set_params.channels,
800 pcm_info.channels_min,
801 pcm_info.channels_max
802 );
803 return writer
804 .write_obj(VIRTIO_SND_S_NOT_SUPP)
805 .map_err(Error::WriteResponse);
806 }
807 if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
808 error!("PCM format {} is not supported.", set_params.format);
809 return writer
810 .write_obj(VIRTIO_SND_S_NOT_SUPP)
811 .map_err(Error::WriteResponse);
812 }
813 if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
814 error!("PCM frame rate {} is not supported.", set_params.rate);
815 return writer
816 .write_obj(VIRTIO_SND_S_NOT_SUPP)
817 .map_err(Error::WriteResponse);
818 }
819
820 pcm_info.direction
821 }
822 None => {
823 error!(
824 "stream_id {} < streams {}",
825 stream_id,
826 snd_data.pcm_info.len()
827 );
828 return writer
829 .write_obj(VIRTIO_SND_S_BAD_MSG)
830 .map_err(Error::WriteResponse);
831 }
832 };
833
834 if set_params.features != 0 {
835 error!("No feature is supported");
836 return writer
837 .write_obj(VIRTIO_SND_S_NOT_SUPP)
838 .map_err(Error::WriteResponse);
839 }
840
841 if buffer_bytes % period_bytes != 0 {
842 error!(
843 "buffer_bytes({}) must be dividable by period_bytes({})",
844 buffer_bytes, period_bytes
845 );
846 return writer
847 .write_obj(VIRTIO_SND_S_BAD_MSG)
848 .map_err(Error::WriteResponse);
849 }
850
851 process_pcm_ctrl(
852 ex,
853 &tx_send,
854 &rx_send,
855 streams,
856 VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
857 writer,
858 stream_id,
859 )
860 .await
861 }
862 VIRTIO_SND_R_PCM_PREPARE
863 | VIRTIO_SND_R_PCM_START
864 | VIRTIO_SND_R_PCM_STOP
865 | VIRTIO_SND_R_PCM_RELEASE => {
866 let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
867 let stream_id: usize = u32::from(hdr.stream_id) as usize;
868 let cmd = match VirtioSndPcmCmd::try_from(code) {
869 Ok(cmd) => cmd,
870 Err(err) => {
871 error!("Error converting code to command: {}", err);
872 return writer
873 .write_obj(VIRTIO_SND_S_BAD_MSG)
874 .map_err(Error::WriteResponse);
875 }
876 };
877 process_pcm_ctrl(ex, &tx_send, &rx_send, streams, cmd, writer, stream_id)
878 .await
879 .and(Ok(()))?;
880 Ok(())
881 }
882 c => {
883 error!("Unrecognized code: {}", c);
884 return writer
885 .write_obj(VIRTIO_SND_S_BAD_MSG)
886 .map_err(Error::WriteResponse);
887 }
888 };
889 };
890
891 handle_ctrl_msg.await?;
892 let len = writer.bytes_written() as u32;
893 queue.add_used(desc_chain, len);
894 queue.trigger_interrupt(&interrupt);
895 }
896 Ok(())
897 }
898
899 /// Send events to the audio driver.
handle_event_queue( mut queue: Queue, mut queue_event: EventAsync, interrupt: Interrupt, ) -> Result<(), Error>900 pub async fn handle_event_queue(
901 mut queue: Queue,
902 mut queue_event: EventAsync,
903 interrupt: Interrupt,
904 ) -> Result<(), Error> {
905 loop {
906 let desc_chain = queue
907 .next_async(&mut queue_event)
908 .await
909 .map_err(Error::Async)?;
910
911 // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
912 queue.add_used(desc_chain, 0);
913 queue.trigger_interrupt(&interrupt);
914 }
915 }
916