1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::os::unix::net::UnixListener;
6 use std::rc::Rc;
7 use std::sync::Arc;
8 
9 use anyhow::{anyhow, bail, Context};
10 use argh::FromArgs;
11 use base::{warn, Event, UnlinkUnixListener};
12 use cros_async::{sync::Mutex as AsyncMutex, EventAsync, Executor};
13 use data_model::DataInit;
14 use futures::channel::mpsc;
15 use futures::future::{AbortHandle, Abortable};
16 use hypervisor::ProtectionType;
17 use once_cell::sync::OnceCell;
18 use sync::Mutex;
19 use vm_memory::GuestMemory;
20 use vmm_vhost::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
21 
22 use crate::virtio::snd::cras_backend::{
23     async_funcs::{handle_ctrl_queue, handle_pcm_queue, send_pcm_response_worker},
24     hardcoded_snd_data, hardcoded_virtio_snd_config, Parameters, PcmResponse, SndData, StreamInfo,
25     MAX_QUEUE_NUM, MAX_VRING_LEN,
26 };
27 use crate::virtio::snd::layout::virtio_snd_config;
28 use crate::virtio::vhost::user::device::handler::{
29     DeviceRequestHandler, Doorbell, VhostUserBackend,
30 };
31 use crate::virtio::{self, copy_config};
32 
// Executor used by `start_queue` to spawn the per-queue async workers.
// It is set by `run_cras_snd_device` before the request handler starts running.
static SND_EXECUTOR: OnceCell<Executor> = OnceCell::new();

// Async workers:
// 0 - ctrl
// 1 - event
// 2 - tx
// 3 - rx
//
// Queue index of the first PCM queue (tx). Subtracting it from a tx/rx queue index gives that
// queue's slot in `CrasSndBackend::response_workers`.
const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2;
/// State of the vhost-user snd device backed by cras.
struct CrasSndBackend {
    // Device config space served by `read_config`.
    cfg: virtio_snd_config,
    // Feature bits offered to the frontend; `ack_features` rejects anything outside this set.
    avail_features: u64,
    // Feature bits acknowledged so far; forwarded to each queue in `start_queue`.
    acked_features: u64,
    // Protocol features acknowledged by the frontend, masked to the supported set.
    acked_protocol_features: VhostUserProtocolFeatures,
    // Abort handle stored for each started queue, indexed by queue index.
    workers: [Option<AbortHandle>; MAX_QUEUE_NUM],
    // Abort handles of the PCM response workers, indexed by
    // `queue index - PCM_RESPONSE_WORKER_IDX_OFFSET`.
    response_workers: [Option<AbortHandle>; 2], // tx and rx
    // Sound data shared with the ctrl queue handler.
    snd_data: Rc<SndData>,
    // Per-stream state shared by the ctrl and PCM queue handlers.
    streams: Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'static>>>>>,
    // Parameters the backend was created with; a clone is handed to the ctrl queue handler.
    params: Parameters,
    // Sending halves of the tx/rx PCM response channels; cloned into the queue handlers.
    tx_send: mpsc::UnboundedSender<PcmResponse>,
    rx_send: mpsc::UnboundedSender<PcmResponse>,
    // Receiving halves of the tx/rx PCM response channels. Each one is taken (leaving `None`)
    // the first time its queue is started, which is why restarting those queues fails.
    tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
    rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
}
56 
57 impl CrasSndBackend {
new(params: Parameters) -> anyhow::Result<Self>58     pub fn new(params: Parameters) -> anyhow::Result<Self> {
59         let cfg = hardcoded_virtio_snd_config(¶ms);
60         let avail_features = virtio::base_features(ProtectionType::Unprotected)
61             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
62 
63         let snd_data = hardcoded_snd_data(¶ms);
64 
65         let mut streams: Vec<AsyncMutex<StreamInfo>> = Vec::new();
66         streams.resize_with(snd_data.pcm_info_len(), Default::default);
67         let streams = Rc::new(AsyncMutex::new(streams));
68 
69         let (tx_send, tx_recv) = mpsc::unbounded();
70         let (rx_send, rx_recv) = mpsc::unbounded();
71 
72         Ok(CrasSndBackend {
73             cfg,
74             avail_features,
75             acked_features: 0,
76             acked_protocol_features: VhostUserProtocolFeatures::empty(),
77             workers: Default::default(),
78             response_workers: Default::default(),
79             snd_data: Rc::new(snd_data),
80             streams,
81             params,
82             tx_send,
83             rx_send,
84             tx_recv: Some(tx_recv),
85             rx_recv: Some(rx_recv),
86         })
87     }
88 }
89 
90 impl VhostUserBackend for CrasSndBackend {
91     const MAX_QUEUE_NUM: usize = MAX_QUEUE_NUM;
92     const MAX_VRING_LEN: u16 = MAX_VRING_LEN;
93 
94     type Error = anyhow::Error;
95 
features(&self) -> u6496     fn features(&self) -> u64 {
97         self.avail_features
98     }
99 
ack_features(&mut self, value: u64) -> anyhow::Result<()>100     fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
101         let unrequested_features = value & !self.avail_features;
102         if unrequested_features != 0 {
103             bail!("invalid features are given: {:#x}", unrequested_features);
104         }
105 
106         self.acked_features |= value;
107 
108         Ok(())
109     }
110 
acked_features(&self) -> u64111     fn acked_features(&self) -> u64 {
112         self.acked_features
113     }
114 
protocol_features(&self) -> VhostUserProtocolFeatures115     fn protocol_features(&self) -> VhostUserProtocolFeatures {
116         VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::MQ
117     }
118 
ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()>119     fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> {
120         let features = VhostUserProtocolFeatures::from_bits(features)
121             .ok_or_else(|| anyhow!("invalid protocol features are given: {:#x}", features))?;
122         let supported = self.protocol_features();
123         self.acked_protocol_features = features & supported;
124         Ok(())
125     }
126 
acked_protocol_features(&self) -> u64127     fn acked_protocol_features(&self) -> u64 {
128         self.acked_protocol_features.bits()
129     }
130 
read_config(&self, offset: u64, data: &mut [u8])131     fn read_config(&self, offset: u64, data: &mut [u8]) {
132         copy_config(data, 0, self.cfg.as_slice(), offset)
133     }
134 
reset(&mut self)135     fn reset(&mut self) {
136         for handle in self.workers.iter_mut().filter_map(Option::take) {
137             handle.abort();
138         }
139     }
140 
start_queue( &mut self, idx: usize, mut queue: virtio::Queue, mem: GuestMemory, doorbell: Arc<Mutex<Doorbell>>, kick_evt: Event, ) -> anyhow::Result<()>141     fn start_queue(
142         &mut self,
143         idx: usize,
144         mut queue: virtio::Queue,
145         mem: GuestMemory,
146         doorbell: Arc<Mutex<Doorbell>>,
147         kick_evt: Event,
148     ) -> anyhow::Result<()> {
149         if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
150             warn!("Starting new queue handler without stopping old handler");
151             handle.abort();
152         }
153 
154         // Safe because the executor is initialized in main() below.
155         let ex = SND_EXECUTOR.get().expect("Executor not initialized");
156 
157         // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
158         queue.ack_features(self.acked_features);
159 
160         let kick_evt =
161             EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?;
162         let (handle, registration) = AbortHandle::new_pair();
163         match idx {
164             0 => {
165                 // ctrl queue
166                 let streams = self.streams.clone();
167                 let snd_data = self.snd_data.clone();
168                 let tx_send = self.tx_send.clone();
169                 let rx_send = self.rx_send.clone();
170                 let params = self.params.clone();
171                 ex.spawn_local(Abortable::new(
172                     async move {
173                         handle_ctrl_queue(
174                             &ex, &mem, &streams, &*snd_data, queue, kick_evt, &doorbell, tx_send,
175                             rx_send, ¶ms,
176                         )
177                         .await
178                     },
179                     registration,
180                 ))
181                 .detach();
182             }
183             1 => {} // TODO(woodychow): Add event queue support
184             2 | 3 => {
185                 let (send, recv) = if idx == 2 {
186                     (self.tx_send.clone(), self.tx_recv.take())
187                 } else {
188                     (self.rx_send.clone(), self.rx_recv.take())
189                 };
190                 let mut recv = recv.ok_or_else(|| anyhow!("queue restart is not supported"))?;
191                 let queue = Rc::new(AsyncMutex::new(queue));
192                 let queue2 = Rc::clone(&queue);
193                 let mem = Rc::new(mem);
194                 let mem2 = Rc::clone(&mem);
195                 let streams = Rc::clone(&self.streams);
196                 ex.spawn_local(Abortable::new(
197                     async move { handle_pcm_queue(&*mem, &streams, send, &queue, kick_evt).await },
198                     registration,
199                 ))
200                 .detach();
201 
202                 let (handle2, registration2) = AbortHandle::new_pair();
203 
204                 ex.spawn_local(Abortable::new(
205                     async move {
206                         send_pcm_response_worker(&*mem2, &queue2, &doorbell, &mut recv).await
207                     },
208                     registration2,
209                 ))
210                 .detach();
211 
212                 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(handle2);
213             }
214             _ => bail!("attempted to start unknown queue: {}", idx),
215         }
216 
217         self.workers[idx] = Some(handle);
218         Ok(())
219     }
220 
stop_queue(&mut self, idx: usize)221     fn stop_queue(&mut self, idx: usize) {
222         if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
223             handle.abort();
224         }
225         if idx == 2 || idx == 3 {
226             if let Some(handle) = self
227                 .response_workers
228                 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET)
229                 .and_then(Option::take)
230             {
231                 handle.abort();
232             }
233         }
234     }
235 }
236 
// Command line options of the cras snd device, parsed with argh.
// Plain `//` comments are used here on purpose so that the help text argh derives from the
// `description` attributes stays exactly as written.
#[derive(FromArgs)]
#[argh(description = "")]
struct Options {
    // Path handed to `UnixListener::bind` in `run_cras_snd_device`.
    #[argh(option, description = "path to a socket", arg_name = "PATH")]
    socket: String,
    // Optional device configuration; parsed into `Parameters`, with a missing value treated
    // as an empty string.
    // NOTE(review): the example in the help text uses the keys `client` and `socket`, while
    // the key list documents `client_type` and has no socket entry - confirm against the
    // `Parameters` parser which spelling is accepted.
    #[argh(
        option,
        description = "comma separated key=value pairs for setting up cras snd devices.
Possible key values:
capture - Enable audio capture. Default to false.
client_type - Set specific client type for cras backend.
num_output_streams - Set number of output PCM streams.
num_input_streams - Set number of input PCM streams.
Example: [capture=true,client=crosvm,socket=unified,num_output_streams=1,num_input_streams=1]",
        arg_name = "CONFIG"
    )]
    config: Option<String>,
}
255 
256 /// Starts a vhost-user snd device with the cras backend.
257 /// Returns an error if the given `args` is invalid or the device fails to run.
run_cras_snd_device(program_name: &str, args: &[&str]) -> anyhow::Result<()>258 pub fn run_cras_snd_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> {
259     let opts = match Options::from_args(&[program_name], args) {
260         Ok(opts) => opts,
261         Err(e) => {
262             if e.status.is_err() {
263                 bail!(e.output);
264             } else {
265                 println!("{}", e.output);
266             }
267             return Ok(());
268         }
269     };
270     let params = opts
271         .config
272         .unwrap_or("".to_string())
273         .parse::<Parameters>()?;
274 
275     let snd_device = CrasSndBackend::new(params)?;
276 
277     // Create and bind unix socket
278     let listener = UnixListener::bind(opts.socket).map(UnlinkUnixListener)?;
279 
280     let handler = DeviceRequestHandler::new(snd_device);
281 
282     // Child, we can continue by spawning the executor and set up the device
283     let ex = Executor::new().context("Failed to create executor")?;
284 
285     let _ = SND_EXECUTOR.set(ex.clone());
286 
287     // run_until() returns an Result<Result<..>> which the ? operator lets us flatten.
288     ex.run_until(handler.run_with_listener(listener, &ex))?
289 }
290