1 // Copyright 2020 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Worker that runs in a virtio-video thread. 6 7 use std::collections::VecDeque; 8 use std::time::Duration; 9 10 use base::clone_descriptor; 11 use base::error; 12 use base::info; 13 use base::Event; 14 use base::WaitContext; 15 use cros_async::select3; 16 use cros_async::AsyncWrapper; 17 use cros_async::EventAsync; 18 use cros_async::Executor; 19 use cros_async::SelectResult; 20 use futures::FutureExt; 21 22 use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap; 23 use crate::virtio::video::command::QueueType; 24 use crate::virtio::video::command::VideoCmd; 25 use crate::virtio::video::device::AsyncCmdResponse; 26 use crate::virtio::video::device::AsyncCmdTag; 27 use crate::virtio::video::device::Device; 28 use crate::virtio::video::device::Token; 29 use crate::virtio::video::device::VideoCmdResponseType; 30 use crate::virtio::video::device::VideoEvtResponseType; 31 use crate::virtio::video::event; 32 use crate::virtio::video::event::EvtType; 33 use crate::virtio::video::event::VideoEvt; 34 use crate::virtio::video::response; 35 use crate::virtio::video::response::Response; 36 use crate::virtio::video::Error; 37 use crate::virtio::video::Result; 38 use crate::virtio::DescriptorChain; 39 use crate::virtio::Interrupt; 40 use crate::virtio::Queue; 41 42 /// Worker that takes care of running the virtio video device. 43 pub struct Worker { 44 /// VirtIO queue for Command queue 45 cmd_queue: Queue, 46 /// Device-to-driver notification for command queue 47 cmd_queue_interrupt: Interrupt, 48 /// VirtIO queue for Event queue 49 event_queue: Queue, 50 /// Device-to-driver notification for the event queue. 51 event_queue_interrupt: Interrupt, 52 /// Stores descriptor chains in which responses for asynchronous commands will be written 53 desc_map: AsyncCmdDescMap, 54 } 55 56 /// Pair of a descriptor chain and a response to be written. 57 type WritableResp = (DescriptorChain, response::CmdResponse); 58 59 impl Worker { new( cmd_queue: Queue, cmd_queue_interrupt: Interrupt, event_queue: Queue, event_queue_interrupt: Interrupt, ) -> Self60 pub fn new( 61 cmd_queue: Queue, 62 cmd_queue_interrupt: Interrupt, 63 event_queue: Queue, 64 event_queue_interrupt: Interrupt, 65 ) -> Self { 66 Self { 67 cmd_queue, 68 cmd_queue_interrupt, 69 event_queue, 70 event_queue_interrupt, 71 desc_map: Default::default(), 72 } 73 } 74 75 /// Writes responses into the command queue. write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()>76 fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> { 77 if responses.is_empty() { 78 return Ok(()); 79 } 80 while let Some((mut desc, response)) = responses.pop_front() { 81 if let Err(e) = response.write(&mut desc.writer) { 82 error!( 83 "failed to write a command response for {:?}: {}", 84 response, e 85 ); 86 } 87 let len = desc.writer.bytes_written() as u32; 88 self.cmd_queue.add_used(desc, len); 89 } 90 self.cmd_queue.trigger_interrupt(&self.cmd_queue_interrupt); 91 Ok(()) 92 } 93 94 /// Writes a `VideoEvt` into the event queue. write_event(&mut self, event: event::VideoEvt) -> Result<()>95 fn write_event(&mut self, event: event::VideoEvt) -> Result<()> { 96 let mut desc = self 97 .event_queue 98 .pop() 99 .ok_or(Error::DescriptorNotAvailable)?; 100 101 event 102 .write(&mut desc.writer) 103 .map_err(|error| Error::WriteEventFailure { event, error })?; 104 let len = desc.writer.bytes_written() as u32; 105 self.event_queue.add_used(desc, len); 106 self.event_queue 107 .trigger_interrupt(&self.event_queue_interrupt); 108 Ok(()) 109 } 110 111 /// Writes the `event_responses` into the command queue or the event queue according to 112 /// each response's type. 113 /// 114 /// # Arguments 115 /// 116 /// * `event_responses` - Responses to write 117 /// * `stream_id` - Stream session ID of the responses write_event_responses( &mut self, event_responses: Vec<VideoEvtResponseType>, stream_id: u32, ) -> Result<()>118 fn write_event_responses( 119 &mut self, 120 event_responses: Vec<VideoEvtResponseType>, 121 stream_id: u32, 122 ) -> Result<()> { 123 let mut responses: VecDeque<WritableResp> = Default::default(); 124 for event_response in event_responses { 125 match event_response { 126 VideoEvtResponseType::AsyncCmd(async_response) => { 127 let AsyncCmdResponse { 128 tag, 129 response: cmd_result, 130 } = async_response; 131 match self.desc_map.remove(&tag) { 132 Some(desc) => { 133 let cmd_response = match cmd_result { 134 Ok(r) => r, 135 Err(e) => { 136 error!("returning async error response: {}", &e); 137 e.into() 138 } 139 }; 140 responses.push_back((desc, cmd_response)) 141 } 142 None => match tag { 143 // TODO(b/153406792): Drain is cancelled by clearing either of the 144 // stream's queues. To work around a limitation in the VDA api, the 145 // output queue is cleared synchronously without going through VDA. 146 // Because of this, the cancellation response from VDA for the 147 // input queue might fail to find the drain's AsyncCmdTag. 148 AsyncCmdTag::Drain { stream_id: _ } => { 149 info!("ignoring unknown drain response"); 150 } 151 _ => { 152 error!("dropping response for an untracked command: {:?}", tag); 153 } 154 }, 155 } 156 } 157 VideoEvtResponseType::Event(evt) => { 158 self.write_event(evt)?; 159 } 160 } 161 } 162 163 if let Err(e) = self.write_responses(&mut responses) { 164 error!("Failed to write event responses: {:?}", e); 165 // Ignore result of write_event for a fatal error. 166 let _ = self.write_event(VideoEvt { 167 typ: EvtType::Error, 168 stream_id, 169 }); 170 return Err(e); 171 } 172 173 Ok(()) 174 } 175 176 /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque` 177 /// of `WritableResp` to be sent to the guest. 178 /// 179 /// # Arguments 180 /// 181 /// * `device` - Instance of backend device 182 /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to 183 /// `wait_ctx` 184 /// * `desc` - `DescriptorChain` to handle handle_command_desc( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, mut desc: DescriptorChain, ) -> Result<VecDeque<WritableResp>>185 fn handle_command_desc( 186 &mut self, 187 device: &mut dyn Device, 188 wait_ctx: &WaitContext<Token>, 189 mut desc: DescriptorChain, 190 ) -> Result<VecDeque<WritableResp>> { 191 let mut responses: VecDeque<WritableResp> = Default::default(); 192 let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?; 193 194 // If a destruction command comes, cancel pending requests. 195 // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this 196 // into encoder/decoder. 197 let async_responses = match cmd { 198 VideoCmd::ResourceDestroyAll { 199 stream_id, 200 queue_type, 201 } => self 202 .desc_map 203 .create_cancellation_responses(&stream_id, Some(queue_type), None), 204 VideoCmd::StreamDestroy { stream_id } => self 205 .desc_map 206 .create_cancellation_responses(&stream_id, None, None), 207 VideoCmd::QueueClear { 208 stream_id, 209 queue_type: QueueType::Output, 210 } => { 211 // TODO(b/153406792): Due to a workaround for a limitation in the VDA api, 212 // clearing the output queue doesn't go through the same Async path as clearing 213 // the input queue. However, we still need to cancel the pending resources. 214 self.desc_map.create_cancellation_responses( 215 &stream_id, 216 Some(QueueType::Output), 217 None, 218 ) 219 } 220 _ => Default::default(), 221 }; 222 for async_response in async_responses { 223 let AsyncCmdResponse { 224 tag, 225 response: cmd_result, 226 } = async_response; 227 let destroy_response = match cmd_result { 228 Ok(r) => r, 229 Err(e) => { 230 error!("returning async error response: {}", &e); 231 e.into() 232 } 233 }; 234 match self.desc_map.remove(&tag) { 235 Some(destroy_desc) => { 236 responses.push_back((destroy_desc, destroy_response)); 237 } 238 None => error!("dropping response for an untracked command: {:?}", tag), 239 } 240 } 241 242 // Process the command by the device. 243 let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx); 244 match cmd_response { 245 VideoCmdResponseType::Sync(r) => { 246 responses.push_back((desc, r)); 247 } 248 VideoCmdResponseType::Async(tag) => { 249 // If the command expects an asynchronous response, 250 // store `desc` to use it after the back-end device notifies the 251 // completion. 252 self.desc_map.insert(tag, desc); 253 } 254 } 255 if let Some((stream_id, event_responses)) = event_responses_with_id { 256 self.write_event_responses(event_responses, stream_id)?; 257 } 258 259 Ok(responses) 260 } 261 262 /// Handles each command in the command queue. 263 /// 264 /// # Arguments 265 /// 266 /// * `device` - Instance of backend device 267 /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to 268 /// `wait_ctx` handle_command_queue( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, ) -> Result<()>269 fn handle_command_queue( 270 &mut self, 271 device: &mut dyn Device, 272 wait_ctx: &WaitContext<Token>, 273 ) -> Result<()> { 274 while let Some(desc) = self.cmd_queue.pop() { 275 let mut resps = self.handle_command_desc(device, wait_ctx, desc)?; 276 self.write_responses(&mut resps)?; 277 } 278 Ok(()) 279 } 280 281 /// Handles an event notified via an event. 282 /// 283 /// # Arguments 284 /// 285 /// * `device` - Instance of backend device 286 /// * `stream_id` - Stream session ID of the event 287 /// * `wait_ctx` - `device` may register a new `Token::Buffer` for a new stream session to 288 /// `wait_ctx` handle_event( &mut self, device: &mut dyn Device, stream_id: u32, wait_ctx: &WaitContext<Token>, ) -> Result<()>289 fn handle_event( 290 &mut self, 291 device: &mut dyn Device, 292 stream_id: u32, 293 wait_ctx: &WaitContext<Token>, 294 ) -> Result<()> { 295 if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx) 296 { 297 self.write_event_responses(event_responses, stream_id)?; 298 } 299 Ok(()) 300 } 301 302 /// Handles a completed buffer barrier. 303 /// 304 /// # Arguments 305 /// 306 /// * `device` - Instance of backend device 307 /// * `stream_id` - Stream session ID of the event 308 /// * `wait_ctx` - `device` may deregister the completed `Token::BufferBarrier` from 309 /// `wait_ctx`. handle_buffer_barrier( &mut self, device: &mut dyn Device, stream_id: u32, wait_ctx: &WaitContext<Token>, ) -> Result<()>310 fn handle_buffer_barrier( 311 &mut self, 312 device: &mut dyn Device, 313 stream_id: u32, 314 wait_ctx: &WaitContext<Token>, 315 ) -> Result<()> { 316 if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) { 317 self.write_event_responses(event_responses, stream_id)?; 318 } 319 Ok(()) 320 } 321 322 /// Runs the video device virtio queues in a blocking way. 323 /// 324 /// # Arguments 325 /// 326 /// * `device` - Instance of backend device 327 /// * `kill_evt` - `Event` notified to make `run` stop and return run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()>328 pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> { 329 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[ 330 (self.cmd_queue.event(), Token::CmdQueue), 331 (self.event_queue.event(), Token::EventQueue), 332 (kill_evt, Token::Kill), 333 ]) 334 .and_then(|wc| { 335 // resampling event exists per-PCI-INTx basis, so the two queues have the same event. 336 // Thus, checking only cmd_queue_interrupt suffices. 337 if let Some(resample_evt) = self.cmd_queue_interrupt.get_resample_evt() { 338 wc.add(resample_evt, Token::InterruptResample)?; 339 } 340 Ok(wc) 341 }) 342 .map_err(Error::WaitContextCreationFailed)?; 343 344 loop { 345 let wait_events = wait_ctx.wait().map_err(Error::WaitError)?; 346 347 for wait_event in wait_events.iter().filter(|e| e.is_readable) { 348 match wait_event.token { 349 Token::CmdQueue => { 350 let _ = self.cmd_queue.event().wait(); 351 self.handle_command_queue(device.as_mut(), &wait_ctx)?; 352 } 353 Token::EventQueue => { 354 let _ = self.event_queue.event().wait(); 355 } 356 Token::Event { id } => { 357 self.handle_event(device.as_mut(), id, &wait_ctx)?; 358 } 359 Token::BufferBarrier { id } => { 360 self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?; 361 } 362 Token::InterruptResample => { 363 // Clear the event. `expect` is ok since the token fires if and only if 364 // resample exists. resampling event exists per-PCI-INTx basis, so the 365 // two queues have the same event. 366 let _ = self 367 .cmd_queue_interrupt 368 .get_resample_evt() 369 .expect("resample event for the command queue doesn't exist") 370 .wait(); 371 self.cmd_queue_interrupt.do_interrupt_resample(); 372 } 373 Token::Kill => return Ok(()), 374 } 375 } 376 } 377 } 378 379 /// Runs the video device virtio queues asynchronously. 380 /// 381 /// # Arguments 382 /// 383 /// * `device` - Instance of backend device 384 /// * `ex` - Instance of `Executor` of asynchronous operations 385 /// * `cmd_evt` - Driver-to-device kick event for the command queue 386 /// * `event_evt` - Driver-to-device kick event for the event queue 387 #[allow(dead_code)] run_async( mut self, mut device: Box<dyn Device>, ex: Executor, cmd_evt: Event, event_evt: Event, ) -> Result<()>388 pub async fn run_async( 389 mut self, 390 mut device: Box<dyn Device>, 391 ex: Executor, 392 cmd_evt: Event, 393 event_evt: Event, 394 ) -> Result<()> { 395 let cmd_queue_evt = 396 EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?; 397 let event_queue_evt = 398 EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?; 399 400 // WaitContext to wait for the response from the encoder/decoder device. 401 let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?; 402 let device_evt = ex 403 .async_from(AsyncWrapper::new( 404 clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?, 405 )) 406 .map_err(Error::EventAsyncCreationFailed)?; 407 408 loop { 409 let ( 410 cmd_queue_evt, 411 device_evt, 412 // Ignore driver-to-device kicks since the event queue is write-only for a device. 413 _event_queue_evt, 414 ) = select3( 415 cmd_queue_evt.next_val().boxed_local(), 416 device_evt.wait_readable().boxed_local(), 417 event_queue_evt.next_val().boxed_local(), 418 ) 419 .await; 420 421 if let SelectResult::Finished(_) = cmd_queue_evt { 422 self.handle_command_queue(device.as_mut(), &device_wait_ctx)?; 423 } 424 425 if let SelectResult::Finished(_) = device_evt { 426 let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) { 427 Ok(device_events) => device_events, 428 Err(_) => { 429 error!("failed to read a device event"); 430 continue; 431 } 432 }; 433 for device_event in device_events { 434 // A Device must trigger only Token::Event. See [`Device::process_cmd()`]. 435 if let Token::Event { id } = device_event.token { 436 self.handle_event(device.as_mut(), id, &device_wait_ctx)?; 437 } else { 438 error!( 439 "invalid event is triggered by a device {:?}", 440 device_event.token 441 ); 442 } 443 } 444 } 445 } 446 } 447 } 448