• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Worker that runs in a virtio-video thread.
6 
7 use std::collections::VecDeque;
8 
9 use base::{error, info, Event, WaitContext};
10 use vm_memory::GuestMemory;
11 
12 use crate::virtio::queue::{DescriptorChain, Queue};
13 use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
14 use crate::virtio::video::command::{QueueType, VideoCmd};
15 use crate::virtio::video::device::{
16     AsyncCmdResponse, AsyncCmdTag, Device, Token, VideoCmdResponseType, VideoEvtResponseType,
17 };
18 use crate::virtio::video::event::{self, EvtType, VideoEvt};
19 use crate::virtio::video::response::{self, Response};
20 use crate::virtio::video::{Error, Result};
21 use crate::virtio::{Interrupt, Reader, SignalableInterrupt, Writer};
22 
23 pub struct Worker {
24     interrupt: Interrupt,
25     mem: GuestMemory,
26     cmd_queue: Queue,
27     cmd_evt: Event,
28     event_queue: Queue,
29     event_evt: Event,
30     kill_evt: Event,
31     // Stores descriptors in which responses for asynchronous commands will be written.
32     desc_map: AsyncCmdDescMap,
33 }
34 
35 /// Pair of a descriptor chain and a response to be written.
36 type WritableResp = (DescriptorChain, response::CmdResponse);
37 
38 impl Worker {
new( interrupt: Interrupt, mem: GuestMemory, cmd_queue: Queue, cmd_evt: Event, event_queue: Queue, event_evt: Event, kill_evt: Event, ) -> Self39     pub fn new(
40         interrupt: Interrupt,
41         mem: GuestMemory,
42         cmd_queue: Queue,
43         cmd_evt: Event,
44         event_queue: Queue,
45         event_evt: Event,
46         kill_evt: Event,
47     ) -> Self {
48         Self {
49             interrupt,
50             mem,
51             cmd_queue,
52             cmd_evt,
53             event_queue,
54             event_evt,
55             kill_evt,
56             desc_map: Default::default(),
57         }
58     }
59 
60     /// Writes responses into the command queue.
write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()>61     fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
62         if responses.is_empty() {
63             return Ok(());
64         }
65         while let Some((desc, response)) = responses.pop_front() {
66             let desc_index = desc.index;
67             let mut writer =
68                 Writer::new(self.mem.clone(), desc).map_err(Error::InvalidDescriptorChain)?;
69             if let Err(e) = response.write(&mut writer) {
70                 error!(
71                     "failed to write a command response for {:?}: {}",
72                     response, e
73                 );
74             }
75             self.cmd_queue
76                 .add_used(&self.mem, desc_index, writer.bytes_written() as u32);
77         }
78         self.cmd_queue.trigger_interrupt(&self.mem, &self.interrupt);
79         Ok(())
80     }
81 
82     /// Writes a `VideoEvt` into the event queue.
write_event(&mut self, event: event::VideoEvt) -> Result<()>83     fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
84         let desc = self
85             .event_queue
86             .pop(&self.mem)
87             .ok_or(Error::DescriptorNotAvailable)?;
88 
89         let desc_index = desc.index;
90         let mut writer =
91             Writer::new(self.mem.clone(), desc).map_err(Error::InvalidDescriptorChain)?;
92         event
93             .write(&mut writer)
94             .map_err(|error| Error::WriteEventFailure { event, error })?;
95         self.event_queue
96             .add_used(&self.mem, desc_index, writer.bytes_written() as u32);
97         self.event_queue
98             .trigger_interrupt(&self.mem, &self.interrupt);
99         Ok(())
100     }
101 
write_event_responses( &mut self, event_responses: Vec<VideoEvtResponseType>, stream_id: u32, ) -> Result<()>102     fn write_event_responses(
103         &mut self,
104         event_responses: Vec<VideoEvtResponseType>,
105         stream_id: u32,
106     ) -> Result<()> {
107         let mut responses: VecDeque<WritableResp> = Default::default();
108         for event_response in event_responses {
109             match event_response {
110                 VideoEvtResponseType::AsyncCmd(async_response) => {
111                     let AsyncCmdResponse {
112                         tag,
113                         response: cmd_result,
114                     } = async_response;
115                     match self.desc_map.remove(&tag) {
116                         Some(desc) => {
117                             let cmd_response = match cmd_result {
118                                 Ok(r) => r,
119                                 Err(e) => {
120                                     error!("returning async error response: {}", &e);
121                                     e.into()
122                                 }
123                             };
124                             responses.push_back((desc, cmd_response))
125                         }
126                         None => match tag {
127                             // TODO(b/153406792): Drain is cancelled by clearing either of the
128                             // stream's queues. To work around a limitation in the VDA api, the
129                             // output queue is cleared synchronously without going through VDA.
130                             // Because of this, the cancellation response from VDA for the
131                             // input queue might fail to find the drain's AsyncCmdTag.
132                             AsyncCmdTag::Drain { stream_id: _ } => {
133                                 info!("ignoring unknown drain response");
134                             }
135                             _ => {
136                                 error!("dropping response for an untracked command: {:?}", tag);
137                             }
138                         },
139                     }
140                 }
141                 VideoEvtResponseType::Event(evt) => {
142                     self.write_event(evt)?;
143                 }
144             }
145         }
146 
147         if let Err(e) = self.write_responses(&mut responses) {
148             error!("Failed to write event responses: {:?}", e);
149             // Ignore result of write_event for a fatal error.
150             let _ = self.write_event(VideoEvt {
151                 typ: EvtType::Error,
152                 stream_id,
153             });
154             return Err(e);
155         }
156 
157         Ok(())
158     }
159 
160     /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque`
161     /// of `WritableResp` to be sent to the guest.
handle_command_desc( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, desc: DescriptorChain, ) -> Result<VecDeque<WritableResp>>162     fn handle_command_desc(
163         &mut self,
164         device: &mut dyn Device,
165         wait_ctx: &WaitContext<Token>,
166         desc: DescriptorChain,
167     ) -> Result<VecDeque<WritableResp>> {
168         let mut responses: VecDeque<WritableResp> = Default::default();
169         let mut reader =
170             Reader::new(self.mem.clone(), desc.clone()).map_err(Error::InvalidDescriptorChain)?;
171 
172         let cmd = VideoCmd::from_reader(&mut reader).map_err(Error::ReadFailure)?;
173 
174         // If a destruction command comes, cancel pending requests.
175         // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this
176         // into encoder/decoder.
177         let async_responses = match cmd {
178             VideoCmd::ResourceDestroyAll {
179                 stream_id,
180                 queue_type,
181             } => self
182                 .desc_map
183                 .create_cancellation_responses(&stream_id, Some(queue_type), None),
184             VideoCmd::StreamDestroy { stream_id } => self
185                 .desc_map
186                 .create_cancellation_responses(&stream_id, None, None),
187             VideoCmd::QueueClear {
188                 stream_id,
189                 queue_type: QueueType::Output,
190             } => {
191                 // TODO(b/153406792): Due to a workaround for a limitation in the VDA api,
192                 // clearing the output queue doesn't go through the same Async path as clearing
193                 // the input queue. However, we still need to cancel the pending resources.
194                 self.desc_map.create_cancellation_responses(
195                     &stream_id,
196                     Some(QueueType::Output),
197                     None,
198                 )
199             }
200             _ => Default::default(),
201         };
202         for async_response in async_responses {
203             let AsyncCmdResponse {
204                 tag,
205                 response: cmd_result,
206             } = async_response;
207             let destroy_response = match cmd_result {
208                 Ok(r) => r,
209                 Err(e) => {
210                     error!("returning async error response: {}", &e);
211                     e.into()
212                 }
213             };
214             match self.desc_map.remove(&tag) {
215                 Some(destroy_desc) => {
216                     responses.push_back((destroy_desc, destroy_response));
217                 }
218                 None => error!("dropping response for an untracked command: {:?}", tag),
219             }
220         }
221 
222         // Process the command by the device.
223         let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
224         match cmd_response {
225             VideoCmdResponseType::Sync(r) => {
226                 responses.push_back((desc, r));
227             }
228             VideoCmdResponseType::Async(tag) => {
229                 // If the command expects an asynchronous response,
230                 // store `desc` to use it after the back-end device notifies the
231                 // completion.
232                 self.desc_map.insert(tag, desc);
233             }
234         }
235         if let Some((stream_id, event_responses)) = event_responses_with_id {
236             self.write_event_responses(event_responses, stream_id)?;
237         }
238 
239         Ok(responses)
240     }
241 
242     /// Handles each command in the command queue.
handle_command_queue( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, ) -> Result<()>243     fn handle_command_queue(
244         &mut self,
245         device: &mut dyn Device,
246         wait_ctx: &WaitContext<Token>,
247     ) -> Result<()> {
248         let _ = self.cmd_evt.read();
249         while let Some(desc) = self.cmd_queue.pop(&self.mem) {
250             let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
251             self.write_responses(&mut resps)?;
252         }
253         Ok(())
254     }
255 
256     /// Handles an event notified via an event.
handle_event(&mut self, device: &mut dyn Device, stream_id: u32) -> Result<()>257     fn handle_event(&mut self, device: &mut dyn Device, stream_id: u32) -> Result<()> {
258         if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id) {
259             self.write_event_responses(event_responses, stream_id)?;
260         }
261         Ok(())
262     }
263 
run(&mut self, mut device: Box<dyn Device>) -> Result<()>264     pub fn run(&mut self, mut device: Box<dyn Device>) -> Result<()> {
265         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
266             (&self.cmd_evt, Token::CmdQueue),
267             (&self.event_evt, Token::EventQueue),
268             (&self.kill_evt, Token::Kill),
269         ])
270         .and_then(|wc| {
271             if let Some(resample_evt) = self.interrupt.get_resample_evt() {
272                 wc.add(resample_evt, Token::InterruptResample)?;
273             }
274             Ok(wc)
275         })
276         .map_err(Error::WaitContextCreationFailed)?;
277 
278         loop {
279             let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;
280 
281             for wait_event in wait_events.iter().filter(|e| e.is_readable) {
282                 match wait_event.token {
283                     Token::CmdQueue => {
284                         self.handle_command_queue(device.as_mut(), &wait_ctx)?;
285                     }
286                     Token::EventQueue => {
287                         let _ = self.event_evt.read();
288                     }
289                     Token::Event { id } => {
290                         self.handle_event(device.as_mut(), id)?;
291                     }
292                     Token::InterruptResample => {
293                         self.interrupt.interrupt_resample();
294                     }
295                     Token::Kill => return Ok(()),
296                 }
297             }
298         }
299     }
300 }
301