1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 pub mod sys; 6 7 use std::borrow::Borrow; 8 use std::rc::Rc; 9 10 use anyhow::anyhow; 11 use anyhow::bail; 12 use anyhow::Context; 13 use base::error; 14 use base::warn; 15 use cros_async::sync::RwLock as AsyncRwLock; 16 use cros_async::EventAsync; 17 use cros_async::Executor; 18 use futures::channel::mpsc; 19 use futures::FutureExt; 20 use hypervisor::ProtectionType; 21 use serde::Deserialize; 22 use serde::Serialize; 23 use snapshot::AnySnapshot; 24 pub use sys::run_snd_device; 25 pub use sys::Options; 26 use vm_memory::GuestMemory; 27 use vmm_vhost::message::VhostUserProtocolFeatures; 28 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES; 29 use zerocopy::IntoBytes; 30 31 use crate::virtio; 32 use crate::virtio::copy_config; 33 use crate::virtio::device_constants::snd::virtio_snd_config; 34 use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue; 35 use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue; 36 use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker; 37 use crate::virtio::snd::common_backend::create_stream_info_builders; 38 use crate::virtio::snd::common_backend::hardcoded_snd_data; 39 use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config; 40 use crate::virtio::snd::common_backend::stream_info::StreamInfo; 41 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder; 42 use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot; 43 use crate::virtio::snd::common_backend::Error; 44 use crate::virtio::snd::common_backend::PcmResponse; 45 use crate::virtio::snd::common_backend::SndData; 46 use crate::virtio::snd::common_backend::MAX_QUEUE_NUM; 47 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE; 48 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START; 49 use crate::virtio::snd::parameters::Parameters; 50 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler; 51 use crate::virtio::vhost::user::device::handler::Error as DeviceError; 52 use crate::virtio::vhost::user::device::handler::VhostUserDevice; 53 use crate::virtio::vhost::user::device::handler::WorkerState; 54 use crate::virtio::vhost::user::VhostUserDeviceBuilder; 55 use crate::virtio::Queue; 56 57 // Async workers: 58 // 0 - ctrl 59 // 1 - event 60 // 2 - tx 61 // 3 - rx 62 const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2; 63 struct SndBackend { 64 ex: Executor, 65 cfg: virtio_snd_config, 66 avail_features: u64, 67 workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM], 68 // tx and rx 69 response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2], 70 snd_data: Rc<SndData>, 71 streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, 72 tx_send: mpsc::UnboundedSender<PcmResponse>, 73 rx_send: mpsc::UnboundedSender<PcmResponse>, 74 tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>, 75 rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>, 76 // Appended to logs for when there are mutliple audio devices. 77 card_index: usize, 78 } 79 80 #[derive(Serialize, Deserialize)] 81 struct SndBackendSnapshot { 82 avail_features: u64, 83 stream_infos: Option<Vec<StreamInfoSnapshot>>, 84 snd_data: SndData, 85 } 86 87 impl SndBackend { new( ex: &Executor, params: Parameters, #[cfg(windows)] audio_client_guid: Option<String>, card_index: usize, ) -> anyhow::Result<Self>88 pub fn new( 89 ex: &Executor, 90 params: Parameters, 91 #[cfg(windows)] audio_client_guid: Option<String>, 92 card_index: usize, 93 ) -> anyhow::Result<Self> { 94 let cfg = hardcoded_virtio_snd_config(¶ms); 95 let avail_features = virtio::base_features(ProtectionType::Unprotected) 96 | 1 << VHOST_USER_F_PROTOCOL_FEATURES; 97 98 let snd_data = hardcoded_snd_data(¶ms); 99 let mut keep_rds = Vec::new(); 100 let builders = create_stream_info_builders(¶ms, &snd_data, &mut keep_rds, card_index)?; 101 102 if snd_data.pcm_info_len() != builders.len() { 103 error!( 104 "[Card {}] snd: expected {} stream info builders, got {}", 105 card_index, 106 snd_data.pcm_info_len(), 107 builders.len(), 108 ) 109 } 110 111 let streams = builders.into_iter(); 112 113 #[cfg(windows)] 114 let streams = streams 115 .map(|stream_builder| stream_builder.audio_client_guid(audio_client_guid.clone())); 116 117 let streams = streams 118 .map(StreamInfoBuilder::build) 119 .map(AsyncRwLock::new) 120 .collect(); 121 let streams = Rc::new(AsyncRwLock::new(streams)); 122 123 let (tx_send, tx_recv) = mpsc::unbounded(); 124 let (rx_send, rx_recv) = mpsc::unbounded(); 125 126 Ok(SndBackend { 127 ex: ex.clone(), 128 cfg, 129 avail_features, 130 workers: Default::default(), 131 response_workers: Default::default(), 132 snd_data: Rc::new(snd_data), 133 streams, 134 tx_send, 135 rx_send, 136 tx_recv: Some(tx_recv), 137 rx_recv: Some(rx_recv), 138 card_index, 139 }) 140 } 141 } 142 143 impl VhostUserDeviceBuilder for SndBackend { build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>>144 fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> { 145 let handler = DeviceRequestHandler::new(*self); 146 Ok(Box::new(handler)) 147 } 148 } 149 150 impl VhostUserDevice for SndBackend { max_queue_num(&self) -> usize151 fn max_queue_num(&self) -> usize { 152 MAX_QUEUE_NUM 153 } 154 features(&self) -> u64155 fn features(&self) -> u64 { 156 self.avail_features 157 } 158 protocol_features(&self) -> VhostUserProtocolFeatures159 fn protocol_features(&self) -> VhostUserProtocolFeatures { 160 VhostUserProtocolFeatures::CONFIG 161 | VhostUserProtocolFeatures::MQ 162 | VhostUserProtocolFeatures::DEVICE_STATE 163 } 164 read_config(&self, offset: u64, data: &mut [u8])165 fn read_config(&self, offset: u64, data: &mut [u8]) { 166 copy_config(data, 0, self.cfg.as_bytes(), offset) 167 } 168 reset(&mut self)169 fn reset(&mut self) { 170 for worker in self.workers.iter_mut().filter_map(Option::take) { 171 let _ = self.ex.run_until(worker.queue_task.cancel()); 172 } 173 } 174 start_queue( &mut self, idx: usize, queue: virtio::Queue, _mem: GuestMemory, ) -> anyhow::Result<()>175 fn start_queue( 176 &mut self, 177 idx: usize, 178 queue: virtio::Queue, 179 _mem: GuestMemory, 180 ) -> anyhow::Result<()> { 181 if self.workers[idx].is_some() { 182 warn!( 183 "[Card {}] Starting new queue handler without stopping old handler", 184 self.card_index 185 ); 186 self.stop_queue(idx)?; 187 } 188 189 let kick_evt = queue.event().try_clone().context(format!( 190 "[Card {}] failed to clone queue event", 191 self.card_index 192 ))?; 193 let mut kick_evt = EventAsync::new(kick_evt, &self.ex).context(format!( 194 "[Card {}] failed to create EventAsync for kick_evt", 195 self.card_index 196 ))?; 197 let queue = Rc::new(AsyncRwLock::new(queue)); 198 let card_index = self.card_index; 199 let queue_task = match idx { 200 0 => { 201 // ctrl queue 202 let streams = self.streams.clone(); 203 let snd_data = self.snd_data.clone(); 204 let tx_send = self.tx_send.clone(); 205 let rx_send = self.rx_send.clone(); 206 let ctrl_queue = queue.clone(); 207 208 let ex_clone = self.ex.clone(); 209 Some(self.ex.spawn_local(async move { 210 handle_ctrl_queue( 211 &ex_clone, 212 &streams, 213 &snd_data, 214 ctrl_queue, 215 &mut kick_evt, 216 tx_send, 217 rx_send, 218 card_index, 219 None, 220 ) 221 .await 222 })) 223 } 224 // TODO(woodychow): Add event queue support 225 // 226 // Note: Even though we don't support the event queue, we still need to keep track of 227 // the Queue so we can return it back in stop_queue. As such, we create a do nothing 228 // future to "run" this queue so that we track a WorkerState for it (which is how 229 // we return the Queue back). 230 1 => Some(self.ex.spawn_local(async move { Ok(()) })), 231 2 | 3 => { 232 let (send, recv) = if idx == 2 { 233 (self.tx_send.clone(), self.tx_recv.take()) 234 } else { 235 (self.rx_send.clone(), self.rx_recv.take()) 236 }; 237 let mut recv = recv.ok_or_else(|| { 238 anyhow!("[Card {}] queue restart is not supported", self.card_index) 239 })?; 240 let streams = Rc::clone(&self.streams); 241 let queue_pcm_queue = queue.clone(); 242 let queue_task = self.ex.spawn_local(async move { 243 handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, card_index, None) 244 .await 245 }); 246 247 let queue_response_queue = queue.clone(); 248 let response_queue_task = self.ex.spawn_local(async move { 249 send_pcm_response_worker(queue_response_queue, &mut recv, None).await 250 }); 251 252 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState { 253 queue_task: response_queue_task, 254 queue: queue.clone(), 255 }); 256 257 Some(queue_task) 258 } 259 _ => bail!( 260 "[Card {}] attempted to start unknown queue: {}", 261 self.card_index, 262 idx 263 ), 264 }; 265 266 if let Some(queue_task) = queue_task { 267 self.workers[idx] = Some(WorkerState { queue_task, queue }); 268 } 269 Ok(()) 270 } 271 stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue>272 fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> { 273 let worker_queue_rc = self 274 .workers 275 .get_mut(idx) 276 .and_then(Option::take) 277 .map(|worker| { 278 // Wait for queue_task to be aborted. 279 let _ = self.ex.run_until(worker.queue_task.cancel()); 280 worker.queue 281 }); 282 283 if idx == 2 || idx == 3 { 284 if let Some(worker) = self 285 .response_workers 286 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET) 287 .and_then(Option::take) 288 { 289 // Wait for queue_task to be aborted. 290 let _ = self.ex.run_until(worker.queue_task.cancel()); 291 } 292 } 293 294 if let Some(queue_rc) = worker_queue_rc { 295 match Rc::try_unwrap(queue_rc) { 296 Ok(queue_mutex) => Ok(queue_mutex.into_inner()), 297 Err(_) => panic!( 298 "[Card {}] failed to recover queue from worker", 299 self.card_index 300 ), 301 } 302 } else { 303 Err(anyhow::Error::new(DeviceError::WorkerNotFound)) 304 } 305 } 306 snapshot(&mut self) -> anyhow::Result<AnySnapshot>307 fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> { 308 // now_or_never will succeed here because no workers are running. 309 let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() { 310 let mut snaps = Vec::new(); 311 for stream_info in stream_infos.iter() { 312 snaps.push( 313 stream_info 314 .lock() 315 .now_or_never() 316 .unwrap_or_else(|| { 317 panic!( 318 "[Card {}] failed to lock audio state during snapshot", 319 self.card_index 320 ) 321 }) 322 .snapshot(), 323 ); 324 } 325 Some(snaps) 326 } else { 327 None 328 }; 329 let snd_data_ref: &SndData = self.snd_data.borrow(); 330 AnySnapshot::to_any(SndBackendSnapshot { 331 avail_features: self.avail_features, 332 stream_infos: stream_info_snaps, 333 snd_data: snd_data_ref.clone(), 334 }) 335 .context(format!( 336 "[Card {}] Failed to serialize SndBackendSnapshot", 337 self.card_index 338 )) 339 } 340 restore(&mut self, data: AnySnapshot) -> anyhow::Result<()>341 fn restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> { 342 let deser: SndBackendSnapshot = AnySnapshot::from_any(data).context(format!( 343 "[Card {}] Failed to deserialize SndBackendSnapshot", 344 self.card_index 345 ))?; 346 anyhow::ensure!( 347 deser.avail_features == self.avail_features, 348 "[Card {}] avail features doesn't match on restore: expected: {}, got: {}", 349 self.card_index, 350 deser.avail_features, 351 self.avail_features 352 ); 353 let snd_data = self.snd_data.borrow(); 354 anyhow::ensure!( 355 &deser.snd_data == snd_data, 356 "[Card {}] snd data doesn't match on restore: expected: {:?}, got: {:?}", 357 self.card_index, 358 deser.snd_data, 359 snd_data, 360 ); 361 362 let ex_clone = self.ex.clone(); 363 let streams_rc = self.streams.clone(); 364 let tx_send_clone = self.tx_send.clone(); 365 let rx_send_clone = self.rx_send.clone(); 366 367 let card_index = self.card_index; 368 let restore_task = self.ex.spawn_local(async move { 369 if let Some(stream_infos) = &deser.stream_infos { 370 for (stream, stream_info) in streams_rc.lock().await.iter().zip(stream_infos.iter()) 371 { 372 stream.lock().await.restore(stream_info); 373 if stream_info.state == VIRTIO_SND_R_PCM_START 374 || stream_info.state == VIRTIO_SND_R_PCM_PREPARE 375 { 376 stream 377 .lock() 378 .await 379 .prepare(&ex_clone, &tx_send_clone, &rx_send_clone) 380 .await 381 .unwrap_or_else(|_| { 382 panic!("[Card {}] failed to prepare PCM", card_index) 383 }); 384 } 385 if stream_info.state == VIRTIO_SND_R_PCM_START { 386 stream.lock().await.start().await.unwrap_or_else(|_| { 387 panic!("[Card {}] failed to start PCM", card_index) 388 }); 389 } 390 } 391 } 392 }); 393 self.ex 394 .run_until(restore_task) 395 .unwrap_or_else(|_| panic!("[Card {}] failed to restore streams", self.card_index)); 396 Ok(()) 397 } 398 enter_suspended_state(&mut self) -> anyhow::Result<()>399 fn enter_suspended_state(&mut self) -> anyhow::Result<()> { 400 // This device has no non-queue workers to stop. 401 Ok(()) 402 } 403 } 404