1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::os::unix::net::UnixListener;
6 use std::rc::Rc;
7 use std::sync::Arc;
8
9 use anyhow::{anyhow, bail, Context};
10 use argh::FromArgs;
11 use base::{warn, Event, UnlinkUnixListener};
12 use cros_async::{sync::Mutex as AsyncMutex, EventAsync, Executor};
13 use data_model::DataInit;
14 use futures::channel::mpsc;
15 use futures::future::{AbortHandle, Abortable};
16 use hypervisor::ProtectionType;
17 use once_cell::sync::OnceCell;
18 use sync::Mutex;
19 use vm_memory::GuestMemory;
20 use vmm_vhost::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
21
22 use crate::virtio::snd::cras_backend::{
23 async_funcs::{handle_ctrl_queue, handle_pcm_queue, send_pcm_response_worker},
24 hardcoded_snd_data, hardcoded_virtio_snd_config, Parameters, PcmResponse, SndData, StreamInfo,
25 MAX_QUEUE_NUM, MAX_VRING_LEN,
26 };
27 use crate::virtio::snd::layout::virtio_snd_config;
28 use crate::virtio::vhost::user::device::handler::{
29 DeviceRequestHandler, Doorbell, VhostUserBackend,
30 };
31 use crate::virtio::{self, copy_config};
32
// Executor shared with `start_queue`, which spawns the queue workers on it. It is stored once by
// `run_cras_snd_device` before the request handler starts running.
static SND_EXECUTOR: OnceCell<Executor> = OnceCell::new();

// Async workers:
// 0 - ctrl
// 1 - event
// 2 - tx
// 3 - rx
//
// Queue index of the first PCM queue (tx). Subtracting it from a tx/rx queue index gives the
// matching slot in `CrasSndBackend::response_workers`.
const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2;
/// State of the vhost-user snd device backed by cras.
struct CrasSndBackend {
    /// Device config space returned by `read_config`.
    cfg: virtio_snd_config,
    /// Virtio feature bits offered to the frontend.
    avail_features: u64,
    /// Virtio feature bits acknowledged so far via `ack_features`.
    acked_features: u64,
    /// Protocol features recorded by `ack_protocol_features`.
    acked_protocol_features: VhostUserProtocolFeatures,
    /// Abort handles of the per-queue handler tasks, indexed by queue index.
    workers: [Option<AbortHandle>; MAX_QUEUE_NUM],
    /// Abort handles of the PCM response tasks, indexed by
    /// `queue index - PCM_RESPONSE_WORKER_IDX_OFFSET`.
    response_workers: [Option<AbortHandle>; 2], // tx and rx
    /// Sound data built from `params`; shared with the ctrl queue handler.
    snd_data: Rc<SndData>,
    /// One `StreamInfo` per PCM info entry of `snd_data`; shared with the queue handlers.
    streams: Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo<'static>>>>>,
    /// Device parameters; a clone is handed to the ctrl queue handler.
    params: Parameters,
    /// Sending half of the tx response channel; cloned into the ctrl and tx queue handlers.
    tx_send: mpsc::UnboundedSender<PcmResponse>,
    /// Sending half of the rx response channel; cloned into the ctrl and rx queue handlers.
    rx_send: mpsc::UnboundedSender<PcmResponse>,
    /// Receiving half of the tx response channel. Taken when the tx queue is started, so it is
    /// `None` afterwards and the queue cannot be started a second time.
    tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
    /// Receiving half of the rx response channel. Taken when the rx queue is started, so it is
    /// `None` afterwards and the queue cannot be started a second time.
    rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
}
56
57 impl CrasSndBackend {
new(params: Parameters) -> anyhow::Result<Self>58 pub fn new(params: Parameters) -> anyhow::Result<Self> {
59 let cfg = hardcoded_virtio_snd_config(¶ms);
60 let avail_features = virtio::base_features(ProtectionType::Unprotected)
61 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
62
63 let snd_data = hardcoded_snd_data(¶ms);
64
65 let mut streams: Vec<AsyncMutex<StreamInfo>> = Vec::new();
66 streams.resize_with(snd_data.pcm_info_len(), Default::default);
67 let streams = Rc::new(AsyncMutex::new(streams));
68
69 let (tx_send, tx_recv) = mpsc::unbounded();
70 let (rx_send, rx_recv) = mpsc::unbounded();
71
72 Ok(CrasSndBackend {
73 cfg,
74 avail_features,
75 acked_features: 0,
76 acked_protocol_features: VhostUserProtocolFeatures::empty(),
77 workers: Default::default(),
78 response_workers: Default::default(),
79 snd_data: Rc::new(snd_data),
80 streams,
81 params,
82 tx_send,
83 rx_send,
84 tx_recv: Some(tx_recv),
85 rx_recv: Some(rx_recv),
86 })
87 }
88 }
89
90 impl VhostUserBackend for CrasSndBackend {
91 const MAX_QUEUE_NUM: usize = MAX_QUEUE_NUM;
92 const MAX_VRING_LEN: u16 = MAX_VRING_LEN;
93
94 type Error = anyhow::Error;
95
features(&self) -> u6496 fn features(&self) -> u64 {
97 self.avail_features
98 }
99
ack_features(&mut self, value: u64) -> anyhow::Result<()>100 fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
101 let unrequested_features = value & !self.avail_features;
102 if unrequested_features != 0 {
103 bail!("invalid features are given: {:#x}", unrequested_features);
104 }
105
106 self.acked_features |= value;
107
108 Ok(())
109 }
110
acked_features(&self) -> u64111 fn acked_features(&self) -> u64 {
112 self.acked_features
113 }
114
protocol_features(&self) -> VhostUserProtocolFeatures115 fn protocol_features(&self) -> VhostUserProtocolFeatures {
116 VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::MQ
117 }
118
ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()>119 fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> {
120 let features = VhostUserProtocolFeatures::from_bits(features)
121 .ok_or_else(|| anyhow!("invalid protocol features are given: {:#x}", features))?;
122 let supported = self.protocol_features();
123 self.acked_protocol_features = features & supported;
124 Ok(())
125 }
126
acked_protocol_features(&self) -> u64127 fn acked_protocol_features(&self) -> u64 {
128 self.acked_protocol_features.bits()
129 }
130
read_config(&self, offset: u64, data: &mut [u8])131 fn read_config(&self, offset: u64, data: &mut [u8]) {
132 copy_config(data, 0, self.cfg.as_slice(), offset)
133 }
134
reset(&mut self)135 fn reset(&mut self) {
136 for handle in self.workers.iter_mut().filter_map(Option::take) {
137 handle.abort();
138 }
139 }
140
start_queue( &mut self, idx: usize, mut queue: virtio::Queue, mem: GuestMemory, doorbell: Arc<Mutex<Doorbell>>, kick_evt: Event, ) -> anyhow::Result<()>141 fn start_queue(
142 &mut self,
143 idx: usize,
144 mut queue: virtio::Queue,
145 mem: GuestMemory,
146 doorbell: Arc<Mutex<Doorbell>>,
147 kick_evt: Event,
148 ) -> anyhow::Result<()> {
149 if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
150 warn!("Starting new queue handler without stopping old handler");
151 handle.abort();
152 }
153
154 // Safe because the executor is initialized in main() below.
155 let ex = SND_EXECUTOR.get().expect("Executor not initialized");
156
157 // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
158 queue.ack_features(self.acked_features);
159
160 let kick_evt =
161 EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?;
162 let (handle, registration) = AbortHandle::new_pair();
163 match idx {
164 0 => {
165 // ctrl queue
166 let streams = self.streams.clone();
167 let snd_data = self.snd_data.clone();
168 let tx_send = self.tx_send.clone();
169 let rx_send = self.rx_send.clone();
170 let params = self.params.clone();
171 ex.spawn_local(Abortable::new(
172 async move {
173 handle_ctrl_queue(
174 &ex, &mem, &streams, &*snd_data, queue, kick_evt, &doorbell, tx_send,
175 rx_send, ¶ms,
176 )
177 .await
178 },
179 registration,
180 ))
181 .detach();
182 }
183 1 => {} // TODO(woodychow): Add event queue support
184 2 | 3 => {
185 let (send, recv) = if idx == 2 {
186 (self.tx_send.clone(), self.tx_recv.take())
187 } else {
188 (self.rx_send.clone(), self.rx_recv.take())
189 };
190 let mut recv = recv.ok_or_else(|| anyhow!("queue restart is not supported"))?;
191 let queue = Rc::new(AsyncMutex::new(queue));
192 let queue2 = Rc::clone(&queue);
193 let mem = Rc::new(mem);
194 let mem2 = Rc::clone(&mem);
195 let streams = Rc::clone(&self.streams);
196 ex.spawn_local(Abortable::new(
197 async move { handle_pcm_queue(&*mem, &streams, send, &queue, kick_evt).await },
198 registration,
199 ))
200 .detach();
201
202 let (handle2, registration2) = AbortHandle::new_pair();
203
204 ex.spawn_local(Abortable::new(
205 async move {
206 send_pcm_response_worker(&*mem2, &queue2, &doorbell, &mut recv).await
207 },
208 registration2,
209 ))
210 .detach();
211
212 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(handle2);
213 }
214 _ => bail!("attempted to start unknown queue: {}", idx),
215 }
216
217 self.workers[idx] = Some(handle);
218 Ok(())
219 }
220
stop_queue(&mut self, idx: usize)221 fn stop_queue(&mut self, idx: usize) {
222 if let Some(handle) = self.workers.get_mut(idx).and_then(Option::take) {
223 handle.abort();
224 }
225 if idx == 2 || idx == 3 {
226 if let Some(handle) = self
227 .response_workers
228 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET)
229 .and_then(Option::take)
230 {
231 handle.abort();
232 }
233 }
234 }
235 }
236
// Command-line options of the cras snd device, parsed by `run_cras_snd_device`.
#[derive(FromArgs)]
#[argh(description = "")]
struct Options {
    // Path at which the device binds its unix socket.
    #[argh(option, description = "path to a socket", arg_name = "PATH")]
    socket: String,
    // Optional device configuration string, parsed into `Parameters`. When the option is absent
    // an empty string is parsed instead.
    // NOTE(review): the example in the help text uses `client=` and `socket=`, while the key
    // list names `client_type` and has no `socket` key -- confirm against the `Parameters`
    // parser.
    #[argh(
        option,
        description = "comma separated key=value pairs for setting up cras snd devices.
Possible key values:
capture - Enable audio capture. Default to false.
client_type - Set specific client type for cras backend.
num_output_streams - Set number of output PCM streams.
num_input_streams - Set number of input PCM streams.
Example: [capture=true,client=crosvm,socket=unified,num_output_streams=1,num_input_streams=1]",
        arg_name = "CONFIG"
    )]
    config: Option<String>,
}
255
256 /// Starts a vhost-user snd device with the cras backend.
257 /// Returns an error if the given `args` is invalid or the device fails to run.
run_cras_snd_device(program_name: &str, args: &[&str]) -> anyhow::Result<()>258 pub fn run_cras_snd_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> {
259 let opts = match Options::from_args(&[program_name], args) {
260 Ok(opts) => opts,
261 Err(e) => {
262 if e.status.is_err() {
263 bail!(e.output);
264 } else {
265 println!("{}", e.output);
266 }
267 return Ok(());
268 }
269 };
270 let params = opts
271 .config
272 .unwrap_or("".to_string())
273 .parse::<Parameters>()?;
274
275 let snd_device = CrasSndBackend::new(params)?;
276
277 // Create and bind unix socket
278 let listener = UnixListener::bind(opts.socket).map(UnlinkUnixListener)?;
279
280 let handler = DeviceRequestHandler::new(snd_device);
281
282 // Child, we can continue by spawning the executor and set up the device
283 let ex = Executor::new().context("Failed to create executor")?;
284
285 let _ = SND_EXECUTOR.set(ex.clone());
286
287 // run_until() returns an Result<Result<..>> which the ? operator lets us flatten.
288 ex.run_until(handler.run_with_listener(listener, &ex))?
289 }
290