• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // TODO(acourbot): Remove once we start using this file
6 #![allow(dead_code)]
7 
8 use std::{
9     collections::{btree_map::Entry, BTreeMap, VecDeque},
10     time::Duration,
11 };
12 
13 use base::Event;
14 use thiserror::Error as ThisError;
15 
16 use crate::virtio::video::resource::GuestResource;
17 
18 /// Manages a pollable queue of events to be sent to the decoder or encoder.
19 pub struct EventQueue<T> {
20     /// Pipe used to signal available events.
21     event: Event,
22     /// FIFO of all pending events.
23     pending_events: VecDeque<T>,
24 }
25 
26 impl<T> EventQueue<T> {
27     /// Create a new event queue.
new() -> base::Result<Self>28     pub fn new() -> base::Result<Self> {
29         Ok(Self {
30             // Use semaphore semantics so `eventfd` can be `read` as many times as it has been
31             // `write`n to without blocking.
32             event: Event::new()?,
33             pending_events: Default::default(),
34         })
35     }
36 
37     /// Add `event` to the queue.
queue_event(&mut self, event: T) -> base::Result<()>38     pub fn queue_event(&mut self, event: T) -> base::Result<()> {
39         self.pending_events.push_back(event);
40         self.event.write(1)?;
41         Ok(())
42     }
43 
44     /// Read the next event, blocking until an event becomes available.
dequeue_event(&mut self) -> base::Result<T>45     pub fn dequeue_event(&mut self) -> base::Result<T> {
46         // Wait until at least one event is written, if necessary.
47         let cpt = self.event.read()?;
48         let event = match self.pending_events.pop_front() {
49             Some(event) => event,
50             None => panic!("event signaled but no pending event - this is a bug."),
51         };
52         // If we have more than one event pending, write the remainder back into the event so it
53         // keeps signalling.
54         if cpt > 1 {
55             self.event.write(cpt - 1)?;
56         }
57 
58         Ok(event)
59     }
60 
61     /// Return a reference to an `Event` on which the caller can poll to know when `dequeue_event`
62     /// can be called without blocking.
event_pipe(&self) -> &Event63     pub fn event_pipe(&self) -> &Event {
64         &self.event
65     }
66 
67     /// Remove all the posted events for which `predicate` returns `false`.
retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)68     pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
69         if self.pending_events.len() > 0 {
70             let _ = self
71                 .event
72                 .read_timeout(Duration::from_millis(0))
73                 .expect("read_timeout failure");
74         }
75 
76         self.pending_events.retain(predicate);
77 
78         let num_pending_events = self.pending_events.len();
79         if num_pending_events > 0 {
80             self.event
81                 .write(num_pending_events as u64)
82                 .expect("write failure");
83         }
84     }
85 
86     /// Returns the number of events currently pending on this queue, i.e. the number of times
87     /// `dequeue_event` can be called without blocking.
88     #[cfg(test)]
len(&self) -> usize89     pub fn len(&self) -> usize {
90         self.pending_events.len()
91     }
92 }
93 
94 /// Queue of all the output buffers provided by crosvm.
95 pub struct OutputQueue {
96     // Max number of output buffers that can be imported into this queue.
97     num_buffers: usize,
98     // Maps picture IDs to the corresponding guest resource.
99     buffers: BTreeMap<u32, GuestResource>,
100     // Picture IDs of output buffers we can write into.
101     ready_buffers: VecDeque<u32>,
102 }
103 
104 #[derive(Debug, ThisError)]
105 pub enum OutputBufferImportError {
106     #[error("maximum number of imported buffers ({0}) already reached")]
107     MaxBuffersReached(usize),
108     #[error("a buffer with picture ID {0} is already imported")]
109     AlreadyImported(u32),
110 }
111 
112 #[derive(Debug, ThisError)]
113 pub enum OutputBufferReuseError {
114     #[error("no buffer with picture ID {0} is imported at the moment")]
115     NotYetImported(u32),
116     #[error("buffer with picture ID {0} is already ready for use")]
117     AlreadyUsed(u32),
118 }
119 
120 impl OutputQueue {
121     /// Creates a new output queue capable of containing `num_buffers` buffers.
new(num_buffers: usize) -> Self122     pub fn new(num_buffers: usize) -> Self {
123         Self {
124             num_buffers,
125             buffers: Default::default(),
126             ready_buffers: Default::default(),
127         }
128     }
129 
130     /// Import a buffer, i.e. associate the buffer's `resource` to a given `picture_buffer_id`, and
131     /// make the buffer ready for use.
132     ///
133     /// A buffer with a given `picture_buffer_id` can only be imported once.
import_buffer( &mut self, picture_buffer_id: u32, resource: GuestResource, ) -> Result<(), OutputBufferImportError>134     pub fn import_buffer(
135         &mut self,
136         picture_buffer_id: u32,
137         resource: GuestResource,
138     ) -> Result<(), OutputBufferImportError> {
139         if self.buffers.len() >= self.num_buffers {
140             return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers));
141         }
142 
143         match self.buffers.entry(picture_buffer_id) {
144             Entry::Vacant(o) => {
145                 o.insert(resource);
146             }
147             Entry::Occupied(_) => {
148                 return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id));
149             }
150         }
151 
152         self.ready_buffers.push_back(picture_buffer_id);
153 
154         Ok(())
155     }
156 
157     /// Mark the previously-imported buffer with ID `picture_buffer_id` as ready for being used.
reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError>158     pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> {
159         if !self.buffers.contains_key(&picture_buffer_id) {
160             return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id));
161         }
162 
163         if self.ready_buffers.contains(&picture_buffer_id) {
164             return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id));
165         }
166 
167         self.ready_buffers.push_back(picture_buffer_id);
168 
169         Ok(())
170     }
171 
172     /// Get a buffer ready to be decoded into, if any is available.
try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)>173     pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> {
174         let picture_buffer_id = self.ready_buffers.pop_front()?;
175         // Unwrapping is safe here because our interface guarantees that ids in `ready_buffers` are
176         // valid keys for `buffers`.
177         Some((
178             picture_buffer_id,
179             self.buffers
180                 .get_mut(&picture_buffer_id)
181                 .expect("expected buffer not present in queue"),
182         ))
183     }
184 
clear_ready_buffers(&mut self)185     pub fn clear_ready_buffers(&mut self) {
186         self.ready_buffers.clear();
187     }
188 }
189 
190 #[cfg(test)]
191 mod tests {
192     use std::time::Duration;
193 
194     use super::*;
195     use crate::virtio::video::{decoder::DecoderEvent, format::Rect};
196     use base::{PollToken, WaitContext};
197 
198     /// Test basic queue/dequeue functionality of `EventQueue`.
199     #[test]
event_queue()200     fn event_queue() {
201         let mut event_queue = EventQueue::new().unwrap();
202 
203         assert_eq!(
204             event_queue.queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(1)),
205             Ok(())
206         );
207         assert_eq!(event_queue.len(), 1);
208         assert_eq!(
209             event_queue.queue_event(DecoderEvent::PictureReady {
210                 picture_buffer_id: 0,
211                 bitstream_id: 1,
212                 visible_rect: Rect {
213                     left: 0,
214                     top: 0,
215                     right: 320,
216                     bottom: 240,
217                 },
218             }),
219             Ok(())
220         );
221         assert_eq!(event_queue.len(), 2);
222 
223         assert!(matches!(
224             event_queue.dequeue_event(),
225             Ok(DecoderEvent::NotifyEndOfBitstreamBuffer(1))
226         ));
227         assert_eq!(event_queue.len(), 1);
228         assert!(matches!(
229             event_queue.dequeue_event(),
230             Ok(DecoderEvent::PictureReady {
231                 picture_buffer_id: 0,
232                 bitstream_id: 1,
233                 visible_rect: Rect {
234                     left: 0,
235                     top: 0,
236                     right: 320,
237                     bottom: 240,
238                 }
239             })
240         ));
241         assert_eq!(event_queue.len(), 0);
242     }
243 
244     /// Test polling of `DecoderEventQueue`'s `event_pipe`.
245     #[test]
decoder_event_queue_polling()246     fn decoder_event_queue_polling() {
247         #[derive(PollToken)]
248         enum Token {
249             Event,
250         }
251 
252         let mut event_queue = EventQueue::new().unwrap();
253         let event_pipe = event_queue.event_pipe();
254         let wait_context = WaitContext::build_with(&[(event_pipe, Token::Event)]).unwrap();
255 
256         // The queue is empty, so `event_pipe` should not signal.
257         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0);
258 
259         // `event_pipe` should signal as long as the queue is not empty.
260         event_queue
261             .queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(1))
262             .unwrap();
263         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
264         event_queue
265             .queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(2))
266             .unwrap();
267         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
268         event_queue
269             .queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(3))
270             .unwrap();
271         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
272 
273         event_queue.dequeue_event().unwrap();
274         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
275         event_queue.dequeue_event().unwrap();
276         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
277         event_queue.dequeue_event().unwrap();
278 
279         // The queue is empty again, so `event_pipe` should not signal.
280         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0);
281     }
282 }
283