// Copyright 2025 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use nix::errno::Errno;
use nix::sys::epoll::Epoll;
use nix::sys::epoll::EpollCreateFlags;
use nix::sys::epoll::EpollEvent;
use nix::sys::epoll::EpollFlags;
use nix::sys::epoll::EpollTimeout;
use nix::sys::eventfd::EventFd;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::os::fd::AsFd;
#[cfg(feature = "vaapi")]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;

use thiserror::Error;

use crate::c2_wrapper::C2DecodeJob;
use crate::c2_wrapper::C2State;
use crate::c2_wrapper::C2Status;
use crate::c2_wrapper::C2Worker;
use crate::c2_wrapper::DrainMode;
use crate::c2_wrapper::Job;
use crate::decoder::stateless::DecodeError;
use crate::decoder::stateless::DynStatelessVideoDecoder;
use crate::decoder::DecoderEvent;
use crate::decoder::StreamInfo;
use crate::image_processing::convert_video_frame;
use crate::video_frame::frame_pool::FramePool;
use crate::video_frame::frame_pool::PooledVideoFrame;
#[cfg(feature = "vaapi")]
use crate::video_frame::gbm_video_frame::{GbmDevice, GbmUsage, GbmVideoFrame};
#[cfg(feature = "v4l2")]
use crate::video_frame::v4l2_mmap_video_frame::V4l2MmapVideoFrame;
use crate::video_frame::VideoFrame;
use crate::EncodedFormat;
use crate::Fourcc;

#[derive(Debug, Error)]
pub enum C2DecoderPollErrorWrapper {
    #[error("failed to create Epoll: {0}")]
    Epoll(Errno),
    #[error("failed to add poll FDs to Epoll: {0}")]
    EpollAdd(Errno),
}

pub trait C2DecoderBackend {
    type DecoderOptions: Clone + Send + 'static;

    fn new(options: Self::DecoderOptions) -> Result<Self, String>
    where
        Self: Sized;
    fn supported_output_formats(&self) -> Result<Vec<Fourcc>, String>;
    // TODO: Support stateful video decoders.
    fn get_decoder<V: VideoFrame>(
        &mut self,
        input_format: EncodedFormat,
    ) -> Result<DynStatelessVideoDecoder<V>, String>;
}

#[cfg(feature = "vaapi")]
type AuxiliaryVideoFrame = GbmVideoFrame;
#[cfg(feature = "v4l2")]
type AuxiliaryVideoFrame = V4l2MmapVideoFrame;

// An "importing decoder" can directly import the DMA bufs we are getting, while a "converting
// decoder" is used for performing image processing routines to convert between the video hardware
// output and a pixel format that can be consumed by the GPU and display controller.
// TODO: Come up with a better name for these?
enum C2Decoder<V: VideoFrame> {
    ImportingDecoder(DynStatelessVideoDecoder<V>),
    ConvertingDecoder(DynStatelessVideoDecoder<PooledVideoFrame<AuxiliaryVideoFrame>>),
}

pub struct C2DecoderWorker<V, B>
where
    V: VideoFrame,
    B: C2DecoderBackend,
{
    decoder: C2Decoder<V>,
    epoll_fd: Epoll,
    awaiting_job_event: Arc<EventFd>,
    auxiliary_frame_pool: Option<FramePool<AuxiliaryVideoFrame>>,
    error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
    work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>,
    framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
    alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>,
    work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>,
    state: Arc<Mutex<C2State>>,
    _phantom: PhantomData<B>,
}

impl<V, B> C2DecoderWorker<V, B>
where
    V: VideoFrame,
    B: C2DecoderBackend,
{
    // Processes events from the decoder. Primarily these are frame decoded events and DRCs.
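    // Here "DRC" refers to dynamic resolution changes, which surface below as
    // DecoderEvent::FormatChanged events: the framepool hint callback is notified with the
    // new StreamInfo, and converting decoders additionally resize the auxiliary frame pool
    // for the new stream parameters.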
    fn check_events(&mut self) {
        loop {
            let stream_info = match &self.decoder {
                C2Decoder::ImportingDecoder(decoder) => decoder.stream_info().map(|x| x.clone()),
                C2Decoder::ConvertingDecoder(decoder) => decoder.stream_info().map(|x| x.clone()),
            };
            match &mut self.decoder {
                C2Decoder::ImportingDecoder(decoder) => match decoder.next_event() {
                    Some(DecoderEvent::FrameReady(frame)) => {
                        frame.sync().unwrap();
                        (*self.work_done_cb.lock().unwrap())(C2DecodeJob {
                            output: Some(frame.video_frame()),
                            timestamp: frame.timestamp(),
                            ..Default::default()
                        });
                    }
                    Some(DecoderEvent::FormatChanged) => match stream_info {
                        Some(stream_info) => {
                            (*self.framepool_hint_cb.lock().unwrap())(stream_info.clone());
                        }
                        None => {
                            log::debug!("Could not get stream info after format change!");
                            *self.state.lock().unwrap() = C2State::C2Error;
                            (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
                        }
                    },
                    _ => break,
                },
                C2Decoder::ConvertingDecoder(decoder) => match decoder.next_event() {
                    Some(DecoderEvent::FrameReady(frame)) => {
                        frame.sync().unwrap();
                        let mut dst_frame =
                            (*self.alloc_cb.lock().unwrap())().expect("Allocation failed!");
                        let src_frame = &*frame.video_frame();
                        if let Err(err) = convert_video_frame(src_frame, &mut dst_frame) {
                            log::debug!("Error converting VideoFrame! {err}");
                            *self.state.lock().unwrap() = C2State::C2Error;
                            (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
                        }
                        (*self.work_done_cb.lock().unwrap())(C2DecodeJob {
                            output: Some(Arc::new(dst_frame)),
                            timestamp: frame.timestamp(),
                            ..Default::default()
                        });
                    }
                    Some(DecoderEvent::FormatChanged) => match stream_info {
                        Some(stream_info) => {
                            (*self.framepool_hint_cb.lock().unwrap())(stream_info.clone());
                            self.auxiliary_frame_pool.as_mut().unwrap().resize(&stream_info);
                        }
                        None => {
                            log::debug!("Could not get stream info after format change!");
                            *self.state.lock().unwrap() = C2State::C2Error;
                            (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
                        }
                    },
                    _ => break,
                },
            }
        }
    }
}

impl<V, B> C2Worker<C2DecodeJob<V>> for C2DecoderWorker<V, B>
where
    V: VideoFrame,
    B: C2DecoderBackend,
{
    type Options = <B as C2DecoderBackend>::DecoderOptions;

    fn new(
        input_fourcc: Fourcc,
        output_fourcc: Fourcc,
        awaiting_job_event: Arc<EventFd>,
        error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
        work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>,
        work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>,
        state: Arc<Mutex<C2State>>,
        framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
        alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>,
        options: Self::Options,
    ) -> Result<Self, String> {
        let mut backend = B::new(options)?;
        let backend_fourccs = backend.supported_output_formats()?;
        let (auxiliary_frame_pool, decoder) = if backend_fourccs.contains(&output_fourcc) {
            (
                None,
                C2Decoder::ImportingDecoder(
                    backend.get_decoder(EncodedFormat::from(input_fourcc))?,
                ),
            )
        } else {
            #[cfg(feature = "vaapi")]
            {
                let gbm_device = Arc::new(
                    GbmDevice::open(PathBuf::from("/dev/dri/renderD128"))
                        .expect("Could not open GBM device!"),
                );
                let framepool = FramePool::new(move |stream_info: &StreamInfo| {
                    // TODO: Query the driver for these alignment params.
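                    // Note: cloning the Arc here only bumps the refcount; it hands new_frame
                    // an owned handle to the GBM device while the move closure keeps the
                    // device alive for as long as the pool needs it.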
                    <Arc<GbmDevice> as Clone>::clone(&gbm_device)
                        .new_frame(
                            Fourcc::from(stream_info.format),
                            stream_info.display_resolution.clone(),
                            stream_info.coded_resolution.clone(),
                            GbmUsage::Decode,
                        )
                        .expect("Could not allocate frame for auxiliary frame pool!")
                });
                (
                    Some(framepool),
                    C2Decoder::ConvertingDecoder(
                        backend.get_decoder(EncodedFormat::from(input_fourcc))?,
                    ),
                )
            }
            #[cfg(feature = "v4l2")]
            {
                let framepool = FramePool::new(move |stream_info: &StreamInfo| {
                    V4l2MmapVideoFrame::new(
                        Fourcc::from(stream_info.format),
                        stream_info.display_resolution.clone(),
                    )
                });
                (
                    Some(framepool),
                    C2Decoder::ConvertingDecoder(
                        backend.get_decoder(EncodedFormat::from(input_fourcc))?,
                    ),
                )
            }
        };

        Ok(Self {
            decoder: decoder,
            auxiliary_frame_pool: auxiliary_frame_pool,
            epoll_fd: Epoll::new(EpollCreateFlags::empty())
                .map_err(C2DecoderPollErrorWrapper::Epoll)
                .unwrap(),
            awaiting_job_event: awaiting_job_event,
            error_cb: error_cb,
            work_done_cb: work_done_cb,
            framepool_hint_cb: framepool_hint_cb,
            alloc_cb: alloc_cb,
            work_queue: work_queue,
            state: state,
            _phantom: Default::default(),
        })
    }

    fn process_loop(&mut self) {
        self.epoll_fd = Epoll::new(EpollCreateFlags::empty())
            .map_err(C2DecoderPollErrorWrapper::Epoll)
            .unwrap();
        let _ = self
            .epoll_fd
            .add(
                match &self.decoder {
                    C2Decoder::ImportingDecoder(decoder) => decoder.poll_fd(),
                    C2Decoder::ConvertingDecoder(decoder) => decoder.poll_fd(),
                },
                EpollEvent::new(EpollFlags::EPOLLIN, 1),
            )
            .map_err(C2DecoderPollErrorWrapper::EpollAdd);
        self.epoll_fd
            .add(self.awaiting_job_event.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 2))
            .map_err(C2DecoderPollErrorWrapper::EpollAdd)
            .unwrap();

        while *self.state.lock().unwrap() == C2State::C2Running {
            // Poll for decoder events or pending job events.
            let mut events = [EpollEvent::empty()];
            let _nb_fds = self.epoll_fd.wait(&mut events, EpollTimeout::NONE).unwrap();

            if events == [EpollEvent::new(EpollFlags::EPOLLIN, 2)] {
                self.awaiting_job_event.read().unwrap();
            }

            // We want to try sending compressed buffers to the decoder regardless of what event
            // woke us up, because we either have new work, or we might have more output buffers
            // available.
            let mut possible_job = (*self.work_queue.lock().unwrap()).pop_front();
            while let Some(mut job) = possible_job {
                let bitstream = job.input.as_slice();
                let decode_result = if !job.input.is_empty() {
                    match &mut self.decoder {
                        C2Decoder::ImportingDecoder(decoder) => decoder.decode(
                            job.timestamp,
                            bitstream,
                            &mut *self.alloc_cb.lock().unwrap(),
                        ),
                        C2Decoder::ConvertingDecoder(decoder) => {
                            decoder.decode(job.timestamp, bitstream, &mut || {
                                self.auxiliary_frame_pool.as_mut().unwrap().alloc()
                            })
                        }
                    }
                } else {
                    // This generally indicates a drain signal. Drain signals can be associated
                    // with specific C2Work objects, or they can be a standalone call to the
                    // drain() function of the C2Component, so we have to accommodate both.
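                    // Report zero bytes consumed. Since the input is empty, this matches
                    // job.input.len() below and control falls through to the drain check.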
                    Ok(0)
                };

                match decode_result {
                    Ok(num_bytes) => {
                        if num_bytes != job.input.len() {
                            job.input = (&job.input[num_bytes..]).to_vec();
                            (*self.work_queue.lock().unwrap()).push_front(job);
                        } else if job.get_drain() != DrainMode::NoDrain {
                            let flush_result = match &mut self.decoder {
                                C2Decoder::ImportingDecoder(decoder) => decoder.flush(),
                                C2Decoder::ConvertingDecoder(decoder) => decoder.flush(),
                            };
                            if let Err(_) = flush_result {
                                log::debug!("Error handling drain request!");
                                *self.state.lock().unwrap() = C2State::C2Error;
                                (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
                            } else {
                                self.check_events();
                                if job.get_drain() == DrainMode::EOSDrain {
                                    *self.state.lock().unwrap() = C2State::C2Stopped;
                                    (*self.work_done_cb.lock().unwrap())(C2DecodeJob {
                                        timestamp: job.timestamp,
                                        drain: DrainMode::EOSDrain,
                                        ..Default::default()
                                    });
                                }
                            }
                            break;
                        }
                    }
                    Err(DecodeError::NotEnoughOutputBuffers(_) | DecodeError::CheckEvents) => {
                        (*self.work_queue.lock().unwrap()).push_front(job);
                        break;
                    }
                    Err(e) => {
                        log::debug!("Unhandled error message from decoder {e:?}");
                        *self.state.lock().unwrap() = C2State::C2Error;
                        (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
                        break;
                    }
                }

                possible_job = (*self.work_queue.lock().unwrap()).pop_front();
            }

            self.check_events();
        }
    }
}
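
// Rough usage sketch (not part of this module's API): the surrounding C2 wrapper is expected to
// construct the worker with the shared callbacks/queue and drive process_loop() from its own
// thread. The concrete names below (`MyFrame`, `MyBackend`, `opts`) are placeholders.
//
//     let mut worker =
//         <C2DecoderWorker<MyFrame, MyBackend> as C2Worker<C2DecodeJob<MyFrame>>>::new(
//             input_fourcc, output_fourcc, Arc::clone(&awaiting_job_event),
//             error_cb, work_done_cb, Arc::clone(&work_queue), Arc::clone(&state),
//             framepool_hint_cb, alloc_cb, opts,
//         )?;
//     std::thread::spawn(move || worker.process_loop());
//     // Producers push C2DecodeJobs onto `work_queue` and then signal `awaiting_job_event`;
//     // setting `state` to anything other than C2Running stops the loop.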