1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 pub mod sys; 6 7 use std::borrow::Borrow; 8 use std::rc::Rc; 9 10 use anyhow::anyhow; 11 use anyhow::bail; 12 use anyhow::Context; 13 use base::error; 14 use base::warn; 15 use cros_async::sync::RwLock as AsyncRwLock; 16 use cros_async::EventAsync; 17 use cros_async::Executor; 18 use futures::channel::mpsc; 19 use futures::FutureExt; 20 use hypervisor::ProtectionType; 21 use once_cell::sync::OnceCell; 22 use serde::Deserialize; 23 use serde::Serialize; 24 pub use sys::run_snd_device; 25 pub use sys::Options; 26 use vm_memory::GuestMemory; 27 use vmm_vhost::message::VhostUserProtocolFeatures; 28 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES; 29 use zerocopy::AsBytes; 30 31 use crate::virtio; 32 use crate::virtio::copy_config; 33 use crate::virtio::device_constants::snd::virtio_snd_config; 34 use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue; 35 use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue; 36 use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker; 37 use crate::virtio::snd::common_backend::create_stream_info_builders; 38 use crate::virtio::snd::common_backend::hardcoded_snd_data; 39 use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config; 40 use crate::virtio::snd::common_backend::stream_info::StreamInfo; 41 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder; 42 use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot; 43 use crate::virtio::snd::common_backend::Error; 44 use crate::virtio::snd::common_backend::PcmResponse; 45 use crate::virtio::snd::common_backend::SndData; 46 use crate::virtio::snd::common_backend::MAX_QUEUE_NUM; 47 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE; 48 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START; 49 use crate::virtio::snd::parameters::Parameters; 50 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler; 51 use crate::virtio::vhost::user::device::handler::Error as DeviceError; 52 use crate::virtio::vhost::user::device::handler::VhostUserDevice; 53 use crate::virtio::vhost::user::device::handler::WorkerState; 54 use crate::virtio::vhost::user::VhostUserDeviceBuilder; 55 use crate::virtio::Interrupt; 56 use crate::virtio::Queue; 57 58 static SND_EXECUTOR: OnceCell<Executor> = OnceCell::new(); 59 60 // Async workers: 61 // 0 - ctrl 62 // 1 - event 63 // 2 - tx 64 // 3 - rx 65 const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2; 66 struct SndBackend { 67 cfg: virtio_snd_config, 68 avail_features: u64, 69 acked_features: u64, 70 acked_protocol_features: VhostUserProtocolFeatures, 71 workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM], 72 // tx and rx 73 response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2], 74 snd_data: Rc<SndData>, 75 streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, 76 tx_send: mpsc::UnboundedSender<PcmResponse>, 77 rx_send: mpsc::UnboundedSender<PcmResponse>, 78 tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>, 79 rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>, 80 } 81 82 #[derive(Serialize, Deserialize)] 83 struct SndBackendSnapshot { 84 avail_features: u64, 85 acked_features: u64, 86 acked_protocol_features: u64, 87 stream_infos: Option<Vec<StreamInfoSnapshot>>, 88 snd_data: SndData, 89 } 90 91 impl SndBackend { new(params: Parameters) -> anyhow::Result<Self>92 pub fn new(params: Parameters) -> anyhow::Result<Self> { 93 let cfg = hardcoded_virtio_snd_config(¶ms); 94 let avail_features = virtio::base_features(ProtectionType::Unprotected) 95 | 1 << VHOST_USER_F_PROTOCOL_FEATURES; 96 97 let snd_data = hardcoded_snd_data(¶ms); 98 let mut keep_rds = Vec::new(); 99 let builders = create_stream_info_builders(¶ms, &snd_data, &mut keep_rds)?; 100 101 if snd_data.pcm_info_len() != builders.len() { 102 error!( 103 "snd: expected {} stream info builders, got {}", 104 snd_data.pcm_info_len(), 105 builders.len(), 106 ) 107 } 108 109 let streams = builders 110 .into_iter() 111 .map(StreamInfoBuilder::build) 112 .map(AsyncRwLock::new) 113 .collect(); 114 let streams = Rc::new(AsyncRwLock::new(streams)); 115 116 let (tx_send, tx_recv) = mpsc::unbounded(); 117 let (rx_send, rx_recv) = mpsc::unbounded(); 118 119 Ok(SndBackend { 120 cfg, 121 avail_features, 122 acked_features: 0, 123 acked_protocol_features: VhostUserProtocolFeatures::empty(), 124 workers: Default::default(), 125 response_workers: Default::default(), 126 snd_data: Rc::new(snd_data), 127 streams, 128 tx_send, 129 rx_send, 130 tx_recv: Some(tx_recv), 131 rx_recv: Some(rx_recv), 132 }) 133 } 134 } 135 136 impl VhostUserDeviceBuilder for SndBackend { build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>>137 fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> { 138 let handler = DeviceRequestHandler::new(*self); 139 Ok(Box::new(handler)) 140 } 141 } 142 143 impl VhostUserDevice for SndBackend { max_queue_num(&self) -> usize144 fn max_queue_num(&self) -> usize { 145 MAX_QUEUE_NUM 146 } 147 features(&self) -> u64148 fn features(&self) -> u64 { 149 self.avail_features 150 } 151 ack_features(&mut self, value: u64) -> anyhow::Result<()>152 fn ack_features(&mut self, value: u64) -> anyhow::Result<()> { 153 let unrequested_features = value & !self.avail_features; 154 if unrequested_features != 0 { 155 bail!("invalid features are given: {:#x}", unrequested_features); 156 } 157 158 self.acked_features |= value; 159 160 Ok(()) 161 } 162 acked_features(&self) -> u64163 fn acked_features(&self) -> u64 { 164 self.acked_features 165 } 166 protocol_features(&self) -> VhostUserProtocolFeatures167 fn protocol_features(&self) -> VhostUserProtocolFeatures { 168 VhostUserProtocolFeatures::CONFIG | VhostUserProtocolFeatures::MQ 169 } 170 ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()>171 fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> { 172 let features = VhostUserProtocolFeatures::from_bits(features) 173 .ok_or_else(|| anyhow!("invalid protocol features are given: {:#x}", features))?; 174 let supported = self.protocol_features(); 175 self.acked_protocol_features = features & supported; 176 Ok(()) 177 } 178 acked_protocol_features(&self) -> u64179 fn acked_protocol_features(&self) -> u64 { 180 self.acked_protocol_features.bits() 181 } 182 read_config(&self, offset: u64, data: &mut [u8])183 fn read_config(&self, offset: u64, data: &mut [u8]) { 184 copy_config(data, 0, self.cfg.as_bytes(), offset) 185 } 186 reset(&mut self)187 fn reset(&mut self) { 188 let ex = SND_EXECUTOR.get().expect("Executor not initialized"); 189 for worker in self.workers.iter_mut().filter_map(Option::take) { 190 let _ = ex.run_until(worker.queue_task.cancel()); 191 } 192 } 193 start_queue( &mut self, idx: usize, queue: virtio::Queue, _mem: GuestMemory, doorbell: Interrupt, ) -> anyhow::Result<()>194 fn start_queue( 195 &mut self, 196 idx: usize, 197 queue: virtio::Queue, 198 _mem: GuestMemory, 199 doorbell: Interrupt, 200 ) -> anyhow::Result<()> { 201 if self.workers[idx].is_some() { 202 warn!("Starting new queue handler without stopping old handler"); 203 self.stop_queue(idx)?; 204 } 205 206 // Safe because the executor is initialized in main() below. 207 let ex = SND_EXECUTOR.get().expect("Executor not initialized"); 208 209 let kick_evt = queue 210 .event() 211 .try_clone() 212 .context("failed to clone queue event")?; 213 let mut kick_evt = 214 EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?; 215 let queue = Rc::new(AsyncRwLock::new(queue)); 216 let queue_task = match idx { 217 0 => { 218 // ctrl queue 219 let streams = self.streams.clone(); 220 let snd_data = self.snd_data.clone(); 221 let tx_send = self.tx_send.clone(); 222 let rx_send = self.rx_send.clone(); 223 let ctrl_queue = queue.clone(); 224 Some(ex.spawn_local(async move { 225 handle_ctrl_queue( 226 ex, 227 &streams, 228 &snd_data, 229 ctrl_queue, 230 &mut kick_evt, 231 doorbell, 232 tx_send, 233 rx_send, 234 None, 235 ) 236 .await 237 })) 238 } 239 // TODO(woodychow): Add event queue support 240 // 241 // Note: Even though we don't support the event queue, we still need to keep track of 242 // the Queue so we can return it back in stop_queue. As such, we create a do nothing 243 // future to "run" this queue so that we track a WorkerState for it (which is how 244 // we return the Queue back). 245 1 => Some(ex.spawn_local(async move { Ok(()) })), 246 2 | 3 => { 247 let (send, recv) = if idx == 2 { 248 (self.tx_send.clone(), self.tx_recv.take()) 249 } else { 250 (self.rx_send.clone(), self.rx_recv.take()) 251 }; 252 let mut recv = recv.ok_or_else(|| anyhow!("queue restart is not supported"))?; 253 let streams = Rc::clone(&self.streams); 254 let queue_pcm_queue = queue.clone(); 255 let queue_task = ex.spawn_local(async move { 256 handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, None).await 257 }); 258 259 let queue_response_queue = queue.clone(); 260 let response_queue_task = ex.spawn_local(async move { 261 send_pcm_response_worker(queue_response_queue, doorbell, &mut recv, None).await 262 }); 263 264 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState { 265 queue_task: response_queue_task, 266 queue: queue.clone(), 267 }); 268 269 Some(queue_task) 270 } 271 _ => bail!("attempted to start unknown queue: {}", idx), 272 }; 273 274 if let Some(queue_task) = queue_task { 275 self.workers[idx] = Some(WorkerState { queue_task, queue }); 276 } 277 Ok(()) 278 } 279 stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue>280 fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> { 281 let ex = SND_EXECUTOR.get().expect("Executor not initialized"); 282 let worker_queue_rc = self 283 .workers 284 .get_mut(idx) 285 .and_then(Option::take) 286 .map(|worker| { 287 // Wait for queue_task to be aborted. 288 let _ = ex.run_until(worker.queue_task.cancel()); 289 worker.queue 290 }); 291 292 if idx == 2 || idx == 3 { 293 if let Some(worker) = self 294 .response_workers 295 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET) 296 .and_then(Option::take) 297 { 298 // Wait for queue_task to be aborted. 299 let _ = ex.run_until(worker.queue_task.cancel()); 300 } 301 } 302 303 if let Some(queue_rc) = worker_queue_rc { 304 match Rc::try_unwrap(queue_rc) { 305 Ok(queue_mutex) => Ok(queue_mutex.into_inner()), 306 Err(_) => panic!("failed to recover queue from worker"), 307 } 308 } else { 309 Err(anyhow::Error::new(DeviceError::WorkerNotFound)) 310 } 311 } 312 snapshot(&self) -> anyhow::Result<Vec<u8>>313 fn snapshot(&self) -> anyhow::Result<Vec<u8>> { 314 // now_or_never will succeed here because no workers are running. 315 let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() { 316 let mut snaps = Vec::new(); 317 for stream_info in stream_infos.iter() { 318 snaps.push( 319 stream_info 320 .lock() 321 .now_or_never() 322 .expect("failed to lock audio state during snapshot") 323 .snapshot(), 324 ); 325 } 326 Some(snaps) 327 } else { 328 None 329 }; 330 let snd_data_ref: &SndData = self.snd_data.borrow(); 331 serde_json::to_vec(&SndBackendSnapshot { 332 avail_features: self.avail_features, 333 acked_protocol_features: self.acked_protocol_features.bits(), 334 acked_features: self.acked_features, 335 stream_infos: stream_info_snaps, 336 snd_data: snd_data_ref.clone(), 337 }) 338 .context("Failed to serialize SndBackendSnapshot") 339 } 340 restore(&mut self, data: Vec<u8>) -> anyhow::Result<()>341 fn restore(&mut self, data: Vec<u8>) -> anyhow::Result<()> { 342 let deser: SndBackendSnapshot = serde_json::from_slice(data.as_slice()) 343 .context("Failed to deserialize SndBackendSnapshot")?; 344 anyhow::ensure!( 345 deser.avail_features == self.avail_features, 346 "avail features doesn't match on restore: expected: {}, got: {}", 347 deser.avail_features, 348 self.avail_features 349 ); 350 anyhow::ensure!( 351 self.acked_protocol_features.bits() == deser.acked_protocol_features, 352 "Vhost user snd restored acked_protocol_features do not match. Live: {:?}, \ 353 snapshot: {:?}", 354 self.acked_protocol_features, 355 deser.acked_protocol_features, 356 ); 357 let snd_data = self.snd_data.borrow(); 358 anyhow::ensure!( 359 &deser.snd_data == snd_data, 360 "snd data doesn't match on restore: expected: {:?}, got: {:?}", 361 deser.snd_data, 362 snd_data, 363 ); 364 self.acked_features = deser.acked_features; 365 366 // Wondering why we can pass ex to a move block *and* still use it 367 // afterwards? It's a &'static, which is the only kind of reference that 368 // can taken by a future run via spawn/spawn_local. 369 let ex = SND_EXECUTOR.get().expect("executor must be initialized"); 370 let streams_rc = self.streams.clone(); 371 let tx_send_clone = self.tx_send.clone(); 372 let rx_send_clone = self.rx_send.clone(); 373 374 let restore_task = ex.spawn_local(async move { 375 if let Some(stream_infos) = &deser.stream_infos { 376 for (stream, stream_info) in streams_rc.lock().await.iter().zip(stream_infos.iter()) 377 { 378 stream.lock().await.restore(stream_info); 379 if stream_info.state == VIRTIO_SND_R_PCM_START 380 || stream_info.state == VIRTIO_SND_R_PCM_PREPARE 381 { 382 stream 383 .lock() 384 .await 385 .prepare(ex, &tx_send_clone, &rx_send_clone) 386 .await 387 .expect("failed to prepare PCM"); 388 } 389 if stream_info.state == VIRTIO_SND_R_PCM_START { 390 stream 391 .lock() 392 .await 393 .start() 394 .await 395 .expect("failed to start PCM"); 396 } 397 } 398 } 399 }); 400 ex.run_until(restore_task) 401 .expect("failed to restore streams"); 402 Ok(()) 403 } 404 stop_non_queue_workers(&mut self) -> anyhow::Result<()>405 fn stop_non_queue_workers(&mut self) -> anyhow::Result<()> { 406 // This device has no non-queue workers to stop. 407 Ok(()) 408 } 409 } 410