• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 pub mod sys;
6 
7 use std::borrow::Borrow;
8 use std::rc::Rc;
9 
10 use anyhow::anyhow;
11 use anyhow::bail;
12 use anyhow::Context;
13 use base::error;
14 use base::warn;
15 use cros_async::sync::RwLock as AsyncRwLock;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use futures::channel::mpsc;
19 use futures::FutureExt;
20 use hypervisor::ProtectionType;
21 use serde::Deserialize;
22 use serde::Serialize;
23 use snapshot::AnySnapshot;
24 pub use sys::run_snd_device;
25 pub use sys::Options;
26 use vm_memory::GuestMemory;
27 use vmm_vhost::message::VhostUserProtocolFeatures;
28 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
29 use zerocopy::IntoBytes;
30 
31 use crate::virtio;
32 use crate::virtio::copy_config;
33 use crate::virtio::device_constants::snd::virtio_snd_config;
34 use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue;
35 use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue;
36 use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker;
37 use crate::virtio::snd::common_backend::create_stream_info_builders;
38 use crate::virtio::snd::common_backend::hardcoded_snd_data;
39 use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config;
40 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
41 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;
42 use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot;
43 use crate::virtio::snd::common_backend::Error;
44 use crate::virtio::snd::common_backend::PcmResponse;
45 use crate::virtio::snd::common_backend::SndData;
46 use crate::virtio::snd::common_backend::MAX_QUEUE_NUM;
47 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE;
48 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START;
49 use crate::virtio::snd::parameters::Parameters;
50 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
51 use crate::virtio::vhost::user::device::handler::Error as DeviceError;
52 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
53 use crate::virtio::vhost::user::device::handler::WorkerState;
54 use crate::virtio::vhost::user::VhostUserDeviceBuilder;
55 use crate::virtio::Queue;
56 
57 // Async workers:
58 // 0 - ctrl
59 // 1 - event
60 // 2 - tx
61 // 3 - rx
62 const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2;
63 struct SndBackend {
64     ex: Executor,
65     cfg: virtio_snd_config,
66     avail_features: u64,
67     workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM],
68     // tx and rx
69     response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2],
70     snd_data: Rc<SndData>,
71     streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
72     tx_send: mpsc::UnboundedSender<PcmResponse>,
73     rx_send: mpsc::UnboundedSender<PcmResponse>,
74     tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
75     rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
76     // Appended to logs for when there are mutliple audio devices.
77     card_index: usize,
78 }
79 
80 #[derive(Serialize, Deserialize)]
81 struct SndBackendSnapshot {
82     avail_features: u64,
83     stream_infos: Option<Vec<StreamInfoSnapshot>>,
84     snd_data: SndData,
85 }
86 
87 impl SndBackend {
new( ex: &Executor, params: Parameters, #[cfg(windows)] audio_client_guid: Option<String>, card_index: usize, ) -> anyhow::Result<Self>88     pub fn new(
89         ex: &Executor,
90         params: Parameters,
91         #[cfg(windows)] audio_client_guid: Option<String>,
92         card_index: usize,
93     ) -> anyhow::Result<Self> {
94         let cfg = hardcoded_virtio_snd_config(&params);
95         let avail_features = virtio::base_features(ProtectionType::Unprotected)
96             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
97 
98         let snd_data = hardcoded_snd_data(&params);
99         let mut keep_rds = Vec::new();
100         let builders = create_stream_info_builders(&params, &snd_data, &mut keep_rds, card_index)?;
101 
102         if snd_data.pcm_info_len() != builders.len() {
103             error!(
104                 "[Card {}] snd: expected {} stream info builders, got {}",
105                 card_index,
106                 snd_data.pcm_info_len(),
107                 builders.len(),
108             )
109         }
110 
111         let streams = builders.into_iter();
112 
113         #[cfg(windows)]
114         let streams = streams
115             .map(|stream_builder| stream_builder.audio_client_guid(audio_client_guid.clone()));
116 
117         let streams = streams
118             .map(StreamInfoBuilder::build)
119             .map(AsyncRwLock::new)
120             .collect();
121         let streams = Rc::new(AsyncRwLock::new(streams));
122 
123         let (tx_send, tx_recv) = mpsc::unbounded();
124         let (rx_send, rx_recv) = mpsc::unbounded();
125 
126         Ok(SndBackend {
127             ex: ex.clone(),
128             cfg,
129             avail_features,
130             workers: Default::default(),
131             response_workers: Default::default(),
132             snd_data: Rc::new(snd_data),
133             streams,
134             tx_send,
135             rx_send,
136             tx_recv: Some(tx_recv),
137             rx_recv: Some(rx_recv),
138             card_index,
139         })
140     }
141 }
142 
143 impl VhostUserDeviceBuilder for SndBackend {
build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>>144     fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
145         let handler = DeviceRequestHandler::new(*self);
146         Ok(Box::new(handler))
147     }
148 }
149 
150 impl VhostUserDevice for SndBackend {
max_queue_num(&self) -> usize151     fn max_queue_num(&self) -> usize {
152         MAX_QUEUE_NUM
153     }
154 
features(&self) -> u64155     fn features(&self) -> u64 {
156         self.avail_features
157     }
158 
protocol_features(&self) -> VhostUserProtocolFeatures159     fn protocol_features(&self) -> VhostUserProtocolFeatures {
160         VhostUserProtocolFeatures::CONFIG
161             | VhostUserProtocolFeatures::MQ
162             | VhostUserProtocolFeatures::DEVICE_STATE
163     }
164 
read_config(&self, offset: u64, data: &mut [u8])165     fn read_config(&self, offset: u64, data: &mut [u8]) {
166         copy_config(data, 0, self.cfg.as_bytes(), offset)
167     }
168 
reset(&mut self)169     fn reset(&mut self) {
170         for worker in self.workers.iter_mut().filter_map(Option::take) {
171             let _ = self.ex.run_until(worker.queue_task.cancel());
172         }
173     }
174 
start_queue( &mut self, idx: usize, queue: virtio::Queue, _mem: GuestMemory, ) -> anyhow::Result<()>175     fn start_queue(
176         &mut self,
177         idx: usize,
178         queue: virtio::Queue,
179         _mem: GuestMemory,
180     ) -> anyhow::Result<()> {
181         if self.workers[idx].is_some() {
182             warn!(
183                 "[Card {}] Starting new queue handler without stopping old handler",
184                 self.card_index
185             );
186             self.stop_queue(idx)?;
187         }
188 
189         let kick_evt = queue.event().try_clone().context(format!(
190             "[Card {}] failed to clone queue event",
191             self.card_index
192         ))?;
193         let mut kick_evt = EventAsync::new(kick_evt, &self.ex).context(format!(
194             "[Card {}] failed to create EventAsync for kick_evt",
195             self.card_index
196         ))?;
197         let queue = Rc::new(AsyncRwLock::new(queue));
198         let card_index = self.card_index;
199         let queue_task = match idx {
200             0 => {
201                 // ctrl queue
202                 let streams = self.streams.clone();
203                 let snd_data = self.snd_data.clone();
204                 let tx_send = self.tx_send.clone();
205                 let rx_send = self.rx_send.clone();
206                 let ctrl_queue = queue.clone();
207 
208                 let ex_clone = self.ex.clone();
209                 Some(self.ex.spawn_local(async move {
210                     handle_ctrl_queue(
211                         &ex_clone,
212                         &streams,
213                         &snd_data,
214                         ctrl_queue,
215                         &mut kick_evt,
216                         tx_send,
217                         rx_send,
218                         card_index,
219                         None,
220                     )
221                     .await
222                 }))
223             }
224             // TODO(woodychow): Add event queue support
225             //
226             // Note: Even though we don't support the event queue, we still need to keep track of
227             // the Queue so we can return it back in stop_queue. As such, we create a do nothing
228             // future to "run" this queue so that we track a WorkerState for it (which is how
229             // we return the Queue back).
230             1 => Some(self.ex.spawn_local(async move { Ok(()) })),
231             2 | 3 => {
232                 let (send, recv) = if idx == 2 {
233                     (self.tx_send.clone(), self.tx_recv.take())
234                 } else {
235                     (self.rx_send.clone(), self.rx_recv.take())
236                 };
237                 let mut recv = recv.ok_or_else(|| {
238                     anyhow!("[Card {}] queue restart is not supported", self.card_index)
239                 })?;
240                 let streams = Rc::clone(&self.streams);
241                 let queue_pcm_queue = queue.clone();
242                 let queue_task = self.ex.spawn_local(async move {
243                     handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, card_index, None)
244                         .await
245                 });
246 
247                 let queue_response_queue = queue.clone();
248                 let response_queue_task = self.ex.spawn_local(async move {
249                     send_pcm_response_worker(queue_response_queue, &mut recv, None).await
250                 });
251 
252                 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState {
253                     queue_task: response_queue_task,
254                     queue: queue.clone(),
255                 });
256 
257                 Some(queue_task)
258             }
259             _ => bail!(
260                 "[Card {}] attempted to start unknown queue: {}",
261                 self.card_index,
262                 idx
263             ),
264         };
265 
266         if let Some(queue_task) = queue_task {
267             self.workers[idx] = Some(WorkerState { queue_task, queue });
268         }
269         Ok(())
270     }
271 
stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue>272     fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
273         let worker_queue_rc = self
274             .workers
275             .get_mut(idx)
276             .and_then(Option::take)
277             .map(|worker| {
278                 // Wait for queue_task to be aborted.
279                 let _ = self.ex.run_until(worker.queue_task.cancel());
280                 worker.queue
281             });
282 
283         if idx == 2 || idx == 3 {
284             if let Some(worker) = self
285                 .response_workers
286                 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET)
287                 .and_then(Option::take)
288             {
289                 // Wait for queue_task to be aborted.
290                 let _ = self.ex.run_until(worker.queue_task.cancel());
291             }
292         }
293 
294         if let Some(queue_rc) = worker_queue_rc {
295             match Rc::try_unwrap(queue_rc) {
296                 Ok(queue_mutex) => Ok(queue_mutex.into_inner()),
297                 Err(_) => panic!(
298                     "[Card {}] failed to recover queue from worker",
299                     self.card_index
300                 ),
301             }
302         } else {
303             Err(anyhow::Error::new(DeviceError::WorkerNotFound))
304         }
305     }
306 
snapshot(&mut self) -> anyhow::Result<AnySnapshot>307     fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
308         // now_or_never will succeed here because no workers are running.
309         let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() {
310             let mut snaps = Vec::new();
311             for stream_info in stream_infos.iter() {
312                 snaps.push(
313                     stream_info
314                         .lock()
315                         .now_or_never()
316                         .unwrap_or_else(|| {
317                             panic!(
318                                 "[Card {}] failed to lock audio state during snapshot",
319                                 self.card_index
320                             )
321                         })
322                         .snapshot(),
323                 );
324             }
325             Some(snaps)
326         } else {
327             None
328         };
329         let snd_data_ref: &SndData = self.snd_data.borrow();
330         AnySnapshot::to_any(SndBackendSnapshot {
331             avail_features: self.avail_features,
332             stream_infos: stream_info_snaps,
333             snd_data: snd_data_ref.clone(),
334         })
335         .context(format!(
336             "[Card {}] Failed to serialize SndBackendSnapshot",
337             self.card_index
338         ))
339     }
340 
restore(&mut self, data: AnySnapshot) -> anyhow::Result<()>341     fn restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
342         let deser: SndBackendSnapshot = AnySnapshot::from_any(data).context(format!(
343             "[Card {}] Failed to deserialize SndBackendSnapshot",
344             self.card_index
345         ))?;
346         anyhow::ensure!(
347             deser.avail_features == self.avail_features,
348             "[Card {}] avail features doesn't match on restore: expected: {}, got: {}",
349             self.card_index,
350             deser.avail_features,
351             self.avail_features
352         );
353         let snd_data = self.snd_data.borrow();
354         anyhow::ensure!(
355             &deser.snd_data == snd_data,
356             "[Card {}] snd data doesn't match on restore: expected: {:?}, got: {:?}",
357             self.card_index,
358             deser.snd_data,
359             snd_data,
360         );
361 
362         let ex_clone = self.ex.clone();
363         let streams_rc = self.streams.clone();
364         let tx_send_clone = self.tx_send.clone();
365         let rx_send_clone = self.rx_send.clone();
366 
367         let card_index = self.card_index;
368         let restore_task = self.ex.spawn_local(async move {
369             if let Some(stream_infos) = &deser.stream_infos {
370                 for (stream, stream_info) in streams_rc.lock().await.iter().zip(stream_infos.iter())
371                 {
372                     stream.lock().await.restore(stream_info);
373                     if stream_info.state == VIRTIO_SND_R_PCM_START
374                         || stream_info.state == VIRTIO_SND_R_PCM_PREPARE
375                     {
376                         stream
377                             .lock()
378                             .await
379                             .prepare(&ex_clone, &tx_send_clone, &rx_send_clone)
380                             .await
381                             .unwrap_or_else(|_| {
382                                 panic!("[Card {}] failed to prepare PCM", card_index)
383                             });
384                     }
385                     if stream_info.state == VIRTIO_SND_R_PCM_START {
386                         stream.lock().await.start().await.unwrap_or_else(|_| {
387                             panic!("[Card {}] failed to start PCM", card_index)
388                         });
389                     }
390                 }
391             }
392         });
393         self.ex
394             .run_until(restore_task)
395             .unwrap_or_else(|_| panic!("[Card {}] failed to restore streams", self.card_index));
396         Ok(())
397     }
398 
enter_suspended_state(&mut self) -> anyhow::Result<()>399     fn enter_suspended_state(&mut self) -> anyhow::Result<()> {
400         // This device has no non-queue workers to stop.
401         Ok(())
402     }
403 }
404