// Copyright 2024 The ChromiumOS Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. use nix::errno::Errno; use nix::sys::eventfd::EfdFlags; use nix::sys::eventfd::EventFd; use thiserror::Error; use std::collections::VecDeque; use std::marker::PhantomData; use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; use std::vec::Vec; use crate::decoder::StreamInfo; use crate::video_frame::VideoFrame; use crate::Fourcc; pub mod c2_decoder; pub mod c2_encoder; #[cfg(feature = "v4l2")] pub mod c2_v4l2_decoder; #[cfg(feature = "v4l2")] pub mod c2_v4l2_encoder; #[cfg(feature = "vaapi")] pub mod c2_vaapi_decoder; #[cfg(feature = "vaapi")] pub mod c2_vaapi_encoder; #[derive(Debug, Default, PartialEq, Eq, Copy, Clone)] pub enum DrainMode { // Not draining #[default] NoDrain = -1, // Drain the C2 component and signal an EOS. Currently we also change the state to stop. EOSDrain = 0, // Drain the C2 component, but keep accepting new jobs in the queue immediately after. NoEOSDrain = 1, } #[derive(Debug)] pub struct C2DecodeJob { // Compressed input data // TODO: Use VideoFrame for input too pub input: Vec, // Decompressed output frame. Note that this needs to be reference counted because we may still // use this frame as a reference frame even while we're displaying it. pub output: Option>, pub timestamp: u64, pub drain: DrainMode, // TODO: Add output delay and color aspect support as needed. } impl Job for C2DecodeJob where V: VideoFrame, { type Frame = V; fn set_drain(&mut self, drain: DrainMode) { self.drain = drain; } fn get_drain(&self) -> DrainMode { self.drain } } impl Default for C2DecodeJob { fn default() -> Self { Self { input: vec![], output: None, timestamp: 0, drain: DrainMode::NoDrain } } } pub trait Job: Send + 'static { type Frame: VideoFrame; fn set_drain(&mut self, drain: DrainMode) -> (); fn get_drain(&self) -> DrainMode; } #[derive(Debug)] pub struct C2EncodeJob { pub input: Option, // TODO: Use VideoFrame for output too pub output: Vec, // In microseconds. pub timestamp: u64, // TODO: only support CBR right now, follow up with VBR support. pub bitrate: u64, // Framerate is actually negotiated, so the encoder can change this value // based on the timestamps of the frames it receives. pub framerate: Arc, pub drain: DrainMode, } impl Default for C2EncodeJob { fn default() -> Self { Self { input: None, output: vec![], timestamp: 0, bitrate: 0, framerate: Arc::new(AtomicU32::new(0)), drain: DrainMode::NoDrain, } } } impl Job for C2EncodeJob where V: VideoFrame, { type Frame = V; fn set_drain(&mut self, drain: DrainMode) { self.drain = drain; } fn get_drain(&self) -> DrainMode { self.drain } } #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum C2State { C2Running, C2Stopped, // Note that on state C2Error, stop() must be called before we can start() // again. C2Error, } // This is not a very "Rust-y" way of doing error handling, but it will // hopefully make the FFI easier to write. Numerical values taken from // frameworks/av/media/codec2/core/include/C2.h // TODO: Improve error handling by adding more statuses. #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum C2Status { C2Ok = 0, C2BadState = 1, // EPERM C2BadValue = 22, // EINVAL } // J should be either C2DecodeJob or C2EncodeJob. pub trait C2Worker where J: Send + Job + 'static, { type Options: Clone + Send + 'static; fn new( input_fourcc: Fourcc, output_fourcc: Fourcc, awaiting_job_event: Arc, error_cb: Arc>, work_done_cb: Arc>, work_queue: Arc>>, state: Arc>, framepool_hint_cb: Arc>, alloc_cb: Arc Option<::Frame> + Send + 'static>>, options: Self::Options, ) -> Result where Self: Sized; fn process_loop(&mut self); } #[derive(Debug, Error)] pub enum C2WrapperError { #[error("failed to create EventFd for awaiting job event: {0}")] AwaitingJobEventFd(Errno), } // Note that we do not guarantee thread safety in C2CrosCodecsWrapper. pub struct C2Wrapper where J: Send + Default + Job + 'static, W: C2Worker, { awaiting_job_event: Arc, input_fourcc: Fourcc, output_fourcc: Fourcc, error_cb: Arc>, work_done_cb: Arc>, work_queue: Arc>>, state: Arc>, framepool_hint_cb: Arc>, alloc_cb: Arc Option<::Frame> + Send + 'static>>, options: >::Options, worker_thread: Option>, // The instance of V actually lives in the thread creation closure, not // this struct. _phantom: PhantomData, } impl C2Wrapper where J: Send + Default + Job + 'static, W: C2Worker, { pub fn new( input_fourcc: Fourcc, output_fourcc: Fourcc, error_cb: impl FnMut(C2Status) + Send + 'static, work_done_cb: impl FnMut(J) + Send + 'static, framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static, alloc_cb: impl FnMut() -> Option<::Frame> + Send + 'static, options: >::Options, ) -> Self { Self { awaiting_job_event: Arc::new( EventFd::from_flags(EfdFlags::EFD_SEMAPHORE) .map_err(C2WrapperError::AwaitingJobEventFd) .unwrap(), ), input_fourcc: input_fourcc, output_fourcc: output_fourcc, error_cb: Arc::new(Mutex::new(error_cb)), work_done_cb: Arc::new(Mutex::new(work_done_cb)), work_queue: Arc::new(Mutex::new(VecDeque::new())), state: Arc::new(Mutex::new(C2State::C2Stopped)), framepool_hint_cb: Arc::new(Mutex::new(framepool_hint_cb)), alloc_cb: Arc::new(Mutex::new(alloc_cb)), options: options, worker_thread: None, _phantom: Default::default(), } } // This isn't part of C2, but it's convenient to check if our worker thread // is still running. pub fn is_alive(&self) -> bool { match &self.worker_thread { Some(worker_thread) => !worker_thread.is_finished(), None => false, } } // Start the decoder/encoder. // State will be C2Running after this call. pub fn start(&mut self) -> C2Status { { let mut state = self.state.lock().unwrap(); if *state != C2State::C2Stopped { (*self.error_cb.lock().unwrap())(C2Status::C2BadState); return C2Status::C2BadState; } *state = C2State::C2Running; } let input_fourcc = self.input_fourcc.clone(); let output_fourcc = self.output_fourcc.clone(); let error_cb = self.error_cb.clone(); let work_done_cb = self.work_done_cb.clone(); let work_queue = self.work_queue.clone(); let state = self.state.clone(); let options = self.options.clone(); let awaiting_job_event = self.awaiting_job_event.clone(); let framepool_hint_cb = self.framepool_hint_cb.clone(); let alloc_cb = self.alloc_cb.clone(); self.worker_thread = Some(thread::spawn(move || { let worker = W::new( input_fourcc, output_fourcc, awaiting_job_event, error_cb.clone(), work_done_cb, work_queue, state.clone(), framepool_hint_cb, alloc_cb, options, ); match worker { Ok(mut worker) => worker.process_loop(), Err(msg) => { log::debug!("Error instantiating C2Worker {}", msg); *state.lock().unwrap() = C2State::C2Error; (*error_cb.lock().unwrap())(C2Status::C2BadValue); } }; })); C2Status::C2Ok } // Stop the decoder/encoder and abandon in-flight work. // C2's reset() function is equivalent for our purposes. // Note that in event of error, stop() must be called before we can start() // again. This is to ensure we clear out the work queue. // State will be C2Stopped after this call. pub fn stop(&mut self) -> C2Status { *self.state.lock().unwrap() = C2State::C2Stopped; self.work_queue.lock().unwrap().drain(..); self.awaiting_job_event.write(1).unwrap(); let mut worker_thread: Option> = None; std::mem::swap(&mut worker_thread, &mut self.worker_thread); self.worker_thread = match worker_thread { Some(worker_thread) => { let _ = worker_thread.join(); None } None => None, }; C2Status::C2Ok } // Add work to the work queue. // State must be C2Running or this function is invalid. // State will remain C2Running. pub fn queue(&mut self, work_items: Vec) -> C2Status { if *self.state.lock().unwrap() != C2State::C2Running { (*self.error_cb.lock().unwrap())(C2Status::C2BadState); return C2Status::C2BadState; } self.work_queue.lock().unwrap().extend(work_items.into_iter()); self.awaiting_job_event.write(1).unwrap(); C2Status::C2Ok } // Flush work from the queue and return it as |flushed_work|. // State will not change after this call. // TODO: Support different flush modes. pub fn flush(&mut self, flushed_work: &mut Vec) -> C2Status { if *self.state.lock().unwrap() != C2State::C2Running { (*self.error_cb.lock().unwrap())(C2Status::C2BadState); return C2Status::C2BadState; } { let mut work_queue = self.work_queue.lock().unwrap(); let mut tmp = work_queue.drain(..).collect::>(); flushed_work.append(&mut tmp); // Note that we don't just call drain() because we want to guarantee atomicity with respect // to the work_queue eviction. let mut drain_job: J = Default::default(); drain_job.set_drain(DrainMode::NoEOSDrain); work_queue.push_back(drain_job); } C2Status::C2Ok } // Signal to the decoder/encoder that it does not need to wait for // additional work to begin processing. This is an unusual name for this // function, but it is the convention that C2 uses. // State must be C2Running or this function is invalid. // State will remain C2Running until the last frames drain, at which point // the state will change to C2Stopped. // TODO: Support different drain modes. pub fn drain(&mut self, mode: DrainMode) -> C2Status { if *self.state.lock().unwrap() != C2State::C2Running { (*self.error_cb.lock().unwrap())(C2Status::C2BadState); return C2Status::C2BadState; } let mut drain_job: J = Default::default(); drain_job.set_drain(mode); self.work_queue.lock().unwrap().push_back(drain_job); self.awaiting_job_event.write(1).unwrap(); C2Status::C2Ok } } // Instead of C2's release() function, we implement Drop and use RAII to // accomplish the same thing impl Drop for C2Wrapper where J: Send + Default + Job + 'static, W: C2Worker, { fn drop(&mut self) { self.stop(); } }