1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Worker that runs in a virtio-video thread. 6 7 use std::collections::VecDeque; 8 9 use base::{error, info, Event, WaitContext}; 10 use vm_memory::GuestMemory; 11 12 use crate::virtio::queue::{DescriptorChain, Queue}; 13 use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap; 14 use crate::virtio::video::command::{QueueType, VideoCmd}; 15 use crate::virtio::video::device::{ 16 AsyncCmdResponse, AsyncCmdTag, Device, Token, VideoCmdResponseType, VideoEvtResponseType, 17 }; 18 use crate::virtio::video::event::{self, EvtType, VideoEvt}; 19 use crate::virtio::video::response::{self, Response}; 20 use crate::virtio::video::{Error, Result}; 21 use crate::virtio::{Interrupt, Reader, SignalableInterrupt, Writer}; 22 23 pub struct Worker { 24 interrupt: Interrupt, 25 mem: GuestMemory, 26 cmd_queue: Queue, 27 cmd_evt: Event, 28 event_queue: Queue, 29 event_evt: Event, 30 kill_evt: Event, 31 // Stores descriptors in which responses for asynchronous commands will be written. 32 desc_map: AsyncCmdDescMap, 33 } 34 35 /// Pair of a descriptor chain and a response to be written. 36 type WritableResp = (DescriptorChain, response::CmdResponse); 37 38 impl Worker { new( interrupt: Interrupt, mem: GuestMemory, cmd_queue: Queue, cmd_evt: Event, event_queue: Queue, event_evt: Event, kill_evt: Event, ) -> Self39 pub fn new( 40 interrupt: Interrupt, 41 mem: GuestMemory, 42 cmd_queue: Queue, 43 cmd_evt: Event, 44 event_queue: Queue, 45 event_evt: Event, 46 kill_evt: Event, 47 ) -> Self { 48 Self { 49 interrupt, 50 mem, 51 cmd_queue, 52 cmd_evt, 53 event_queue, 54 event_evt, 55 kill_evt, 56 desc_map: Default::default(), 57 } 58 } 59 60 /// Writes responses into the command queue. write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()>61 fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> { 62 if responses.is_empty() { 63 return Ok(()); 64 } 65 while let Some((desc, response)) = responses.pop_front() { 66 let desc_index = desc.index; 67 let mut writer = 68 Writer::new(self.mem.clone(), desc).map_err(Error::InvalidDescriptorChain)?; 69 if let Err(e) = response.write(&mut writer) { 70 error!( 71 "failed to write a command response for {:?}: {}", 72 response, e 73 ); 74 } 75 self.cmd_queue 76 .add_used(&self.mem, desc_index, writer.bytes_written() as u32); 77 } 78 self.cmd_queue.trigger_interrupt(&self.mem, &self.interrupt); 79 Ok(()) 80 } 81 82 /// Writes a `VideoEvt` into the event queue. write_event(&mut self, event: event::VideoEvt) -> Result<()>83 fn write_event(&mut self, event: event::VideoEvt) -> Result<()> { 84 let desc = self 85 .event_queue 86 .pop(&self.mem) 87 .ok_or(Error::DescriptorNotAvailable)?; 88 89 let desc_index = desc.index; 90 let mut writer = 91 Writer::new(self.mem.clone(), desc).map_err(Error::InvalidDescriptorChain)?; 92 event 93 .write(&mut writer) 94 .map_err(|error| Error::WriteEventFailure { event, error })?; 95 self.event_queue 96 .add_used(&self.mem, desc_index, writer.bytes_written() as u32); 97 self.event_queue 98 .trigger_interrupt(&self.mem, &self.interrupt); 99 Ok(()) 100 } 101 write_event_responses( &mut self, event_responses: Vec<VideoEvtResponseType>, stream_id: u32, ) -> Result<()>102 fn write_event_responses( 103 &mut self, 104 event_responses: Vec<VideoEvtResponseType>, 105 stream_id: u32, 106 ) -> Result<()> { 107 let mut responses: VecDeque<WritableResp> = Default::default(); 108 for event_response in event_responses { 109 match event_response { 110 VideoEvtResponseType::AsyncCmd(async_response) => { 111 let AsyncCmdResponse { 112 tag, 113 response: cmd_result, 114 } = async_response; 115 match self.desc_map.remove(&tag) { 116 Some(desc) => { 117 let cmd_response = match cmd_result { 118 Ok(r) => r, 119 Err(e) => { 120 error!("returning async error response: {}", &e); 121 e.into() 122 } 123 }; 124 responses.push_back((desc, cmd_response)) 125 } 126 None => match tag { 127 // TODO(b/153406792): Drain is cancelled by clearing either of the 128 // stream's queues. To work around a limitation in the VDA api, the 129 // output queue is cleared synchronously without going through VDA. 130 // Because of this, the cancellation response from VDA for the 131 // input queue might fail to find the drain's AsyncCmdTag. 132 AsyncCmdTag::Drain { stream_id: _ } => { 133 info!("ignoring unknown drain response"); 134 } 135 _ => { 136 error!("dropping response for an untracked command: {:?}", tag); 137 } 138 }, 139 } 140 } 141 VideoEvtResponseType::Event(evt) => { 142 self.write_event(evt)?; 143 } 144 } 145 } 146 147 if let Err(e) = self.write_responses(&mut responses) { 148 error!("Failed to write event responses: {:?}", e); 149 // Ignore result of write_event for a fatal error. 150 let _ = self.write_event(VideoEvt { 151 typ: EvtType::Error, 152 stream_id, 153 }); 154 return Err(e); 155 } 156 157 Ok(()) 158 } 159 160 /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque` 161 /// of `WritableResp` to be sent to the guest. handle_command_desc( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, desc: DescriptorChain, ) -> Result<VecDeque<WritableResp>>162 fn handle_command_desc( 163 &mut self, 164 device: &mut dyn Device, 165 wait_ctx: &WaitContext<Token>, 166 desc: DescriptorChain, 167 ) -> Result<VecDeque<WritableResp>> { 168 let mut responses: VecDeque<WritableResp> = Default::default(); 169 let mut reader = 170 Reader::new(self.mem.clone(), desc.clone()).map_err(Error::InvalidDescriptorChain)?; 171 172 let cmd = VideoCmd::from_reader(&mut reader).map_err(Error::ReadFailure)?; 173 174 // If a destruction command comes, cancel pending requests. 175 // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this 176 // into encoder/decoder. 177 let async_responses = match cmd { 178 VideoCmd::ResourceDestroyAll { 179 stream_id, 180 queue_type, 181 } => self 182 .desc_map 183 .create_cancellation_responses(&stream_id, Some(queue_type), None), 184 VideoCmd::StreamDestroy { stream_id } => self 185 .desc_map 186 .create_cancellation_responses(&stream_id, None, None), 187 VideoCmd::QueueClear { 188 stream_id, 189 queue_type: QueueType::Output, 190 } => { 191 // TODO(b/153406792): Due to a workaround for a limitation in the VDA api, 192 // clearing the output queue doesn't go through the same Async path as clearing 193 // the input queue. However, we still need to cancel the pending resources. 194 self.desc_map.create_cancellation_responses( 195 &stream_id, 196 Some(QueueType::Output), 197 None, 198 ) 199 } 200 _ => Default::default(), 201 }; 202 for async_response in async_responses { 203 let AsyncCmdResponse { 204 tag, 205 response: cmd_result, 206 } = async_response; 207 let destroy_response = match cmd_result { 208 Ok(r) => r, 209 Err(e) => { 210 error!("returning async error response: {}", &e); 211 e.into() 212 } 213 }; 214 match self.desc_map.remove(&tag) { 215 Some(destroy_desc) => { 216 responses.push_back((destroy_desc, destroy_response)); 217 } 218 None => error!("dropping response for an untracked command: {:?}", tag), 219 } 220 } 221 222 // Process the command by the device. 223 let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx); 224 match cmd_response { 225 VideoCmdResponseType::Sync(r) => { 226 responses.push_back((desc, r)); 227 } 228 VideoCmdResponseType::Async(tag) => { 229 // If the command expects an asynchronous response, 230 // store `desc` to use it after the back-end device notifies the 231 // completion. 232 self.desc_map.insert(tag, desc); 233 } 234 } 235 if let Some((stream_id, event_responses)) = event_responses_with_id { 236 self.write_event_responses(event_responses, stream_id)?; 237 } 238 239 Ok(responses) 240 } 241 242 /// Handles each command in the command queue. handle_command_queue( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, ) -> Result<()>243 fn handle_command_queue( 244 &mut self, 245 device: &mut dyn Device, 246 wait_ctx: &WaitContext<Token>, 247 ) -> Result<()> { 248 let _ = self.cmd_evt.read(); 249 while let Some(desc) = self.cmd_queue.pop(&self.mem) { 250 let mut resps = self.handle_command_desc(device, wait_ctx, desc)?; 251 self.write_responses(&mut resps)?; 252 } 253 Ok(()) 254 } 255 256 /// Handles an event notified via an event. handle_event(&mut self, device: &mut dyn Device, stream_id: u32) -> Result<()>257 fn handle_event(&mut self, device: &mut dyn Device, stream_id: u32) -> Result<()> { 258 if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id) { 259 self.write_event_responses(event_responses, stream_id)?; 260 } 261 Ok(()) 262 } 263 run(&mut self, mut device: Box<dyn Device>) -> Result<()>264 pub fn run(&mut self, mut device: Box<dyn Device>) -> Result<()> { 265 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[ 266 (&self.cmd_evt, Token::CmdQueue), 267 (&self.event_evt, Token::EventQueue), 268 (&self.kill_evt, Token::Kill), 269 ]) 270 .and_then(|wc| { 271 if let Some(resample_evt) = self.interrupt.get_resample_evt() { 272 wc.add(resample_evt, Token::InterruptResample)?; 273 } 274 Ok(wc) 275 }) 276 .map_err(Error::WaitContextCreationFailed)?; 277 278 loop { 279 let wait_events = wait_ctx.wait().map_err(Error::WaitError)?; 280 281 for wait_event in wait_events.iter().filter(|e| e.is_readable) { 282 match wait_event.token { 283 Token::CmdQueue => { 284 self.handle_command_queue(device.as_mut(), &wait_ctx)?; 285 } 286 Token::EventQueue => { 287 let _ = self.event_evt.read(); 288 } 289 Token::Event { id } => { 290 self.handle_event(device.as_mut(), id)?; 291 } 292 Token::InterruptResample => { 293 self.interrupt.interrupt_resample(); 294 } 295 Token::Kill => return Ok(()), 296 } 297 } 298 } 299 } 300 } 301