1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use futures::{
6 channel::{mpsc, oneshot},
7 SinkExt, StreamExt,
8 };
9 use std::io::{self, Read, Write};
10 use std::rc::Rc;
11
12 use audio_streams::{capture::AsyncCaptureBuffer, AsyncPlaybackBuffer};
13 use base::{debug, error};
14 use cros_async::{sync::Mutex as AsyncMutex, EventAsync, Executor};
15 use data_model::{DataInit, Le32};
16 use vm_memory::GuestMemory;
17
18 use crate::virtio::cras_backend::{Parameters, PcmResponse};
19 use crate::virtio::snd::common::*;
20 use crate::virtio::snd::constants::*;
21 use crate::virtio::snd::layout::*;
22 use crate::virtio::{DescriptorChain, Queue, Reader, SignalableInterrupt, Writer};
23
24 use super::{DirectionalStream, Error, SndData, StreamInfo, WorkerStatus};
25
26 // Returns true if the operation is successful. Returns error if there is
27 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, mem: &GuestMemory, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>, params: &Parameters, cmd_code: u32, writer: &mut Writer, stream_id: usize, ) -> Result<(), Error>28 async fn process_pcm_ctrl(
29 ex: &Executor,
30 mem: &GuestMemory,
31 tx_send: &mpsc::UnboundedSender<PcmResponse>,
32 rx_send: &mpsc::UnboundedSender<PcmResponse>,
33 streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>,
34 params: &Parameters,
35 cmd_code: u32,
36 writer: &mut Writer,
37 stream_id: usize,
38 ) -> Result<(), Error> {
39 let streams = streams.read_lock().await;
40 let mut stream = match streams.get(stream_id) {
41 Some(stream_info) => stream_info.lock().await,
42 None => {
43 error!(
44 "Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
45 stream_id,
46 get_virtio_snd_r_pcm_cmd_name(cmd_code)
47 );
48 return writer
49 .write_obj(VIRTIO_SND_S_BAD_MSG)
50 .map_err(Error::WriteResponse);
51 }
52 };
53
54 debug!(
55 "{} for stream id={}",
56 get_virtio_snd_r_pcm_cmd_name(cmd_code),
57 stream_id
58 );
59
60 let result = match cmd_code {
61 VIRTIO_SND_R_PCM_PREPARE => {
62 stream
63 .prepare(ex, mem.clone(), tx_send, rx_send, params)
64 .await
65 }
66 VIRTIO_SND_R_PCM_START => stream.start().await,
67 VIRTIO_SND_R_PCM_STOP => stream.stop().await,
68 VIRTIO_SND_R_PCM_RELEASE => stream.release().await,
69 _ => unreachable!(),
70 };
71 match result {
72 Ok(_) => {
73 return writer
74 .write_obj(VIRTIO_SND_S_OK)
75 .map_err(Error::WriteResponse);
76 }
77 Err(Error::OperationNotSupported) => {
78 error!(
79 "{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
80 get_virtio_snd_r_pcm_cmd_name(cmd_code),
81 stream_id
82 );
83
84 return writer
85 .write_obj(VIRTIO_SND_S_NOT_SUPP)
86 .map_err(Error::WriteResponse);
87 }
88 Err(e) => {
89 // Runtime/internal error would be more appropriate, but there's
90 // no such error type
91 error!(
92 "{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
93 get_virtio_snd_r_pcm_cmd_name(cmd_code),
94 stream_id,
95 e
96 );
97 return writer
98 .write_obj(VIRTIO_SND_S_IO_ERR)
99 .map_err(Error::WriteResponse);
100 }
101 };
102 }
103
write_data<'a>( mut dst_buf: AsyncPlaybackBuffer<'a>, reader: Option<Reader>, period_bytes: usize, ) -> Result<(), Error>104 async fn write_data<'a>(
105 mut dst_buf: AsyncPlaybackBuffer<'a>,
106 reader: Option<Reader>,
107 period_bytes: usize,
108 ) -> Result<(), Error> {
109 let transferred = match reader {
110 Some(mut reader) => dst_buf.copy_from(&mut reader),
111 None => dst_buf.copy_from(&mut io::repeat(0).take(period_bytes as u64)),
112 }
113 .map_err(Error::Io)?;
114 if transferred as usize != period_bytes {
115 error!(
116 "Bytes written {} != period_bytes {}",
117 transferred, period_bytes
118 );
119 Err(Error::InvalidBufferSize)
120 } else {
121 dst_buf.commit().await;
122 Ok(())
123 }
124 }
125
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<(), Error>126 async fn read_data<'a>(
127 mut src_buf: AsyncCaptureBuffer<'a>,
128 writer: Option<&mut Writer>,
129 period_bytes: usize,
130 ) -> Result<(), Error> {
131 let transferred = match writer {
132 Some(writer) => src_buf.copy_to(writer),
133 None => src_buf.copy_to(&mut io::sink()),
134 }
135 .map_err(Error::Io)?;
136 if transferred as usize != period_bytes {
137 error!(
138 "Bytes written {} != period_bytes {}",
139 transferred, period_bytes
140 );
141 Err(Error::InvalidBufferSize)
142 } else {
143 src_buf.commit().await;
144 Ok(())
145 }
146 }
147
148 impl From<Result<(), Error>> for virtio_snd_pcm_status {
from(res: Result<(), Error>) -> Self149 fn from(res: Result<(), Error>) -> Self {
150 let status = match res {
151 Ok(()) => VIRTIO_SND_S_OK,
152 Err(e) => {
153 error!("PCM I/O message failed: {}", e);
154 VIRTIO_SND_S_IO_ERR
155 }
156 };
157
158 // TODO(woodychow): Extend audio_streams API, and fetch latency_bytes from
159 // `next_playback_buffer` or `next_capture_buffer`"
160 Self {
161 status: Le32::from(status),
162 latency_bytes: Le32::from(0),
163 }
164 }
165 }
166
167 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, mem: &GuestMemory, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>168 async fn drain_desc_receiver(
169 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
170 mem: &GuestMemory,
171 sender: &mut mpsc::UnboundedSender<PcmResponse>,
172 ) -> Result<(), Error> {
173 let mut o_desc_chain = desc_receiver.next().await;
174 while let Some(desc_chain) = o_desc_chain {
175 // From the virtio-snd spec:
176 // The device MUST complete all pending I/O messages for the specified stream ID.
177 let desc_index = desc_chain.index;
178 let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
179 let status = virtio_snd_pcm_status {
180 status: Le32::from(VIRTIO_SND_S_OK),
181 latency_bytes: Le32::from(0),
182 };
183 // Fetch next DescriptorChain to see if the current one is the last one.
184 o_desc_chain = desc_receiver.next().await;
185 let (done, future) = if o_desc_chain.is_none() {
186 let (done, future) = oneshot::channel();
187 (Some(done), Some(future))
188 } else {
189 (None, None)
190 };
191 sender
192 .send(PcmResponse {
193 desc_index,
194 status,
195 writer,
196 done,
197 })
198 .await
199 .map_err(Error::MpscSend)?;
200
201 if let Some(f) = future {
202 // From the virtio-snd spec:
203 // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
204 // while there are pending I/O messages for the specified stream ID.
205 f.await.map_err(Error::DoneNotTriggered)?;
206 };
207 }
208 Ok(())
209 }
210
211 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
212 /// queue, and forward them to CRAS. One pcm worker per stream.
213 ///
214 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
215 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncMutex<WorkerStatus>>, mem: GuestMemory, mut sender: mpsc::UnboundedSender<PcmResponse>, period_bytes: usize, ) -> Result<(), Error>216 pub async fn start_pcm_worker(
217 ex: Executor,
218 dstream: DirectionalStream,
219 mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
220 status_mutex: Rc<AsyncMutex<WorkerStatus>>,
221 mem: GuestMemory,
222 mut sender: mpsc::UnboundedSender<PcmResponse>,
223 period_bytes: usize,
224 ) -> Result<(), Error> {
225 match dstream {
226 DirectionalStream::Output(mut stream) => {
227 loop {
228 let dst_buf = stream
229 .next_playback_buffer(&ex)
230 .await
231 .map_err(Error::FetchBuffer)?;
232 let worker_status = status_mutex.lock().await;
233 match *worker_status {
234 WorkerStatus::Quit => {
235 drain_desc_receiver(&mut desc_receiver, &mem, &mut sender).await?;
236 write_data(dst_buf, None, period_bytes).await?;
237 break Ok(());
238 }
239 WorkerStatus::Pause => {
240 write_data(dst_buf, None, period_bytes).await?;
241 }
242 WorkerStatus::Running => match desc_receiver.try_next() {
243 Err(e) => {
244 error!("Underrun. No new DescriptorChain while running: {}", e);
245 write_data(dst_buf, None, period_bytes).await?;
246 }
247 Ok(None) => {
248 error!("Unreachable. status should be Quit when the channel is closed");
249 write_data(dst_buf, None, period_bytes).await?;
250 return Err(Error::InvalidPCMWorkerState);
251 }
252 Ok(Some(desc_chain)) => {
253 let desc_index = desc_chain.index;
254 let mut reader = Reader::new(mem.clone(), desc_chain.clone())
255 .map_err(Error::DescriptorChain)?;
256 // stream_id was already read in handle_pcm_queue
257 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
258 let writer = Writer::new(mem.clone(), desc_chain)
259 .map_err(Error::DescriptorChain)?;
260
261 sender
262 .send(PcmResponse {
263 desc_index,
264 status: write_data(dst_buf, Some(reader), period_bytes)
265 .await
266 .into(),
267 writer,
268 done: None,
269 })
270 .await
271 .map_err(Error::MpscSend)?;
272 }
273 },
274 }
275 }
276 }
277 DirectionalStream::Input(mut stream) => {
278 loop {
279 let src_buf = stream
280 .next_capture_buffer(&ex)
281 .await
282 .map_err(Error::FetchBuffer)?;
283
284 let worker_status = status_mutex.lock().await;
285 match *worker_status {
286 WorkerStatus::Quit => {
287 drain_desc_receiver(&mut desc_receiver, &mem, &mut sender).await?;
288 read_data(src_buf, None, period_bytes).await?;
289 break Ok(());
290 }
291 WorkerStatus::Pause => {
292 read_data(src_buf, None, period_bytes).await?;
293 }
294 WorkerStatus::Running => match desc_receiver.try_next() {
295 Err(e) => {
296 error!("Overrun. No new DescriptorChain while running: {}", e);
297 read_data(src_buf, None, period_bytes).await?;
298 }
299 Ok(None) => {
300 error!("Unreachable. status should be Quit when the channel is closed");
301 read_data(src_buf, None, period_bytes).await?;
302 return Err(Error::InvalidPCMWorkerState);
303 }
304 Ok(Some(desc_chain)) => {
305 let desc_index = desc_chain.index;
306 let mut reader = Reader::new(mem.clone(), desc_chain.clone())
307 .map_err(Error::DescriptorChain)?;
308 // stream_id was already read in handle_pcm_queue
309 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
310 let mut writer = Writer::new(mem.clone(), desc_chain)
311 .map_err(Error::DescriptorChain)?;
312
313 sender
314 .send(PcmResponse {
315 desc_index,
316 status: read_data(src_buf, Some(&mut writer), period_bytes)
317 .await
318 .into(),
319 writer,
320 done: None,
321 })
322 .await
323 .map_err(Error::MpscSend)?;
324 }
325 },
326 }
327 }
328 }
329 }
330 }
331
332 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, mem: &GuestMemory, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>333 async fn defer_pcm_response_to_worker(
334 desc_chain: DescriptorChain,
335 mem: &GuestMemory,
336 status: virtio_snd_pcm_status,
337 response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
338 ) -> Result<(), Error> {
339 let desc_index = desc_chain.index;
340 let writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
341 response_sender
342 .send(PcmResponse {
343 desc_index,
344 status,
345 writer,
346 done: None,
347 })
348 .await
349 .map_err(Error::MpscSend)
350 }
351
send_pcm_response_with_writer<I: SignalableInterrupt>( mut writer: Writer, desc_index: u16, mem: &GuestMemory, queue: &mut Queue, interrupt: &I, status: virtio_snd_pcm_status, ) -> Result<(), Error>352 fn send_pcm_response_with_writer<I: SignalableInterrupt>(
353 mut writer: Writer,
354 desc_index: u16,
355 mem: &GuestMemory,
356 queue: &mut Queue,
357 interrupt: &I,
358 status: virtio_snd_pcm_status,
359 ) -> Result<(), Error> {
360 // For rx queue only. Fast forward the unused audio data buffer.
361 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
362 writer
363 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
364 }
365 writer.write_obj(status).map_err(Error::WriteResponse)?;
366 queue.add_used(mem, desc_index, writer.bytes_written() as u32);
367 queue.trigger_interrupt(mem, interrupt);
368 Ok(())
369 }
370
send_pcm_response_worker<I: SignalableInterrupt>( mem: &GuestMemory, queue: &Rc<AsyncMutex<Queue>>, interrupt: &I, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, ) -> Result<(), Error>371 pub async fn send_pcm_response_worker<I: SignalableInterrupt>(
372 mem: &GuestMemory,
373 queue: &Rc<AsyncMutex<Queue>>,
374 interrupt: &I,
375 recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
376 ) -> Result<(), Error> {
377 loop {
378 if let Some(r) = recv.next().await {
379 send_pcm_response_with_writer(
380 r.writer,
381 r.desc_index,
382 &mem,
383 &mut *queue.lock().await,
384 interrupt,
385 r.status,
386 )?;
387
388 // Resume pcm_worker
389 if let Some(done) = r.done {
390 done.send(()).map_err(Error::OneshotSend)?;
391 }
392 } else {
393 debug!("PcmResponse channel is closed.");
394 break;
395 }
396 }
397 Ok(())
398 }
399
400 /// Handle messages from the tx or the rx queue. One invocation is needed for
401 /// each queue.
handle_pcm_queue<'a>( mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'a>>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: &Rc<AsyncMutex<Queue>>, queue_event: EventAsync, ) -> Result<(), Error>402 pub async fn handle_pcm_queue<'a>(
403 mem: &GuestMemory,
404 streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'a>>>>>,
405 mut response_sender: mpsc::UnboundedSender<PcmResponse>,
406 queue: &Rc<AsyncMutex<Queue>>,
407 queue_event: EventAsync,
408 ) -> Result<(), Error> {
409 loop {
410 // Manual queue.next_async() to avoid holding the mutex
411 let next_async = async {
412 loop {
413 // Check if there are more descriptors available.
414 if let Some(chain) = queue.lock().await.pop(mem) {
415 return Ok(chain);
416 }
417 queue_event.next_val().await?;
418 }
419 };
420
421 let desc_chain = next_async.await.map_err(Error::Async)?;
422 let mut reader =
423 Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
424
425 let pcm_xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(Error::ReadMessage)?;
426 let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
427
428 let streams = streams.read_lock().await;
429 let stream_info = match streams.get(stream_id) {
430 Some(stream_info) => stream_info.read_lock().await,
431 None => {
432 error!(
433 "stream_id ({}) >= num_streams ({})",
434 stream_id,
435 streams.len()
436 );
437 defer_pcm_response_to_worker(
438 desc_chain,
439 mem,
440 virtio_snd_pcm_status {
441 status: Le32::from(VIRTIO_SND_S_IO_ERR),
442 latency_bytes: Le32::from(0),
443 },
444 &mut response_sender,
445 )
446 .await?;
447 continue;
448 }
449 };
450
451 match stream_info.sender.as_ref() {
452 Some(mut s) => {
453 s.send(desc_chain).await.map_err(Error::MpscSend)?;
454 }
455 None => {
456 error!(
457 "stream {} is not ready. state: {}",
458 stream_id,
459 get_virtio_snd_r_pcm_cmd_name(stream_info.state)
460 );
461 defer_pcm_response_to_worker(
462 desc_chain,
463 mem,
464 virtio_snd_pcm_status {
465 status: Le32::from(VIRTIO_SND_S_IO_ERR),
466 latency_bytes: Le32::from(0),
467 },
468 &mut response_sender,
469 )
470 .await?;
471 }
472 };
473 }
474 }
475
476 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue<I: SignalableInterrupt>( ex: &Executor, mem: &GuestMemory, streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>, snd_data: &SndData, mut queue: Queue, mut queue_event: EventAsync, interrupt: &I, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, params: &Parameters, ) -> Result<(), Error>477 pub async fn handle_ctrl_queue<I: SignalableInterrupt>(
478 ex: &Executor,
479 mem: &GuestMemory,
480 streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'_>>>>>,
481 snd_data: &SndData,
482 mut queue: Queue,
483 mut queue_event: EventAsync,
484 interrupt: &I,
485 tx_send: mpsc::UnboundedSender<PcmResponse>,
486 rx_send: mpsc::UnboundedSender<PcmResponse>,
487 params: &Parameters,
488 ) -> Result<(), Error> {
489 loop {
490 let desc_chain = queue
491 .next_async(mem, &mut queue_event)
492 .await
493 .map_err(Error::Async)?;
494
495 let index = desc_chain.index;
496
497 let mut reader =
498 Reader::new(mem.clone(), desc_chain.clone()).map_err(Error::DescriptorChain)?;
499 let mut writer = Writer::new(mem.clone(), desc_chain).map_err(Error::DescriptorChain)?;
500 // Don't advance the reader
501 let code = reader
502 .clone()
503 .read_obj::<virtio_snd_hdr>()
504 .map_err(Error::ReadMessage)?
505 .code
506 .into();
507
508 let handle_ctrl_msg = async {
509 return match code {
510 VIRTIO_SND_R_JACK_INFO => {
511 let query_info: virtio_snd_query_info =
512 reader.read_obj().map_err(Error::ReadMessage)?;
513 let start_id: usize = u32::from(query_info.start_id) as usize;
514 let count: usize = u32::from(query_info.count) as usize;
515 if start_id + count > snd_data.jack_info.len() {
516 error!(
517 "start_id({}) + count({}) must be smaller than the number of jacks ({})",
518 start_id,
519 count,
520 snd_data.jack_info.len()
521 );
522 return writer
523 .write_obj(VIRTIO_SND_S_BAD_MSG)
524 .map_err(Error::WriteResponse);
525 }
526 // The response consists of the virtio_snd_hdr structure (contains the request
527 // status code), followed by the device-writable information structures of the
528 // item. Each information structure begins with the following common header
529 writer
530 .write_obj(VIRTIO_SND_S_OK)
531 .map_err(Error::WriteResponse)?;
532 for i in start_id..(start_id + count) {
533 writer
534 .write_all(snd_data.jack_info[i].as_slice())
535 .map_err(Error::WriteResponse)?;
536 }
537 Ok(())
538 }
539 VIRTIO_SND_R_PCM_INFO => {
540 let query_info: virtio_snd_query_info =
541 reader.read_obj().map_err(Error::ReadMessage)?;
542 let start_id: usize = u32::from(query_info.start_id) as usize;
543 let count: usize = u32::from(query_info.count) as usize;
544 if start_id + count > snd_data.pcm_info.len() {
545 error!(
546 "start_id({}) + count({}) must be smaller than the number of streams ({})",
547 start_id,
548 count,
549 snd_data.pcm_info.len()
550 );
551 return writer
552 .write_obj(VIRTIO_SND_S_BAD_MSG)
553 .map_err(Error::WriteResponse);
554 }
555 // The response consists of the virtio_snd_hdr structure (contains the request
556 // status code), followed by the device-writable information structures of the
557 // item. Each information structure begins with the following common header
558 writer
559 .write_obj(VIRTIO_SND_S_OK)
560 .map_err(Error::WriteResponse)?;
561 for i in start_id..(start_id + count) {
562 writer
563 .write_all(snd_data.pcm_info[i].as_slice())
564 .map_err(Error::WriteResponse)?;
565 }
566 Ok(())
567 }
568 VIRTIO_SND_R_CHMAP_INFO => {
569 let query_info: virtio_snd_query_info =
570 reader.read_obj().map_err(Error::ReadMessage)?;
571 let start_id: usize = u32::from(query_info.start_id) as usize;
572 let count: usize = u32::from(query_info.count) as usize;
573 if start_id + count > snd_data.chmap_info.len() {
574 error!(
575 "start_id({}) + count({}) must be smaller than the number of chmaps ({})",
576 start_id,
577 count,
578 snd_data.pcm_info.len()
579 );
580 return writer
581 .write_obj(VIRTIO_SND_S_BAD_MSG)
582 .map_err(Error::WriteResponse);
583 }
584 // The response consists of the virtio_snd_hdr structure (contains the request
585 // status code), followed by the device-writable information structures of the
586 // item. Each information structure begins with the following common header
587 writer
588 .write_obj(VIRTIO_SND_S_OK)
589 .map_err(Error::WriteResponse)?;
590 for i in start_id..(start_id + count) {
591 writer
592 .write_all(snd_data.chmap_info[i].as_slice())
593 .map_err(Error::WriteResponse)?;
594 }
595 Ok(())
596 }
597 VIRTIO_SND_R_JACK_REMAP => {
598 unreachable!("remap is unsupported");
599 }
600 VIRTIO_SND_R_PCM_SET_PARAMS => {
601 // Raise VIRTIO_SND_S_BAD_MSG or IO error?
602 let set_params: virtio_snd_pcm_set_params =
603 reader.read_obj().map_err(Error::ReadMessage)?;
604 let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
605 let buffer_bytes: u32 = set_params.buffer_bytes.into();
606 let period_bytes: u32 = set_params.period_bytes.into();
607
608 let dir = match snd_data.pcm_info.get(stream_id) {
609 Some(pcm_info) => {
610 if set_params.channels < pcm_info.channels_min
611 || set_params.channels > pcm_info.channels_max
612 {
613 error!(
614 "Number of channels ({}) must be between {} and {}",
615 set_params.channels,
616 pcm_info.channels_min,
617 pcm_info.channels_max
618 );
619 return writer
620 .write_obj(VIRTIO_SND_S_NOT_SUPP)
621 .map_err(Error::WriteResponse);
622 }
623 if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
624 error!("PCM format {} is not supported.", set_params.format);
625 return writer
626 .write_obj(VIRTIO_SND_S_NOT_SUPP)
627 .map_err(Error::WriteResponse);
628 }
629 if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
630 error!("PCM frame rate {} is not supported.", set_params.rate);
631 return writer
632 .write_obj(VIRTIO_SND_S_NOT_SUPP)
633 .map_err(Error::WriteResponse);
634 }
635
636 pcm_info.direction
637 }
638 None => {
639 error!(
640 "stream_id {} < streams {}",
641 stream_id,
642 snd_data.pcm_info.len()
643 );
644 return writer
645 .write_obj(VIRTIO_SND_S_BAD_MSG)
646 .map_err(Error::WriteResponse);
647 }
648 };
649
650 if set_params.features != 0 {
651 error!("No feature is supported");
652 return writer
653 .write_obj(VIRTIO_SND_S_NOT_SUPP)
654 .map_err(Error::WriteResponse);
655 }
656
657 if buffer_bytes % period_bytes != 0 {
658 error!(
659 "buffer_bytes({}) must be dividable by period_bytes({})",
660 buffer_bytes, period_bytes
661 );
662 return writer
663 .write_obj(VIRTIO_SND_S_BAD_MSG)
664 .map_err(Error::WriteResponse);
665 }
666
667 let streams = streams.read_lock().await;
668 let mut stream_info = match streams.get(stream_id) {
669 Some(stream_info) => stream_info.lock().await,
670 None => {
671 error!("stream_id {} < streams {}", stream_id, streams.len());
672 return writer
673 .write_obj(VIRTIO_SND_S_BAD_MSG)
674 .map_err(Error::WriteResponse);
675 }
676 };
677
678 if stream_info.state != 0
679 && stream_info.state != VIRTIO_SND_R_PCM_SET_PARAMS
680 && stream_info.state != VIRTIO_SND_R_PCM_PREPARE
681 && stream_info.state != VIRTIO_SND_R_PCM_RELEASE
682 {
683 error!(
684 "Invalid PCM state transition from {} to {}",
685 get_virtio_snd_r_pcm_cmd_name(stream_info.state),
686 get_virtio_snd_r_pcm_cmd_name(VIRTIO_SND_R_PCM_SET_PARAMS)
687 );
688 return writer
689 .write_obj(VIRTIO_SND_S_NOT_SUPP)
690 .map_err(Error::WriteResponse);
691 }
692
693 // Only required for PREPARE -> SET_PARAMS
694 stream_info.release_worker().await?;
695
696 stream_info.channels = set_params.channels;
697 stream_info.format = from_virtio_sample_format(set_params.format).unwrap();
698 stream_info.frame_rate = from_virtio_frame_rate(set_params.rate).unwrap();
699 stream_info.buffer_bytes = buffer_bytes as usize;
700 stream_info.period_bytes = period_bytes as usize;
701 stream_info.direction = dir;
702 stream_info.state = VIRTIO_SND_R_PCM_SET_PARAMS;
703
704 debug!(
705 "VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
706 stream_id, *stream_info
707 );
708
709 writer
710 .write_obj(VIRTIO_SND_S_OK)
711 .map_err(Error::WriteResponse)
712 }
713 VIRTIO_SND_R_PCM_PREPARE
714 | VIRTIO_SND_R_PCM_START
715 | VIRTIO_SND_R_PCM_STOP
716 | VIRTIO_SND_R_PCM_RELEASE => {
717 let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
718 let stream_id: usize = u32::from(hdr.stream_id) as usize;
719 process_pcm_ctrl(
720 ex,
721 &mem.clone(),
722 &tx_send,
723 &rx_send,
724 streams,
725 params,
726 code,
727 &mut writer,
728 stream_id,
729 )
730 .await
731 .and(Ok(()))?;
732 Ok(())
733 }
734 c => {
735 error!("Unrecognized code: {}", c);
736 return writer
737 .write_obj(VIRTIO_SND_S_BAD_MSG)
738 .map_err(Error::WriteResponse);
739 }
740 };
741 };
742
743 handle_ctrl_msg.await?;
744 queue.add_used(mem, index, writer.bytes_written() as u32);
745 queue.trigger_interrupt(&mem, interrupt);
746 }
747 }
748
749 /// Send events to the audio driver.
handle_event_queue<I: SignalableInterrupt>( mem: &GuestMemory, mut queue: Queue, mut queue_event: EventAsync, interrupt: &I, ) -> Result<(), Error>750 pub async fn handle_event_queue<I: SignalableInterrupt>(
751 mem: &GuestMemory,
752 mut queue: Queue,
753 mut queue_event: EventAsync,
754 interrupt: &I,
755 ) -> Result<(), Error> {
756 loop {
757 let desc_chain = queue
758 .next_async(mem, &mut queue_event)
759 .await
760 .map_err(Error::Async)?;
761
762 // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
763 let index = desc_chain.index;
764 queue.add_used(mem, index, 0);
765 queue.trigger_interrupt(&mem, interrupt);
766 }
767 }
768