1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use nix::errno::Errno; 6 use nix::sys::eventfd::EfdFlags; 7 use nix::sys::eventfd::EventFd; 8 9 use thiserror::Error; 10 11 use std::collections::VecDeque; 12 use std::marker::PhantomData; 13 use std::sync::atomic::AtomicU32; 14 use std::sync::Arc; 15 use std::sync::Mutex; 16 use std::thread; 17 use std::thread::JoinHandle; 18 use std::vec::Vec; 19 20 use crate::decoder::StreamInfo; 21 use crate::video_frame::VideoFrame; 22 use crate::Fourcc; 23 24 pub mod c2_decoder; 25 pub mod c2_encoder; 26 #[cfg(feature = "v4l2")] 27 pub mod c2_v4l2_decoder; 28 #[cfg(feature = "v4l2")] 29 pub mod c2_v4l2_encoder; 30 #[cfg(feature = "vaapi")] 31 pub mod c2_vaapi_decoder; 32 #[cfg(feature = "vaapi")] 33 pub mod c2_vaapi_encoder; 34 35 #[derive(Debug, Default, PartialEq, Eq, Copy, Clone)] 36 pub enum DrainMode { 37 // Not draining 38 #[default] 39 NoDrain = -1, 40 // Drain the C2 component and signal an EOS. Currently we also change the state to stop. 41 EOSDrain = 0, 42 // Drain the C2 component, but keep accepting new jobs in the queue immediately after. 43 NoEOSDrain = 1, 44 } 45 46 #[derive(Debug)] 47 pub struct C2DecodeJob<V: VideoFrame> { 48 // Compressed input data 49 // TODO: Use VideoFrame for input too 50 pub input: Vec<u8>, 51 // Decompressed output frame. Note that this needs to be reference counted because we may still 52 // use this frame as a reference frame even while we're displaying it. 53 pub output: Option<Arc<V>>, 54 pub timestamp: u64, 55 pub drain: DrainMode, 56 // TODO: Add output delay and color aspect support as needed. 57 } 58 59 impl<V> Job for C2DecodeJob<V> 60 where 61 V: VideoFrame, 62 { 63 type Frame = V; 64 set_drain(&mut self, drain: DrainMode)65 fn set_drain(&mut self, drain: DrainMode) { 66 self.drain = drain; 67 } 68 get_drain(&self) -> DrainMode69 fn get_drain(&self) -> DrainMode { 70 self.drain 71 } 72 } 73 74 impl<V: VideoFrame> Default for C2DecodeJob<V> { default() -> Self75 fn default() -> Self { 76 Self { input: vec![], output: None, timestamp: 0, drain: DrainMode::NoDrain } 77 } 78 } 79 80 pub trait Job: Send + 'static { 81 type Frame: VideoFrame; 82 set_drain(&mut self, drain: DrainMode) -> ()83 fn set_drain(&mut self, drain: DrainMode) -> (); get_drain(&self) -> DrainMode84 fn get_drain(&self) -> DrainMode; 85 } 86 87 #[derive(Debug)] 88 pub struct C2EncodeJob<V: VideoFrame> { 89 pub input: Option<V>, 90 // TODO: Use VideoFrame for output too 91 pub output: Vec<u8>, 92 // In microseconds. 93 pub timestamp: u64, 94 // TODO: only support CBR right now, follow up with VBR support. 95 pub bitrate: u64, 96 // Framerate is actually negotiated, so the encoder can change this value 97 // based on the timestamps of the frames it receives. 98 pub framerate: Arc<AtomicU32>, 99 pub drain: DrainMode, 100 } 101 102 impl<V: VideoFrame> Default for C2EncodeJob<V> { default() -> Self103 fn default() -> Self { 104 Self { 105 input: None, 106 output: vec![], 107 timestamp: 0, 108 bitrate: 0, 109 framerate: Arc::new(AtomicU32::new(0)), 110 drain: DrainMode::NoDrain, 111 } 112 } 113 } 114 115 impl<V> Job for C2EncodeJob<V> 116 where 117 V: VideoFrame, 118 { 119 type Frame = V; 120 set_drain(&mut self, drain: DrainMode)121 fn set_drain(&mut self, drain: DrainMode) { 122 self.drain = drain; 123 } 124 get_drain(&self) -> DrainMode125 fn get_drain(&self) -> DrainMode { 126 self.drain 127 } 128 } 129 130 #[derive(Debug, PartialEq, Eq, Copy, Clone)] 131 pub enum C2State { 132 C2Running, 133 C2Stopped, 134 // Note that on state C2Error, stop() must be called before we can start() 135 // again. 136 C2Error, 137 } 138 139 // This is not a very "Rust-y" way of doing error handling, but it will 140 // hopefully make the FFI easier to write. Numerical values taken from 141 // frameworks/av/media/codec2/core/include/C2.h 142 // TODO: Improve error handling by adding more statuses. 143 #[derive(Debug, PartialEq, Eq, Copy, Clone)] 144 pub enum C2Status { 145 C2Ok = 0, 146 C2BadState = 1, // EPERM 147 C2BadValue = 22, // EINVAL 148 } 149 150 // J should be either C2DecodeJob or C2EncodeJob. 151 pub trait C2Worker<J> 152 where 153 J: Send + Job + 'static, 154 { 155 type Options: Clone + Send + 'static; 156 new( input_fourcc: Fourcc, output_fourcc: Fourcc, awaiting_job_event: Arc<EventFd>, error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>, work_queue: Arc<Mutex<VecDeque<J>>>, state: Arc<Mutex<C2State>>, framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>, options: Self::Options, ) -> Result<Self, String> where Self: Sized157 fn new( 158 input_fourcc: Fourcc, 159 output_fourcc: Fourcc, 160 awaiting_job_event: Arc<EventFd>, 161 error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, 162 work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>, 163 work_queue: Arc<Mutex<VecDeque<J>>>, 164 state: Arc<Mutex<C2State>>, 165 framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, 166 alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>, 167 options: Self::Options, 168 ) -> Result<Self, String> 169 where 170 Self: Sized; 171 process_loop(&mut self)172 fn process_loop(&mut self); 173 } 174 175 #[derive(Debug, Error)] 176 pub enum C2WrapperError { 177 #[error("failed to create EventFd for awaiting job event: {0}")] 178 AwaitingJobEventFd(Errno), 179 } 180 181 // Note that we do not guarantee thread safety in C2CrosCodecsWrapper. 182 pub struct C2Wrapper<J, W> 183 where 184 J: Send + Default + Job + 'static, 185 W: C2Worker<J>, 186 { 187 awaiting_job_event: Arc<EventFd>, 188 input_fourcc: Fourcc, 189 output_fourcc: Fourcc, 190 error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, 191 work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>, 192 work_queue: Arc<Mutex<VecDeque<J>>>, 193 state: Arc<Mutex<C2State>>, 194 framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, 195 alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>, 196 options: <W as C2Worker<J>>::Options, 197 worker_thread: Option<JoinHandle<()>>, 198 // The instance of V actually lives in the thread creation closure, not 199 // this struct. 200 _phantom: PhantomData<W>, 201 } 202 203 impl<J, W> C2Wrapper<J, W> 204 where 205 J: Send + Default + Job + 'static, 206 W: C2Worker<J>, 207 { new( input_fourcc: Fourcc, output_fourcc: Fourcc, error_cb: impl FnMut(C2Status) + Send + 'static, work_done_cb: impl FnMut(J) + Send + 'static, framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static, alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static, options: <W as C2Worker<J>>::Options, ) -> Self208 pub fn new( 209 input_fourcc: Fourcc, 210 output_fourcc: Fourcc, 211 error_cb: impl FnMut(C2Status) + Send + 'static, 212 work_done_cb: impl FnMut(J) + Send + 'static, 213 framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static, 214 alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static, 215 options: <W as C2Worker<J>>::Options, 216 ) -> Self { 217 Self { 218 awaiting_job_event: Arc::new( 219 EventFd::from_flags(EfdFlags::EFD_SEMAPHORE) 220 .map_err(C2WrapperError::AwaitingJobEventFd) 221 .unwrap(), 222 ), 223 input_fourcc: input_fourcc, 224 output_fourcc: output_fourcc, 225 error_cb: Arc::new(Mutex::new(error_cb)), 226 work_done_cb: Arc::new(Mutex::new(work_done_cb)), 227 work_queue: Arc::new(Mutex::new(VecDeque::new())), 228 state: Arc::new(Mutex::new(C2State::C2Stopped)), 229 framepool_hint_cb: Arc::new(Mutex::new(framepool_hint_cb)), 230 alloc_cb: Arc::new(Mutex::new(alloc_cb)), 231 options: options, 232 worker_thread: None, 233 _phantom: Default::default(), 234 } 235 } 236 237 // This isn't part of C2, but it's convenient to check if our worker thread 238 // is still running. is_alive(&self) -> bool239 pub fn is_alive(&self) -> bool { 240 match &self.worker_thread { 241 Some(worker_thread) => !worker_thread.is_finished(), 242 None => false, 243 } 244 } 245 246 // Start the decoder/encoder. 247 // State will be C2Running after this call. start(&mut self) -> C2Status248 pub fn start(&mut self) -> C2Status { 249 { 250 let mut state = self.state.lock().unwrap(); 251 if *state != C2State::C2Stopped { 252 (*self.error_cb.lock().unwrap())(C2Status::C2BadState); 253 return C2Status::C2BadState; 254 } 255 *state = C2State::C2Running; 256 } 257 258 let input_fourcc = self.input_fourcc.clone(); 259 let output_fourcc = self.output_fourcc.clone(); 260 let error_cb = self.error_cb.clone(); 261 let work_done_cb = self.work_done_cb.clone(); 262 let work_queue = self.work_queue.clone(); 263 let state = self.state.clone(); 264 let options = self.options.clone(); 265 let awaiting_job_event = self.awaiting_job_event.clone(); 266 let framepool_hint_cb = self.framepool_hint_cb.clone(); 267 let alloc_cb = self.alloc_cb.clone(); 268 self.worker_thread = Some(thread::spawn(move || { 269 let worker = W::new( 270 input_fourcc, 271 output_fourcc, 272 awaiting_job_event, 273 error_cb.clone(), 274 work_done_cb, 275 work_queue, 276 state.clone(), 277 framepool_hint_cb, 278 alloc_cb, 279 options, 280 ); 281 match worker { 282 Ok(mut worker) => worker.process_loop(), 283 Err(msg) => { 284 log::debug!("Error instantiating C2Worker {}", msg); 285 *state.lock().unwrap() = C2State::C2Error; 286 (*error_cb.lock().unwrap())(C2Status::C2BadValue); 287 } 288 }; 289 })); 290 291 C2Status::C2Ok 292 } 293 294 // Stop the decoder/encoder and abandon in-flight work. 295 // C2's reset() function is equivalent for our purposes. 296 // Note that in event of error, stop() must be called before we can start() 297 // again. This is to ensure we clear out the work queue. 298 // State will be C2Stopped after this call. stop(&mut self) -> C2Status299 pub fn stop(&mut self) -> C2Status { 300 *self.state.lock().unwrap() = C2State::C2Stopped; 301 302 self.work_queue.lock().unwrap().drain(..); 303 304 self.awaiting_job_event.write(1).unwrap(); 305 306 let mut worker_thread: Option<JoinHandle<()>> = None; 307 std::mem::swap(&mut worker_thread, &mut self.worker_thread); 308 self.worker_thread = match worker_thread { 309 Some(worker_thread) => { 310 let _ = worker_thread.join(); 311 None 312 } 313 None => None, 314 }; 315 316 C2Status::C2Ok 317 } 318 319 // Add work to the work queue. 320 // State must be C2Running or this function is invalid. 321 // State will remain C2Running. queue(&mut self, work_items: Vec<J>) -> C2Status322 pub fn queue(&mut self, work_items: Vec<J>) -> C2Status { 323 if *self.state.lock().unwrap() != C2State::C2Running { 324 (*self.error_cb.lock().unwrap())(C2Status::C2BadState); 325 return C2Status::C2BadState; 326 } 327 328 self.work_queue.lock().unwrap().extend(work_items.into_iter()); 329 330 self.awaiting_job_event.write(1).unwrap(); 331 332 C2Status::C2Ok 333 } 334 335 // Flush work from the queue and return it as |flushed_work|. 336 // State will not change after this call. 337 // TODO: Support different flush modes. flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status338 pub fn flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status { 339 if *self.state.lock().unwrap() != C2State::C2Running { 340 (*self.error_cb.lock().unwrap())(C2Status::C2BadState); 341 return C2Status::C2BadState; 342 } 343 344 { 345 let mut work_queue = self.work_queue.lock().unwrap(); 346 let mut tmp = work_queue.drain(..).collect::<Vec<J>>(); 347 flushed_work.append(&mut tmp); 348 349 // Note that we don't just call drain() because we want to guarantee atomicity with respect 350 // to the work_queue eviction. 351 let mut drain_job: J = Default::default(); 352 drain_job.set_drain(DrainMode::NoEOSDrain); 353 work_queue.push_back(drain_job); 354 } 355 356 C2Status::C2Ok 357 } 358 359 // Signal to the decoder/encoder that it does not need to wait for 360 // additional work to begin processing. This is an unusual name for this 361 // function, but it is the convention that C2 uses. 362 // State must be C2Running or this function is invalid. 363 // State will remain C2Running until the last frames drain, at which point 364 // the state will change to C2Stopped. 365 // TODO: Support different drain modes. drain(&mut self, mode: DrainMode) -> C2Status366 pub fn drain(&mut self, mode: DrainMode) -> C2Status { 367 if *self.state.lock().unwrap() != C2State::C2Running { 368 (*self.error_cb.lock().unwrap())(C2Status::C2BadState); 369 return C2Status::C2BadState; 370 } 371 372 let mut drain_job: J = Default::default(); 373 drain_job.set_drain(mode); 374 self.work_queue.lock().unwrap().push_back(drain_job); 375 376 self.awaiting_job_event.write(1).unwrap(); 377 378 C2Status::C2Ok 379 } 380 } 381 382 // Instead of C2's release() function, we implement Drop and use RAII to 383 // accomplish the same thing 384 impl<J, W> Drop for C2Wrapper<J, W> 385 where 386 J: Send + Default + Job + 'static, 387 W: C2Worker<J>, 388 { drop(&mut self)389 fn drop(&mut self) { 390 self.stop(); 391 } 392 } 393