1 // Copyright 2025 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use nix::errno::Errno; 6 use nix::sys::epoll::Epoll; 7 use nix::sys::epoll::EpollCreateFlags; 8 use nix::sys::epoll::EpollEvent; 9 use nix::sys::epoll::EpollFlags; 10 use nix::sys::epoll::EpollTimeout; 11 use nix::sys::eventfd::EventFd; 12 13 use std::collections::VecDeque; 14 use std::marker::PhantomData; 15 use std::os::fd::AsFd; 16 #[cfg(feature = "vaapi")] 17 use std::path::PathBuf; 18 use std::sync::Arc; 19 use std::sync::Mutex; 20 use thiserror::Error; 21 22 use crate::c2_wrapper::C2DecodeJob; 23 use crate::c2_wrapper::C2State; 24 use crate::c2_wrapper::C2Status; 25 use crate::c2_wrapper::C2Worker; 26 use crate::c2_wrapper::DrainMode; 27 use crate::c2_wrapper::Job; 28 use crate::decoder::stateless::DecodeError; 29 use crate::decoder::stateless::DynStatelessVideoDecoder; 30 use crate::decoder::DecoderEvent; 31 use crate::decoder::StreamInfo; 32 use crate::image_processing::convert_video_frame; 33 use crate::video_frame::frame_pool::FramePool; 34 use crate::video_frame::frame_pool::PooledVideoFrame; 35 #[cfg(feature = "vaapi")] 36 use crate::video_frame::gbm_video_frame::{GbmDevice, GbmUsage, GbmVideoFrame}; 37 #[cfg(feature = "v4l2")] 38 use crate::video_frame::v4l2_mmap_video_frame::V4l2MmapVideoFrame; 39 use crate::video_frame::VideoFrame; 40 use crate::EncodedFormat; 41 use crate::Fourcc; 42 43 #[derive(Debug, Error)] 44 pub enum C2DecoderPollErrorWrapper { 45 #[error("failed to create Epoll: {0}")] 46 Epoll(Errno), 47 #[error("failed to add poll FDs to Epoll: {0}")] 48 EpollAdd(Errno), 49 } 50 51 pub trait C2DecoderBackend { 52 type DecoderOptions: Clone + Send + 'static; 53 new(options: Self::DecoderOptions) -> Result<Self, String> where Self: Sized54 fn new(options: Self::DecoderOptions) -> Result<Self, String> 55 where 56 Self: Sized; supported_output_formats(&self) -> Result<Vec<Fourcc>, String>57 fn supported_output_formats(&self) -> Result<Vec<Fourcc>, String>; 58 // TODO: Support stateful video decoders. get_decoder<V: VideoFrame + 'static>( &mut self, input_format: EncodedFormat, ) -> Result<DynStatelessVideoDecoder<V>, String>59 fn get_decoder<V: VideoFrame + 'static>( 60 &mut self, 61 input_format: EncodedFormat, 62 ) -> Result<DynStatelessVideoDecoder<V>, String>; 63 } 64 65 #[cfg(feature = "vaapi")] 66 type AuxiliaryVideoFrame = GbmVideoFrame; 67 #[cfg(feature = "v4l2")] 68 type AuxiliaryVideoFrame = V4l2MmapVideoFrame; 69 70 // An "importing decoder" can directly import the DMA bufs we are getting, while a "converting 71 // decoder" is used for performing image processing routines to convert between the video hardware 72 // output and a pixel format that can be consumed by the GPU and display controller. 73 // TODO: Come up with a better name for these? 74 enum C2Decoder<V: VideoFrame> { 75 ImportingDecoder(DynStatelessVideoDecoder<V>), 76 ConvertingDecoder(DynStatelessVideoDecoder<PooledVideoFrame<AuxiliaryVideoFrame>>), 77 } 78 79 pub struct C2DecoderWorker<V, B> 80 where 81 V: VideoFrame, 82 B: C2DecoderBackend, 83 { 84 decoder: C2Decoder<V>, 85 epoll_fd: Epoll, 86 awaiting_job_event: Arc<EventFd>, 87 auxiliary_frame_pool: Option<FramePool<AuxiliaryVideoFrame>>, 88 error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, 89 work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>, 90 framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, 91 alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>, 92 work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>, 93 state: Arc<Mutex<C2State>>, 94 _phantom: PhantomData<B>, 95 } 96 97 impl<V, B> C2DecoderWorker<V, B> 98 where 99 V: VideoFrame, 100 B: C2DecoderBackend, 101 { 102 // Processes events from the decoder. Primarily these are frame decoded events and DRCs. check_events(&mut self)103 fn check_events(&mut self) { 104 loop { 105 let stream_info = match &self.decoder { 106 C2Decoder::ImportingDecoder(decoder) => decoder.stream_info().map(|x| x.clone()), 107 C2Decoder::ConvertingDecoder(decoder) => decoder.stream_info().map(|x| x.clone()), 108 }; 109 match &mut self.decoder { 110 C2Decoder::ImportingDecoder(decoder) => match decoder.next_event() { 111 Some(DecoderEvent::FrameReady(frame)) => { 112 frame.sync().unwrap(); 113 (*self.work_done_cb.lock().unwrap())(C2DecodeJob { 114 output: Some(frame.video_frame()), 115 timestamp: frame.timestamp(), 116 ..Default::default() 117 }); 118 } 119 Some(DecoderEvent::FormatChanged) => match stream_info { 120 Some(stream_info) => { 121 (*self.framepool_hint_cb.lock().unwrap())(stream_info.clone()); 122 } 123 None => { 124 log::debug!("Could not get stream info after format change!"); 125 *self.state.lock().unwrap() = C2State::C2Error; 126 (*self.error_cb.lock().unwrap())(C2Status::C2BadValue); 127 } 128 }, 129 _ => break, 130 }, 131 C2Decoder::ConvertingDecoder(decoder) => match decoder.next_event() { 132 Some(DecoderEvent::FrameReady(frame)) => { 133 frame.sync().unwrap(); 134 let mut dst_frame = 135 (*self.alloc_cb.lock().unwrap())().expect("Allocation failed!"); 136 let src_frame = &*frame.video_frame(); 137 if let Err(err) = convert_video_frame(src_frame, &mut dst_frame) { 138 log::debug!("Error converting VideoFrame! {err}"); 139 *self.state.lock().unwrap() = C2State::C2Error; 140 (*self.error_cb.lock().unwrap())(C2Status::C2BadValue); 141 } 142 (*self.work_done_cb.lock().unwrap())(C2DecodeJob { 143 output: Some(Arc::new(dst_frame)), 144 timestamp: frame.timestamp(), 145 ..Default::default() 146 }); 147 } 148 Some(DecoderEvent::FormatChanged) => match stream_info { 149 Some(stream_info) => { 150 (*self.framepool_hint_cb.lock().unwrap())(stream_info.clone()); 151 self.auxiliary_frame_pool.as_mut().unwrap().resize(&stream_info); 152 } 153 None => { 154 log::debug!("Could not get stream info after format change!"); 155 *self.state.lock().unwrap() = C2State::C2Error; 156 (*self.error_cb.lock().unwrap())(C2Status::C2BadValue); 157 } 158 }, 159 _ => break, 160 }, 161 } 162 } 163 } 164 } 165 166 impl<V, B> C2Worker<C2DecodeJob<V>> for C2DecoderWorker<V, B> 167 where 168 V: VideoFrame, 169 B: C2DecoderBackend, 170 { 171 type Options = <B as C2DecoderBackend>::DecoderOptions; 172 new( input_fourcc: Fourcc, output_fourcc: Fourcc, awaiting_job_event: Arc<EventFd>, error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>, work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>, state: Arc<Mutex<C2State>>, framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>, options: Self::Options, ) -> Result<Self, String>173 fn new( 174 input_fourcc: Fourcc, 175 output_fourcc: Fourcc, 176 awaiting_job_event: Arc<EventFd>, 177 error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, 178 work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>, 179 work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>, 180 state: Arc<Mutex<C2State>>, 181 framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, 182 alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>, 183 options: Self::Options, 184 ) -> Result<Self, String> { 185 let mut backend = B::new(options)?; 186 let backend_fourccs = backend.supported_output_formats()?; 187 let (auxiliary_frame_pool, decoder) = if backend_fourccs.contains(&output_fourcc) { 188 ( 189 None, 190 C2Decoder::ImportingDecoder( 191 backend.get_decoder(EncodedFormat::from(input_fourcc))?, 192 ), 193 ) 194 } else { 195 #[cfg(feature = "vaapi")] 196 { 197 let gbm_device = Arc::new( 198 GbmDevice::open(PathBuf::from("/dev/dri/renderD128")) 199 .expect("Could not open GBM device!"), 200 ); 201 let framepool = FramePool::new(move |stream_info: &StreamInfo| { 202 // TODO: Query the driver for these alignment params. 203 <Arc<GbmDevice> as Clone>::clone(&gbm_device) 204 .new_frame( 205 Fourcc::from(stream_info.format), 206 stream_info.display_resolution.clone(), 207 stream_info.coded_resolution.clone(), 208 GbmUsage::Decode, 209 ) 210 .expect("Could not allocate frame for auxiliary frame pool!") 211 }); 212 ( 213 Some(framepool), 214 C2Decoder::ConvertingDecoder( 215 backend.get_decoder(EncodedFormat::from(input_fourcc))?, 216 ), 217 ) 218 } 219 #[cfg(feature = "v4l2")] 220 { 221 let framepool = FramePool::new(move |stream_info: &StreamInfo| { 222 V4l2MmapVideoFrame::new( 223 Fourcc::from(stream_info.format), 224 stream_info.display_resolution.clone(), 225 ) 226 }); 227 ( 228 Some(framepool), 229 C2Decoder::ConvertingDecoder( 230 backend.get_decoder(EncodedFormat::from(input_fourcc))?, 231 ), 232 ) 233 } 234 }; 235 Ok(Self { 236 decoder: decoder, 237 auxiliary_frame_pool: auxiliary_frame_pool, 238 epoll_fd: Epoll::new(EpollCreateFlags::empty()) 239 .map_err(C2DecoderPollErrorWrapper::Epoll) 240 .unwrap(), 241 awaiting_job_event: awaiting_job_event, 242 error_cb: error_cb, 243 work_done_cb: work_done_cb, 244 framepool_hint_cb: framepool_hint_cb, 245 alloc_cb: alloc_cb, 246 work_queue: work_queue, 247 state: state, 248 _phantom: Default::default(), 249 }) 250 } 251 process_loop(&mut self)252 fn process_loop(&mut self) { 253 self.epoll_fd = Epoll::new(EpollCreateFlags::empty()) 254 .map_err(C2DecoderPollErrorWrapper::Epoll) 255 .unwrap(); 256 let _ = self 257 .epoll_fd 258 .add( 259 match &self.decoder { 260 C2Decoder::ImportingDecoder(decoder) => decoder.poll_fd(), 261 C2Decoder::ConvertingDecoder(decoder) => decoder.poll_fd(), 262 }, 263 EpollEvent::new(EpollFlags::EPOLLIN, 1), 264 ) 265 .map_err(C2DecoderPollErrorWrapper::EpollAdd); 266 self.epoll_fd 267 .add(self.awaiting_job_event.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 2)) 268 .map_err(C2DecoderPollErrorWrapper::EpollAdd) 269 .unwrap(); 270 271 while *self.state.lock().unwrap() == C2State::C2Running { 272 // Poll for decoder events or pending job events. 273 let mut events = [EpollEvent::empty()]; 274 let _nb_fds = self.epoll_fd.wait(&mut events, EpollTimeout::NONE).unwrap(); 275 276 if events == [EpollEvent::new(EpollFlags::EPOLLIN, 2)] { 277 self.awaiting_job_event.read().unwrap(); 278 } 279 280 // We want to try sending compressed buffers to the decoder regardless of what event 281 // woke us up, because we either have new work, or we might more output buffers 282 // available. 283 let mut possible_job = (*self.work_queue.lock().unwrap()).pop_front(); 284 while let Some(mut job) = possible_job { 285 let bitstream = job.input.as_slice(); 286 let decode_result = if !job.input.is_empty() { 287 match &mut self.decoder { 288 C2Decoder::ImportingDecoder(decoder) => decoder.decode( 289 job.timestamp, 290 bitstream, 291 &mut *self.alloc_cb.lock().unwrap(), 292 ), 293 C2Decoder::ConvertingDecoder(decoder) => { 294 decoder.decode(job.timestamp, bitstream, &mut || { 295 self.auxiliary_frame_pool.as_mut().unwrap().alloc() 296 }) 297 } 298 } 299 } else { 300 // This generally indicates a drain signal. Drain signals can be associated 301 // with specific C2Work objects, or they can be a standalone call to the 302 // drain() function of the C2Component, so we have to accommodate both. 303 Ok(0) 304 }; 305 match decode_result { 306 Ok(num_bytes) => { 307 if num_bytes != job.input.len() { 308 job.input = (&job.input[num_bytes..]).to_vec(); 309 (*self.work_queue.lock().unwrap()).push_front(job); 310 } else if job.get_drain() != DrainMode::NoDrain { 311 let flush_result = match &mut self.decoder { 312 C2Decoder::ImportingDecoder(decoder) => decoder.flush(), 313 C2Decoder::ConvertingDecoder(decoder) => decoder.flush(), 314 }; 315 if let Err(_) = flush_result { 316 log::debug!("Error handling drain request!"); 317 *self.state.lock().unwrap() = C2State::C2Error; 318 (*self.error_cb.lock().unwrap())(C2Status::C2BadValue); 319 } else { 320 self.check_events(); 321 if job.get_drain() == DrainMode::EOSDrain { 322 *self.state.lock().unwrap() = C2State::C2Stopped; 323 (*self.work_done_cb.lock().unwrap())(C2DecodeJob { 324 timestamp: job.timestamp, 325 drain: DrainMode::EOSDrain, 326 ..Default::default() 327 }); 328 } 329 } 330 break; 331 } 332 } 333 Err(DecodeError::NotEnoughOutputBuffers(_) | DecodeError::CheckEvents) => { 334 (*self.work_queue.lock().unwrap()).push_front(job); 335 break; 336 } 337 Err(e) => { 338 log::debug!("Unhandled error message from decoder {e:?}"); 339 *self.state.lock().unwrap() = C2State::C2Error; 340 (*self.error_cb.lock().unwrap())(C2Status::C2BadValue); 341 break; 342 } 343 } 344 possible_job = (*self.work_queue.lock().unwrap()).pop_front(); 345 } 346 self.check_events(); 347 } 348 } 349 } 350