// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Worker that runs in a virtio-video thread.

use std::collections::VecDeque;
use std::time::Duration;

use base::clone_descriptor;
use base::error;
use base::info;
use base::Event;
use base::WaitContext;
use cros_async::select3;
use cros_async::AsyncWrapper;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::SelectResult;
use futures::FutureExt;

use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
use crate::virtio::video::command::QueueType;
use crate::virtio::video::command::VideoCmd;
use crate::virtio::video::device::AsyncCmdResponse;
use crate::virtio::video::device::AsyncCmdTag;
use crate::virtio::video::device::Device;
use crate::virtio::video::device::Token;
use crate::virtio::video::device::VideoCmdResponseType;
use crate::virtio::video::device::VideoEvtResponseType;
use crate::virtio::video::event;
use crate::virtio::video::event::EvtType;
use crate::virtio::video::event::VideoEvt;
use crate::virtio::video::response;
use crate::virtio::video::response::Response;
use crate::virtio::video::Error;
use crate::virtio::video::Result;
use crate::virtio::DescriptorChain;
use crate::virtio::Interrupt;
use crate::virtio::Queue;

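// Data flow: the driver submits `VideoCmd`s on the command queue and `Worker` forwards them
// to the backend `Device`. Synchronously answered commands are written back immediately,
// while asynchronous ones park their descriptor chain in `desc_map` until the device reports
// completion, at which point the response is written and the command queue interrupt is
// raised. Device-initiated notifications reach the driver through the event queue.
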
/// Worker that takes care of running the virtio video device.
pub struct Worker {
    /// VirtIO queue for the command queue
    cmd_queue: Queue,
    /// Device-to-driver notification for the command queue
    cmd_queue_interrupt: Interrupt,
    /// VirtIO queue for the event queue
    event_queue: Queue,
    /// Device-to-driver notification for the event queue.
    event_queue_interrupt: Interrupt,
    /// Stores descriptor chains in which responses for asynchronous commands will be written
    desc_map: AsyncCmdDescMap,
}

/// Pair of a descriptor chain and a response to be written.
type WritableResp = (DescriptorChain, response::CmdResponse);

impl Worker {
    pub fn new(
        cmd_queue: Queue,
        cmd_queue_interrupt: Interrupt,
        event_queue: Queue,
        event_queue_interrupt: Interrupt,
    ) -> Self {
        Self {
            cmd_queue,
            cmd_queue_interrupt,
            event_queue,
            event_queue_interrupt,
            desc_map: Default::default(),
        }
    }

    /// Writes responses into the command queue.
    fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
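        // All pending responses are written back-to-back and the driver is notified with a
        // single interrupt at the end, rather than one interrupt per response.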
        if responses.is_empty() {
            return Ok(());
        }
        while let Some((mut desc, response)) = responses.pop_front() {
            if let Err(e) = response.write(&mut desc.writer) {
                error!(
                    "failed to write a command response for {:?}: {}",
                    response, e
                );
            }
            let len = desc.writer.bytes_written() as u32;
            self.cmd_queue.add_used(desc, len);
        }
        self.cmd_queue.trigger_interrupt(&self.cmd_queue_interrupt);
        Ok(())
    }

    /// Writes a `VideoEvt` into the event queue.
    fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
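        // The event queue holds device-writable buffers supplied by the driver; if none is
        // currently available, the event cannot be delivered.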
        let mut desc = self
            .event_queue
            .pop()
            .ok_or(Error::DescriptorNotAvailable)?;

        event
            .write(&mut desc.writer)
            .map_err(|error| Error::WriteEventFailure { event, error })?;
        let len = desc.writer.bytes_written() as u32;
        self.event_queue.add_used(desc, len);
        self.event_queue
            .trigger_interrupt(&self.event_queue_interrupt);
        Ok(())
    }

    /// Writes the `event_responses` into the command queue or the event queue according to
    /// each response's type.
    ///
    /// # Arguments
    ///
    /// * `event_responses` - Responses to write
    /// * `stream_id` - Stream session ID of the responses
    fn write_event_responses(
        &mut self,
        event_responses: Vec<VideoEvtResponseType>,
        stream_id: u32,
    ) -> Result<()> {
        let mut responses: VecDeque<WritableResp> = Default::default();
        for event_response in event_responses {
            match event_response {
                VideoEvtResponseType::AsyncCmd(async_response) => {
                    let AsyncCmdResponse {
                        tag,
                        response: cmd_result,
                    } = async_response;
                    match self.desc_map.remove(&tag) {
                        Some(desc) => {
                            let cmd_response = match cmd_result {
                                Ok(r) => r,
                                Err(e) => {
                                    error!("returning async error response: {}", &e);
                                    e.into()
                                }
                            };
                            responses.push_back((desc, cmd_response))
                        }
                        None => match tag {
                            // TODO(b/153406792): Drain is cancelled by clearing either of the
                            // stream's queues. To work around a limitation in the VDA api, the
                            // output queue is cleared synchronously without going through VDA.
                            // Because of this, the cancellation response from VDA for the
                            // input queue might fail to find the drain's AsyncCmdTag.
                            AsyncCmdTag::Drain { stream_id: _ } => {
                                info!("ignoring unknown drain response");
                            }
                            _ => {
                                error!("dropping response for an untracked command: {:?}", tag);
                            }
                        },
                    }
                }
                VideoEvtResponseType::Event(evt) => {
                    self.write_event(evt)?;
                }
            }
        }

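        // Flush all collected command responses at once. If that fails, report a stream-wide
        // error to the driver through the event queue before propagating the failure.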
        if let Err(e) = self.write_responses(&mut responses) {
            error!("Failed to write event responses: {:?}", e);
            // Ignore result of write_event for a fatal error.
            let _ = self.write_event(VideoEvt {
                typ: EvtType::Error,
                stream_id,
            });
            return Err(e);
        }

        Ok(())
    }

    /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque`
    /// of `WritableResp` to be sent to the guest.
    ///
    /// # Arguments
    ///
    /// * `device` - Instance of backend device
    /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
    ///   `wait_ctx`
    /// * `desc` - `DescriptorChain` to handle
    fn handle_command_desc(
        &mut self,
        device: &mut dyn Device,
        wait_ctx: &WaitContext<Token>,
        mut desc: DescriptorChain,
    ) -> Result<VecDeque<WritableResp>> {
        let mut responses: VecDeque<WritableResp> = Default::default();
        let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?;

        // If a destruction command comes, cancel pending requests.
        // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this
        // into encoder/decoder.
        let async_responses = match cmd {
            VideoCmd::ResourceDestroyAll {
                stream_id,
                queue_type,
            } => self
                .desc_map
                .create_cancellation_responses(&stream_id, Some(queue_type), None),
            VideoCmd::StreamDestroy { stream_id } => self
                .desc_map
                .create_cancellation_responses(&stream_id, None, None),
            VideoCmd::QueueClear {
                stream_id,
                queue_type: QueueType::Output,
            } => {
                // TODO(b/153406792): Due to a workaround for a limitation in the VDA api,
                // clearing the output queue doesn't go through the same Async path as clearing
                // the input queue. However, we still need to cancel the pending resources.
                self.desc_map.create_cancellation_responses(
                    &stream_id,
                    Some(QueueType::Output),
                    None,
                )
            }
            _ => Default::default(),
        };
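        // Write the cancellation results back using the descriptor chains that were stashed
        // when the original asynchronous commands arrived.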
        for async_response in async_responses {
            let AsyncCmdResponse {
                tag,
                response: cmd_result,
            } = async_response;
            let destroy_response = match cmd_result {
                Ok(r) => r,
                Err(e) => {
                    error!("returning async error response: {}", &e);
                    e.into()
                }
            };
            match self.desc_map.remove(&tag) {
                Some(destroy_desc) => {
                    responses.push_back((destroy_desc, destroy_response));
                }
                None => error!("dropping response for an untracked command: {:?}", tag),
            }
        }

        // Have the device process the command.
        let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
        match cmd_response {
            VideoCmdResponseType::Sync(r) => {
                responses.push_back((desc, r));
            }
            VideoCmdResponseType::Async(tag) => {
                // If the command expects an asynchronous response, store `desc` so it can be
                // used once the back-end device notifies completion.
                self.desc_map.insert(tag, desc);
            }
        }
        if let Some((stream_id, event_responses)) = event_responses_with_id {
            self.write_event_responses(event_responses, stream_id)?;
        }

        Ok(responses)
    }

    /// Handles each command in the command queue.
    ///
    /// # Arguments
    ///
    /// * `device` - Instance of backend device
    /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
    ///   `wait_ctx`
    fn handle_command_queue(
        &mut self,
        device: &mut dyn Device,
        wait_ctx: &WaitContext<Token>,
    ) -> Result<()> {
        while let Some(desc) = self.cmd_queue.pop() {
            let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
            self.write_responses(&mut resps)?;
        }
        Ok(())
    }

    /// Handles an event notified by the backend device for a stream session.
    ///
    /// # Arguments
    ///
    /// * `device` - Instance of backend device
    /// * `stream_id` - Stream session ID of the event
    /// * `wait_ctx` - `device` may register a new `Token::Buffer` for a new stream session to
    ///   `wait_ctx`
    fn handle_event(
        &mut self,
        device: &mut dyn Device,
        stream_id: u32,
        wait_ctx: &WaitContext<Token>,
    ) -> Result<()> {
        if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx)
        {
            self.write_event_responses(event_responses, stream_id)?;
        }
        Ok(())
    }

    /// Handles a completed buffer barrier.
    ///
    /// # Arguments
    ///
    /// * `device` - Instance of backend device
    /// * `stream_id` - Stream session ID of the event
    /// * `wait_ctx` - `device` may deregister the completed `Token::BufferBarrier` from
    ///   `wait_ctx`.
    fn handle_buffer_barrier(
        &mut self,
        device: &mut dyn Device,
        stream_id: u32,
        wait_ctx: &WaitContext<Token>,
    ) -> Result<()> {
        if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) {
            self.write_event_responses(event_responses, stream_id)?;
        }
        Ok(())
    }

    /// Runs the video device virtio queues in a blocking way.
    ///
    /// # Arguments
    ///
    /// * `device` - Instance of backend device
    /// * `kill_evt` - `Event` notified to make `run` stop and return
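    ///
    /// A typical caller spawns a dedicated thread to drive this loop. The following is an
    /// illustrative sketch only (the real spawning code lives in the device implementation,
    /// and the construction of the queues, interrupts, and backend device is omitted):
    ///
    /// ```ignore
    /// let mut worker = Worker::new(cmd_queue, cmd_queue_interrupt, event_queue, event_queue_interrupt);
    /// let handle = std::thread::spawn(move || worker.run(device, &kill_evt));
    /// ```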
    pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
        let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
            (self.cmd_queue.event(), Token::CmdQueue),
            (self.event_queue.event(), Token::EventQueue),
            (kill_evt, Token::Kill),
        ])
        .and_then(|wc| {
            // The resampling event exists on a per-PCI-INTx basis, so the two queues share the
            // same event. Thus, checking only cmd_queue_interrupt suffices.
            if let Some(resample_evt) = self.cmd_queue_interrupt.get_resample_evt() {
                wc.add(resample_evt, Token::InterruptResample)?;
            }
            Ok(wc)
        })
        .map_err(Error::WaitContextCreationFailed)?;

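        // Main blocking loop: each readable token is dispatched to its handler. Queue tokens
        // read their kick event first so the wait does not keep reporting a kick that has
        // already been handled.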
        loop {
            let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;

            for wait_event in wait_events.iter().filter(|e| e.is_readable) {
                match wait_event.token {
                    Token::CmdQueue => {
                        let _ = self.cmd_queue.event().wait();
                        self.handle_command_queue(device.as_mut(), &wait_ctx)?;
                    }
                    Token::EventQueue => {
                        let _ = self.event_queue.event().wait();
                    }
                    Token::Event { id } => {
                        self.handle_event(device.as_mut(), id, &wait_ctx)?;
                    }
                    Token::BufferBarrier { id } => {
                        self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?;
                    }
                    Token::InterruptResample => {
                        // Clear the event. `expect` is ok since the token fires only if the
                        // resample event exists. The resampling event exists on a per-PCI-INTx
                        // basis, so the two queues share the same event.
                        let _ = self
                            .cmd_queue_interrupt
                            .get_resample_evt()
                            .expect("resample event for the command queue doesn't exist")
                            .wait();
                        self.cmd_queue_interrupt.do_interrupt_resample();
                    }
                    Token::Kill => return Ok(()),
                }
            }
        }
    }

    /// Runs the video device virtio queues asynchronously.
    ///
    /// # Arguments
    ///
    /// * `device` - Instance of backend device
    /// * `ex` - Instance of `Executor` for asynchronous operations
    /// * `cmd_evt` - Driver-to-device kick event for the command queue
    /// * `event_evt` - Driver-to-device kick event for the event queue
    #[allow(dead_code)]
    pub async fn run_async(
        mut self,
        mut device: Box<dyn Device>,
        ex: Executor,
        cmd_evt: Event,
        event_evt: Event,
    ) -> Result<()> {
        let cmd_queue_evt =
            EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
        let event_queue_evt =
            EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;

        // WaitContext to wait for the response from the encoder/decoder device.
        let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?;
        let device_evt = ex
            .async_from(AsyncWrapper::new(
                clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?,
            ))
            .map_err(Error::EventAsyncCreationFailed)?;

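        // Unlike `run`, this loop multiplexes the two virtqueue kick events and the device's
        // wait context on the async executor instead of blocking on a single WaitContext.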
        loop {
            let (
                cmd_queue_evt,
                device_evt,
                // Ignore driver-to-device kicks since the event queue is write-only for a device.
                _event_queue_evt,
            ) = select3(
                cmd_queue_evt.next_val().boxed_local(),
                device_evt.wait_readable().boxed_local(),
                event_queue_evt.next_val().boxed_local(),
            )
            .await;

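            // select3 resolves once at least one future completes and reports the state of all
            // three; only the ones that finished are handled below.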
            if let SelectResult::Finished(_) = cmd_queue_evt {
                self.handle_command_queue(device.as_mut(), &device_wait_ctx)?;
            }

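            // If the device-side wait context became readable, drain every pending device
            // event without blocking (zero timeout) and dispatch each one.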
            if let SelectResult::Finished(_) = device_evt {
                let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) {
                    Ok(device_events) => device_events,
                    Err(_) => {
                        error!("failed to read a device event");
                        continue;
                    }
                };
                for device_event in device_events {
                    // A Device must trigger only Token::Event. See [`Device::process_cmd()`].
                    if let Token::Event { id } = device_event.token {
                        self.handle_event(device.as_mut(), id, &device_wait_ctx)?;
                    } else {
                        error!(
                            "invalid event is triggered by a device {:?}",
                            device_event.token
                        );
                    }
                }
            }
        }
    }
}