1 // Copyright 2022 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // TODO(acourbot): Remove once we start using this file 6 #![allow(dead_code)] 7 8 use std::{ 9 collections::{btree_map::Entry, BTreeMap, VecDeque}, 10 time::Duration, 11 }; 12 13 use base::Event; 14 use thiserror::Error as ThisError; 15 16 use crate::virtio::video::resource::GuestResource; 17 18 /// Manages a pollable queue of events to be sent to the decoder or encoder. 19 pub struct EventQueue<T> { 20 /// Pipe used to signal available events. 21 event: Event, 22 /// FIFO of all pending events. 23 pending_events: VecDeque<T>, 24 } 25 26 impl<T> EventQueue<T> { 27 /// Create a new event queue. new() -> base::Result<Self>28 pub fn new() -> base::Result<Self> { 29 Ok(Self { 30 // Use semaphore semantics so `eventfd` can be `read` as many times as it has been 31 // `write`n to without blocking. 32 event: Event::new()?, 33 pending_events: Default::default(), 34 }) 35 } 36 37 /// Add `event` to the queue. queue_event(&mut self, event: T) -> base::Result<()>38 pub fn queue_event(&mut self, event: T) -> base::Result<()> { 39 self.pending_events.push_back(event); 40 self.event.write(1)?; 41 Ok(()) 42 } 43 44 /// Read the next event, blocking until an event becomes available. dequeue_event(&mut self) -> base::Result<T>45 pub fn dequeue_event(&mut self) -> base::Result<T> { 46 // Wait until at least one event is written, if necessary. 47 let cpt = self.event.read()?; 48 let event = match self.pending_events.pop_front() { 49 Some(event) => event, 50 None => panic!("event signaled but no pending event - this is a bug."), 51 }; 52 // If we have more than one event pending, write the remainder back into the event so it 53 // keeps signalling. 54 if cpt > 1 { 55 self.event.write(cpt - 1)?; 56 } 57 58 Ok(event) 59 } 60 61 /// Return a reference to an `Event` on which the caller can poll to know when `dequeue_event` 62 /// can be called without blocking. event_pipe(&self) -> &Event63 pub fn event_pipe(&self) -> &Event { 64 &self.event 65 } 66 67 /// Remove all the posted events for which `predicate` returns `false`. retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)68 pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) { 69 if self.pending_events.len() > 0 { 70 let _ = self 71 .event 72 .read_timeout(Duration::from_millis(0)) 73 .expect("read_timeout failure"); 74 } 75 76 self.pending_events.retain(predicate); 77 78 let num_pending_events = self.pending_events.len(); 79 if num_pending_events > 0 { 80 self.event 81 .write(num_pending_events as u64) 82 .expect("write failure"); 83 } 84 } 85 86 /// Returns the number of events currently pending on this queue, i.e. the number of times 87 /// `dequeue_event` can be called without blocking. 88 #[cfg(test)] len(&self) -> usize89 pub fn len(&self) -> usize { 90 self.pending_events.len() 91 } 92 } 93 94 /// Queue of all the output buffers provided by crosvm. 95 pub struct OutputQueue { 96 // Max number of output buffers that can be imported into this queue. 97 num_buffers: usize, 98 // Maps picture IDs to the corresponding guest resource. 99 buffers: BTreeMap<u32, GuestResource>, 100 // Picture IDs of output buffers we can write into. 101 ready_buffers: VecDeque<u32>, 102 } 103 104 #[derive(Debug, ThisError)] 105 pub enum OutputBufferImportError { 106 #[error("maximum number of imported buffers ({0}) already reached")] 107 MaxBuffersReached(usize), 108 #[error("a buffer with picture ID {0} is already imported")] 109 AlreadyImported(u32), 110 } 111 112 #[derive(Debug, ThisError)] 113 pub enum OutputBufferReuseError { 114 #[error("no buffer with picture ID {0} is imported at the moment")] 115 NotYetImported(u32), 116 #[error("buffer with picture ID {0} is already ready for use")] 117 AlreadyUsed(u32), 118 } 119 120 impl OutputQueue { 121 /// Creates a new output queue capable of containing `num_buffers` buffers. new(num_buffers: usize) -> Self122 pub fn new(num_buffers: usize) -> Self { 123 Self { 124 num_buffers, 125 buffers: Default::default(), 126 ready_buffers: Default::default(), 127 } 128 } 129 130 /// Import a buffer, i.e. associate the buffer's `resource` to a given `picture_buffer_id`, and 131 /// make the buffer ready for use. 132 /// 133 /// A buffer with a given `picture_buffer_id` can only be imported once. import_buffer( &mut self, picture_buffer_id: u32, resource: GuestResource, ) -> Result<(), OutputBufferImportError>134 pub fn import_buffer( 135 &mut self, 136 picture_buffer_id: u32, 137 resource: GuestResource, 138 ) -> Result<(), OutputBufferImportError> { 139 if self.buffers.len() >= self.num_buffers { 140 return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers)); 141 } 142 143 match self.buffers.entry(picture_buffer_id) { 144 Entry::Vacant(o) => { 145 o.insert(resource); 146 } 147 Entry::Occupied(_) => { 148 return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id)); 149 } 150 } 151 152 self.ready_buffers.push_back(picture_buffer_id); 153 154 Ok(()) 155 } 156 157 /// Mark the previously-imported buffer with ID `picture_buffer_id` as ready for being used. reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError>158 pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> { 159 if !self.buffers.contains_key(&picture_buffer_id) { 160 return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id)); 161 } 162 163 if self.ready_buffers.contains(&picture_buffer_id) { 164 return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id)); 165 } 166 167 self.ready_buffers.push_back(picture_buffer_id); 168 169 Ok(()) 170 } 171 172 /// Get a buffer ready to be decoded into, if any is available. try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)>173 pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> { 174 let picture_buffer_id = self.ready_buffers.pop_front()?; 175 // Unwrapping is safe here because our interface guarantees that ids in `ready_buffers` are 176 // valid keys for `buffers`. 177 Some(( 178 picture_buffer_id, 179 self.buffers 180 .get_mut(&picture_buffer_id) 181 .expect("expected buffer not present in queue"), 182 )) 183 } 184 clear_ready_buffers(&mut self)185 pub fn clear_ready_buffers(&mut self) { 186 self.ready_buffers.clear(); 187 } 188 } 189 190 #[cfg(test)] 191 mod tests { 192 use std::time::Duration; 193 194 use super::*; 195 use crate::virtio::video::{decoder::DecoderEvent, format::Rect}; 196 use base::{PollToken, WaitContext}; 197 198 /// Test basic queue/dequeue functionality of `EventQueue`. 199 #[test] event_queue()200 fn event_queue() { 201 let mut event_queue = EventQueue::new().unwrap(); 202 203 assert_eq!( 204 event_queue.queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(1)), 205 Ok(()) 206 ); 207 assert_eq!(event_queue.len(), 1); 208 assert_eq!( 209 event_queue.queue_event(DecoderEvent::PictureReady { 210 picture_buffer_id: 0, 211 bitstream_id: 1, 212 visible_rect: Rect { 213 left: 0, 214 top: 0, 215 right: 320, 216 bottom: 240, 217 }, 218 }), 219 Ok(()) 220 ); 221 assert_eq!(event_queue.len(), 2); 222 223 assert!(matches!( 224 event_queue.dequeue_event(), 225 Ok(DecoderEvent::NotifyEndOfBitstreamBuffer(1)) 226 )); 227 assert_eq!(event_queue.len(), 1); 228 assert!(matches!( 229 event_queue.dequeue_event(), 230 Ok(DecoderEvent::PictureReady { 231 picture_buffer_id: 0, 232 bitstream_id: 1, 233 visible_rect: Rect { 234 left: 0, 235 top: 0, 236 right: 320, 237 bottom: 240, 238 } 239 }) 240 )); 241 assert_eq!(event_queue.len(), 0); 242 } 243 244 /// Test polling of `DecoderEventQueue`'s `event_pipe`. 245 #[test] decoder_event_queue_polling()246 fn decoder_event_queue_polling() { 247 #[derive(PollToken)] 248 enum Token { 249 Event, 250 } 251 252 let mut event_queue = EventQueue::new().unwrap(); 253 let event_pipe = event_queue.event_pipe(); 254 let wait_context = WaitContext::build_with(&[(event_pipe, Token::Event)]).unwrap(); 255 256 // The queue is empty, so `event_pipe` should not signal. 257 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0); 258 259 // `event_pipe` should signal as long as the queue is not empty. 260 event_queue 261 .queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(1)) 262 .unwrap(); 263 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 264 event_queue 265 .queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(2)) 266 .unwrap(); 267 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 268 event_queue 269 .queue_event(DecoderEvent::NotifyEndOfBitstreamBuffer(3)) 270 .unwrap(); 271 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 272 273 event_queue.dequeue_event().unwrap(); 274 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 275 event_queue.dequeue_event().unwrap(); 276 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 277 event_queue.dequeue_event().unwrap(); 278 279 // The queue is empty again, so `event_pipe` should not signal. 280 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0); 281 } 282 } 283