• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 pub mod sys;
6 
7 use std::borrow::Borrow;
8 use std::rc::Rc;
9 
10 use anyhow::anyhow;
11 use anyhow::bail;
12 use anyhow::Context;
13 use base::error;
14 use base::warn;
15 use cros_async::sync::RwLock as AsyncRwLock;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use futures::channel::mpsc;
19 use futures::FutureExt;
20 use hypervisor::ProtectionType;
21 use once_cell::sync::OnceCell;
22 use serde::Deserialize;
23 use serde::Serialize;
24 pub use sys::run_snd_device;
25 pub use sys::Options;
26 use vm_memory::GuestMemory;
27 use vmm_vhost::message::VhostUserProtocolFeatures;
28 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
29 use zerocopy::AsBytes;
30 
31 use crate::virtio;
32 use crate::virtio::copy_config;
33 use crate::virtio::device_constants::snd::virtio_snd_config;
34 use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue;
35 use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue;
36 use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker;
37 use crate::virtio::snd::common_backend::create_stream_info_builders;
38 use crate::virtio::snd::common_backend::hardcoded_snd_data;
39 use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config;
40 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
41 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;
42 use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot;
43 use crate::virtio::snd::common_backend::Error;
44 use crate::virtio::snd::common_backend::PcmResponse;
45 use crate::virtio::snd::common_backend::SndData;
46 use crate::virtio::snd::common_backend::MAX_QUEUE_NUM;
47 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE;
48 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START;
49 use crate::virtio::snd::parameters::Parameters;
50 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
51 use crate::virtio::vhost::user::device::handler::Error as DeviceError;
52 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
53 use crate::virtio::vhost::user::device::handler::WorkerState;
54 use crate::virtio::vhost::user::VhostUserDeviceBuilder;
55 use crate::virtio::Interrupt;
56 use crate::virtio::Queue;
57 
/// Executor shared by every method of the backend in this file. It is only read here (via
/// `get()`), so it must have been set elsewhere before any queue is started, stopped, reset or
/// restored; those paths panic if it has not.
static SND_EXECUTOR: OnceCell<Executor> = OnceCell::new();

// Async workers:
// 0 - ctrl
// 1 - event
// 2 - tx
// 3 - rx
/// Subtracted from a tx/rx queue index (2 or 3) to obtain the matching slot (0 or 1) in
/// `SndBackend::response_workers`.
const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2;
/// State of the vhost-user sound device backend.
struct SndBackend {
    /// Device configuration space served by `read_config`.
    cfg: virtio_snd_config,
    /// Feature bits offered by the backend.
    avail_features: u64,
    /// Feature bits accepted so far through `ack_features`.
    acked_features: u64,
    /// Protocol features accepted through `ack_protocol_features`.
    acked_protocol_features: VhostUserProtocolFeatures,
    /// One slot per queue index; holds the running queue task together with its queue.
    workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM],
    // tx and rx
    /// Response-sending tasks for the tx (slot 0) and rx (slot 1) queues.
    response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2],
    /// Sound data produced by `hardcoded_snd_data`; compared against the snapshot on restore.
    snd_data: Rc<SndData>,
    /// Per-stream state, each stream behind its own async lock.
    streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
    /// Sending half of the tx response channel; cloned into the ctrl and tx queue handlers.
    tx_send: mpsc::UnboundedSender<PcmResponse>,
    /// Sending half of the rx response channel; cloned into the ctrl and rx queue handlers.
    rx_send: mpsc::UnboundedSender<PcmResponse>,
    /// Receiving half of the tx response channel; taken (left `None`) when queue 2 is started.
    tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
    /// Receiving half of the rx response channel; taken (left `None`) when queue 3 is started.
    rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
}
81 
/// Serialized form of `SndBackend` written by `snapshot` and read back by `restore`.
#[derive(Serialize, Deserialize)]
struct SndBackendSnapshot {
    /// Offered feature bits; must equal the live backend's value on restore.
    avail_features: u64,
    /// Accepted feature bits; copied into the live backend on restore.
    acked_features: u64,
    /// Raw bits of the accepted protocol features; must equal the live backend's on restore.
    acked_protocol_features: u64,
    /// Per-stream snapshots, or `None` when the stream list could not be locked at snapshot
    /// time.
    stream_infos: Option<Vec<StreamInfoSnapshot>>,
    /// Copy of the backend's sound data; must equal the live backend's value on restore.
    snd_data: SndData,
}
90 
91 impl SndBackend {
new(params: Parameters) -> anyhow::Result<Self>92     pub fn new(params: Parameters) -> anyhow::Result<Self> {
93         let cfg = hardcoded_virtio_snd_config(&params);
94         let avail_features = virtio::base_features(ProtectionType::Unprotected)
95             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
96 
97         let snd_data = hardcoded_snd_data(&params);
98         let mut keep_rds = Vec::new();
99         let builders = create_stream_info_builders(&params, &snd_data, &mut keep_rds)?;
100 
101         if snd_data.pcm_info_len() != builders.len() {
102             error!(
103                 "snd: expected {} stream info builders, got {}",
104                 snd_data.pcm_info_len(),
105                 builders.len(),
106             )
107         }
108 
109         let streams = builders
110             .into_iter()
111             .map(StreamInfoBuilder::build)
112             .map(AsyncRwLock::new)
113             .collect();
114         let streams = Rc::new(AsyncRwLock::new(streams));
115 
116         let (tx_send, tx_recv) = mpsc::unbounded();
117         let (rx_send, rx_recv) = mpsc::unbounded();
118 
119         Ok(SndBackend {
120             cfg,
121             avail_features,
122             acked_features: 0,
123             acked_protocol_features: VhostUserProtocolFeatures::empty(),
124             workers: Default::default(),
125             response_workers: Default::default(),
126             snd_data: Rc::new(snd_data),
127             streams,
128             tx_send,
129             rx_send,
130             tx_recv: Some(tx_recv),
131             rx_recv: Some(rx_recv),
132         })
133     }
134 }
135 
136 impl VhostUserDeviceBuilder for SndBackend {
build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>>137     fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
138         let handler = DeviceRequestHandler::new(*self);
139         Ok(Box::new(handler))
140     }
141 }
142 
143 impl VhostUserDevice for SndBackend {
    /// Returns the number of queues the backend supports (`MAX_QUEUE_NUM`).
    fn max_queue_num(&self) -> usize {
        MAX_QUEUE_NUM
    }
147 
    /// Returns the feature bits offered by the backend.
    fn features(&self) -> u64 {
        self.avail_features
    }
151 
ack_features(&mut self, value: u64) -> anyhow::Result<()>152     fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
153         let unrequested_features = value & !self.avail_features;
154         if unrequested_features != 0 {
155             bail!("invalid features are given: {:#x}", unrequested_features);
156         }
157 
158         self.acked_features |= value;
159 
160         Ok(())
161     }
162 
    /// Returns the feature bits acknowledged so far.
    fn acked_features(&self) -> u64 {
        self.acked_features
    }
166 
    /// Returns the protocol features the backend supports: `CONFIG` and `MQ`.
    fn protocol_features(&self) -> VhostUserProtocolFeatures {
        VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::MQ
    }
170 
ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()>171     fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> {
172         let features = VhostUserProtocolFeatures::from_bits(features)
173             .ok_or_else(|| anyhow!("invalid protocol features are given: {:#x}", features))?;
174         let supported = self.protocol_features();
175         self.acked_protocol_features = features & supported;
176         Ok(())
177     }
178 
    /// Returns the raw bits of the acknowledged protocol features.
    fn acked_protocol_features(&self) -> u64 {
        self.acked_protocol_features.bits()
    }
182 
    /// Copies bytes of the device configuration space, starting at `offset`, into `data` via
    /// `copy_config`.
    fn read_config(&self, offset: u64, data: &mut [u8]) {
        copy_config(data, 0, self.cfg.as_bytes(), offset)
    }
186 
reset(&mut self)187     fn reset(&mut self) {
188         let ex = SND_EXECUTOR.get().expect("Executor not initialized");
189         for worker in self.workers.iter_mut().filter_map(Option::take) {
190             let _ = ex.run_until(worker.queue_task.cancel());
191         }
192     }
193 
start_queue( &mut self, idx: usize, queue: virtio::Queue, _mem: GuestMemory, doorbell: Interrupt, ) -> anyhow::Result<()>194     fn start_queue(
195         &mut self,
196         idx: usize,
197         queue: virtio::Queue,
198         _mem: GuestMemory,
199         doorbell: Interrupt,
200     ) -> anyhow::Result<()> {
201         if self.workers[idx].is_some() {
202             warn!("Starting new queue handler without stopping old handler");
203             self.stop_queue(idx)?;
204         }
205 
206         // Safe because the executor is initialized in main() below.
207         let ex = SND_EXECUTOR.get().expect("Executor not initialized");
208 
209         let kick_evt = queue
210             .event()
211             .try_clone()
212             .context("failed to clone queue event")?;
213         let mut kick_evt =
214             EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
215         let queue = Rc::new(AsyncRwLock::new(queue));
216         let queue_task = match idx {
217             0 => {
218                 // ctrl queue
219                 let streams = self.streams.clone();
220                 let snd_data = self.snd_data.clone();
221                 let tx_send = self.tx_send.clone();
222                 let rx_send = self.rx_send.clone();
223                 let ctrl_queue = queue.clone();
224                 Some(ex.spawn_local(async move {
225                     handle_ctrl_queue(
226                         ex,
227                         &streams,
228                         &snd_data,
229                         ctrl_queue,
230                         &mut kick_evt,
231                         doorbell,
232                         tx_send,
233                         rx_send,
234                         None,
235                     )
236                     .await
237                 }))
238             }
239             // TODO(woodychow): Add event queue support
240             //
241             // Note: Even though we don't support the event queue, we still need to keep track of
242             // the Queue so we can return it back in stop_queue. As such, we create a do nothing
243             // future to "run" this queue so that we track a WorkerState for it (which is how
244             // we return the Queue back).
245             1 => Some(ex.spawn_local(async move { Ok(()) })),
246             2 | 3 => {
247                 let (send, recv) = if idx == 2 {
248                     (self.tx_send.clone(), self.tx_recv.take())
249                 } else {
250                     (self.rx_send.clone(), self.rx_recv.take())
251                 };
252                 let mut recv = recv.ok_or_else(|| anyhow!("queue restart is not supported"))?;
253                 let streams = Rc::clone(&self.streams);
254                 let queue_pcm_queue = queue.clone();
255                 let queue_task = ex.spawn_local(async move {
256                     handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, None).await
257                 });
258 
259                 let queue_response_queue = queue.clone();
260                 let response_queue_task = ex.spawn_local(async move {
261                     send_pcm_response_worker(queue_response_queue, doorbell, &mut recv, None).await
262                 });
263 
264                 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState {
265                     queue_task: response_queue_task,
266                     queue: queue.clone(),
267                 });
268 
269                 Some(queue_task)
270             }
271             _ => bail!("attempted to start unknown queue: {}", idx),
272         };
273 
274         if let Some(queue_task) = queue_task {
275             self.workers[idx] = Some(WorkerState { queue_task, queue });
276         }
277         Ok(())
278     }
279 
stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue>280     fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
281         let ex = SND_EXECUTOR.get().expect("Executor not initialized");
282         let worker_queue_rc = self
283             .workers
284             .get_mut(idx)
285             .and_then(Option::take)
286             .map(|worker| {
287                 // Wait for queue_task to be aborted.
288                 let _ = ex.run_until(worker.queue_task.cancel());
289                 worker.queue
290             });
291 
292         if idx == 2 || idx == 3 {
293             if let Some(worker) = self
294                 .response_workers
295                 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET)
296                 .and_then(Option::take)
297             {
298                 // Wait for queue_task to be aborted.
299                 let _ = ex.run_until(worker.queue_task.cancel());
300             }
301         }
302 
303         if let Some(queue_rc) = worker_queue_rc {
304             match Rc::try_unwrap(queue_rc) {
305                 Ok(queue_mutex) => Ok(queue_mutex.into_inner()),
306                 Err(_) => panic!("failed to recover queue from worker"),
307             }
308         } else {
309             Err(anyhow::Error::new(DeviceError::WorkerNotFound))
310         }
311     }
312 
snapshot(&self) -> anyhow::Result<Vec<u8>>313     fn snapshot(&self) -> anyhow::Result<Vec<u8>> {
314         // now_or_never will succeed here because no workers are running.
315         let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() {
316             let mut snaps = Vec::new();
317             for stream_info in stream_infos.iter() {
318                 snaps.push(
319                     stream_info
320                         .lock()
321                         .now_or_never()
322                         .expect("failed to lock audio state during snapshot")
323                         .snapshot(),
324                 );
325             }
326             Some(snaps)
327         } else {
328             None
329         };
330         let snd_data_ref: &SndData = self.snd_data.borrow();
331         serde_json::to_vec(&SndBackendSnapshot {
332             avail_features: self.avail_features,
333             acked_protocol_features: self.acked_protocol_features.bits(),
334             acked_features: self.acked_features,
335             stream_infos: stream_info_snaps,
336             snd_data: snd_data_ref.clone(),
337         })
338         .context("Failed to serialize SndBackendSnapshot")
339     }
340 
restore(&mut self, data: Vec<u8>) -> anyhow::Result<()>341     fn restore(&mut self, data: Vec<u8>) -> anyhow::Result<()> {
342         let deser: SndBackendSnapshot = serde_json::from_slice(data.as_slice())
343             .context("Failed to deserialize SndBackendSnapshot")?;
344         anyhow::ensure!(
345             deser.avail_features == self.avail_features,
346             "avail features doesn't match on restore: expected: {}, got: {}",
347             deser.avail_features,
348             self.avail_features
349         );
350         anyhow::ensure!(
351             self.acked_protocol_features.bits() == deser.acked_protocol_features,
352             "Vhost user snd restored acked_protocol_features do not match. Live: {:?}, \
353             snapshot: {:?}",
354             self.acked_protocol_features,
355             deser.acked_protocol_features,
356         );
357         let snd_data = self.snd_data.borrow();
358         anyhow::ensure!(
359             &deser.snd_data == snd_data,
360             "snd data doesn't match on restore: expected: {:?}, got: {:?}",
361             deser.snd_data,
362             snd_data,
363         );
364         self.acked_features = deser.acked_features;
365 
366         // Wondering why we can pass ex to a move block *and* still use it
367         // afterwards? It's a &'static, which is the only kind of reference that
368         // can taken by a future run via spawn/spawn_local.
369         let ex = SND_EXECUTOR.get().expect("executor must be initialized");
370         let streams_rc = self.streams.clone();
371         let tx_send_clone = self.tx_send.clone();
372         let rx_send_clone = self.rx_send.clone();
373 
374         let restore_task = ex.spawn_local(async move {
375             if let Some(stream_infos) = &deser.stream_infos {
376                 for (stream, stream_info) in streams_rc.lock().await.iter().zip(stream_infos.iter())
377                 {
378                     stream.lock().await.restore(stream_info);
379                     if stream_info.state == VIRTIO_SND_R_PCM_START
380                         || stream_info.state == VIRTIO_SND_R_PCM_PREPARE
381                     {
382                         stream
383                             .lock()
384                             .await
385                             .prepare(ex, &tx_send_clone, &rx_send_clone)
386                             .await
387                             .expect("failed to prepare PCM");
388                     }
389                     if stream_info.state == VIRTIO_SND_R_PCM_START {
390                         stream
391                             .lock()
392                             .await
393                             .start()
394                             .await
395                             .expect("failed to start PCM");
396                     }
397                 }
398             }
399         });
400         ex.run_until(restore_task)
401             .expect("failed to restore streams");
402         Ok(())
403     }
404 
    /// No-op: always returns `Ok(())`.
    fn stop_non_queue_workers(&mut self) -> anyhow::Result<()> {
        // This device has no non-queue workers to stop.
        Ok(())
    }
409 }
410