• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2025 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use nix::errno::Errno;
6 use nix::sys::epoll::Epoll;
7 use nix::sys::epoll::EpollCreateFlags;
8 use nix::sys::epoll::EpollEvent;
9 use nix::sys::epoll::EpollFlags;
10 use nix::sys::epoll::EpollTimeout;
11 use nix::sys::eventfd::EventFd;
12 
13 use std::collections::VecDeque;
14 use std::marker::PhantomData;
15 use std::os::fd::AsFd;
16 #[cfg(feature = "vaapi")]
17 use std::path::PathBuf;
18 use std::sync::Arc;
19 use std::sync::Mutex;
20 use thiserror::Error;
21 
22 use crate::c2_wrapper::C2DecodeJob;
23 use crate::c2_wrapper::C2State;
24 use crate::c2_wrapper::C2Status;
25 use crate::c2_wrapper::C2Worker;
26 use crate::c2_wrapper::DrainMode;
27 use crate::c2_wrapper::Job;
28 use crate::decoder::stateless::DecodeError;
29 use crate::decoder::stateless::DynStatelessVideoDecoder;
30 use crate::decoder::DecoderEvent;
31 use crate::decoder::StreamInfo;
32 use crate::image_processing::convert_video_frame;
33 use crate::video_frame::frame_pool::FramePool;
34 use crate::video_frame::frame_pool::PooledVideoFrame;
35 #[cfg(feature = "vaapi")]
36 use crate::video_frame::gbm_video_frame::{GbmDevice, GbmUsage, GbmVideoFrame};
37 #[cfg(feature = "v4l2")]
38 use crate::video_frame::v4l2_mmap_video_frame::V4l2MmapVideoFrame;
39 use crate::video_frame::VideoFrame;
40 use crate::EncodedFormat;
41 use crate::Fourcc;
42 
43 #[derive(Debug, Error)]
44 pub enum C2DecoderPollErrorWrapper {
45     #[error("failed to create Epoll: {0}")]
46     Epoll(Errno),
47     #[error("failed to add poll FDs to Epoll: {0}")]
48     EpollAdd(Errno),
49 }
50 
51 pub trait C2DecoderBackend {
52     type DecoderOptions: Clone + Send + 'static;
53 
new(options: Self::DecoderOptions) -> Result<Self, String> where Self: Sized54     fn new(options: Self::DecoderOptions) -> Result<Self, String>
55     where
56         Self: Sized;
supported_output_formats(&self) -> Result<Vec<Fourcc>, String>57     fn supported_output_formats(&self) -> Result<Vec<Fourcc>, String>;
58     // TODO: Support stateful video decoders.
get_decoder<V: VideoFrame + 'static>( &mut self, input_format: EncodedFormat, ) -> Result<DynStatelessVideoDecoder<V>, String>59     fn get_decoder<V: VideoFrame + 'static>(
60         &mut self,
61         input_format: EncodedFormat,
62     ) -> Result<DynStatelessVideoDecoder<V>, String>;
63 }
64 
65 #[cfg(feature = "vaapi")]
66 type AuxiliaryVideoFrame = GbmVideoFrame;
67 #[cfg(feature = "v4l2")]
68 type AuxiliaryVideoFrame = V4l2MmapVideoFrame;
69 
70 // An "importing decoder" can directly import the DMA bufs we are getting, while a "converting
71 // decoder" is used for performing image processing routines to convert between the video hardware
72 // output and a pixel format that can be consumed by the GPU and display controller.
73 // TODO: Come up with a better name for these?
74 enum C2Decoder<V: VideoFrame> {
75     ImportingDecoder(DynStatelessVideoDecoder<V>),
76     ConvertingDecoder(DynStatelessVideoDecoder<PooledVideoFrame<AuxiliaryVideoFrame>>),
77 }
78 
79 pub struct C2DecoderWorker<V, B>
80 where
81     V: VideoFrame,
82     B: C2DecoderBackend,
83 {
84     decoder: C2Decoder<V>,
85     epoll_fd: Epoll,
86     awaiting_job_event: Arc<EventFd>,
87     auxiliary_frame_pool: Option<FramePool<AuxiliaryVideoFrame>>,
88     error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
89     work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>,
90     framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
91     alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>,
92     work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>,
93     state: Arc<Mutex<C2State>>,
94     _phantom: PhantomData<B>,
95 }
96 
97 impl<V, B> C2DecoderWorker<V, B>
98 where
99     V: VideoFrame,
100     B: C2DecoderBackend,
101 {
102     // Processes events from the decoder. Primarily these are frame decoded events and DRCs.
check_events(&mut self)103     fn check_events(&mut self) {
104         loop {
105             let stream_info = match &self.decoder {
106                 C2Decoder::ImportingDecoder(decoder) => decoder.stream_info().map(|x| x.clone()),
107                 C2Decoder::ConvertingDecoder(decoder) => decoder.stream_info().map(|x| x.clone()),
108             };
109             match &mut self.decoder {
110                 C2Decoder::ImportingDecoder(decoder) => match decoder.next_event() {
111                     Some(DecoderEvent::FrameReady(frame)) => {
112                         frame.sync().unwrap();
113                         (*self.work_done_cb.lock().unwrap())(C2DecodeJob {
114                             output: Some(frame.video_frame()),
115                             timestamp: frame.timestamp(),
116                             ..Default::default()
117                         });
118                     }
119                     Some(DecoderEvent::FormatChanged) => match stream_info {
120                         Some(stream_info) => {
121                             (*self.framepool_hint_cb.lock().unwrap())(stream_info.clone());
122                         }
123                         None => {
124                             log::debug!("Could not get stream info after format change!");
125                             *self.state.lock().unwrap() = C2State::C2Error;
126                             (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
127                         }
128                     },
129                     _ => break,
130                 },
131                 C2Decoder::ConvertingDecoder(decoder) => match decoder.next_event() {
132                     Some(DecoderEvent::FrameReady(frame)) => {
133                         frame.sync().unwrap();
134                         let mut dst_frame =
135                             (*self.alloc_cb.lock().unwrap())().expect("Allocation failed!");
136                         let src_frame = &*frame.video_frame();
137                         if let Err(err) = convert_video_frame(src_frame, &mut dst_frame) {
138                             log::debug!("Error converting VideoFrame! {err}");
139                             *self.state.lock().unwrap() = C2State::C2Error;
140                             (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
141                         }
142                         (*self.work_done_cb.lock().unwrap())(C2DecodeJob {
143                             output: Some(Arc::new(dst_frame)),
144                             timestamp: frame.timestamp(),
145                             ..Default::default()
146                         });
147                     }
148                     Some(DecoderEvent::FormatChanged) => match stream_info {
149                         Some(stream_info) => {
150                             (*self.framepool_hint_cb.lock().unwrap())(stream_info.clone());
151                             self.auxiliary_frame_pool.as_mut().unwrap().resize(&stream_info);
152                         }
153                         None => {
154                             log::debug!("Could not get stream info after format change!");
155                             *self.state.lock().unwrap() = C2State::C2Error;
156                             (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
157                         }
158                     },
159                     _ => break,
160                 },
161             }
162         }
163     }
164 }
165 
166 impl<V, B> C2Worker<C2DecodeJob<V>> for C2DecoderWorker<V, B>
167 where
168     V: VideoFrame,
169     B: C2DecoderBackend,
170 {
171     type Options = <B as C2DecoderBackend>::DecoderOptions;
172 
new( input_fourcc: Fourcc, output_fourcc: Fourcc, awaiting_job_event: Arc<EventFd>, error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>, work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>, state: Arc<Mutex<C2State>>, framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>, options: Self::Options, ) -> Result<Self, String>173     fn new(
174         input_fourcc: Fourcc,
175         output_fourcc: Fourcc,
176         awaiting_job_event: Arc<EventFd>,
177         error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
178         work_done_cb: Arc<Mutex<dyn FnMut(C2DecodeJob<V>) + Send + 'static>>,
179         work_queue: Arc<Mutex<VecDeque<C2DecodeJob<V>>>>,
180         state: Arc<Mutex<C2State>>,
181         framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
182         alloc_cb: Arc<Mutex<dyn FnMut() -> Option<V> + Send + 'static>>,
183         options: Self::Options,
184     ) -> Result<Self, String> {
185         let mut backend = B::new(options)?;
186         let backend_fourccs = backend.supported_output_formats()?;
187         let (auxiliary_frame_pool, decoder) = if backend_fourccs.contains(&output_fourcc) {
188             (
189                 None,
190                 C2Decoder::ImportingDecoder(
191                     backend.get_decoder(EncodedFormat::from(input_fourcc))?,
192                 ),
193             )
194         } else {
195             #[cfg(feature = "vaapi")]
196             {
197                 let gbm_device = Arc::new(
198                     GbmDevice::open(PathBuf::from("/dev/dri/renderD128"))
199                         .expect("Could not open GBM device!"),
200                 );
201                 let framepool = FramePool::new(move |stream_info: &StreamInfo| {
202                     // TODO: Query the driver for these alignment params.
203                     <Arc<GbmDevice> as Clone>::clone(&gbm_device)
204                         .new_frame(
205                             Fourcc::from(stream_info.format),
206                             stream_info.display_resolution.clone(),
207                             stream_info.coded_resolution.clone(),
208                             GbmUsage::Decode,
209                         )
210                         .expect("Could not allocate frame for auxiliary frame pool!")
211                 });
212                 (
213                     Some(framepool),
214                     C2Decoder::ConvertingDecoder(
215                         backend.get_decoder(EncodedFormat::from(input_fourcc))?,
216                     ),
217                 )
218             }
219             #[cfg(feature = "v4l2")]
220             {
221                 let framepool = FramePool::new(move |stream_info: &StreamInfo| {
222                     V4l2MmapVideoFrame::new(
223                         Fourcc::from(stream_info.format),
224                         stream_info.display_resolution.clone(),
225                     )
226                 });
227                 (
228                     Some(framepool),
229                     C2Decoder::ConvertingDecoder(
230                         backend.get_decoder(EncodedFormat::from(input_fourcc))?,
231                     ),
232                 )
233             }
234         };
235         Ok(Self {
236             decoder: decoder,
237             auxiliary_frame_pool: auxiliary_frame_pool,
238             epoll_fd: Epoll::new(EpollCreateFlags::empty())
239                 .map_err(C2DecoderPollErrorWrapper::Epoll)
240                 .unwrap(),
241             awaiting_job_event: awaiting_job_event,
242             error_cb: error_cb,
243             work_done_cb: work_done_cb,
244             framepool_hint_cb: framepool_hint_cb,
245             alloc_cb: alloc_cb,
246             work_queue: work_queue,
247             state: state,
248             _phantom: Default::default(),
249         })
250     }
251 
process_loop(&mut self)252     fn process_loop(&mut self) {
253         self.epoll_fd = Epoll::new(EpollCreateFlags::empty())
254             .map_err(C2DecoderPollErrorWrapper::Epoll)
255             .unwrap();
256         let _ = self
257             .epoll_fd
258             .add(
259                 match &self.decoder {
260                     C2Decoder::ImportingDecoder(decoder) => decoder.poll_fd(),
261                     C2Decoder::ConvertingDecoder(decoder) => decoder.poll_fd(),
262                 },
263                 EpollEvent::new(EpollFlags::EPOLLIN, 1),
264             )
265             .map_err(C2DecoderPollErrorWrapper::EpollAdd);
266         self.epoll_fd
267             .add(self.awaiting_job_event.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 2))
268             .map_err(C2DecoderPollErrorWrapper::EpollAdd)
269             .unwrap();
270 
271         while *self.state.lock().unwrap() == C2State::C2Running {
272             // Poll for decoder events or pending job events.
273             let mut events = [EpollEvent::empty()];
274             let _nb_fds = self.epoll_fd.wait(&mut events, EpollTimeout::NONE).unwrap();
275 
276             if events == [EpollEvent::new(EpollFlags::EPOLLIN, 2)] {
277                 self.awaiting_job_event.read().unwrap();
278             }
279 
280             // We want to try sending compressed buffers to the decoder regardless of what event
281             // woke us up, because we either have new work, or we might more output buffers
282             // available.
283             let mut possible_job = (*self.work_queue.lock().unwrap()).pop_front();
284             while let Some(mut job) = possible_job {
285                 let bitstream = job.input.as_slice();
286                 let decode_result = if !job.input.is_empty() {
287                     match &mut self.decoder {
288                         C2Decoder::ImportingDecoder(decoder) => decoder.decode(
289                             job.timestamp,
290                             bitstream,
291                             &mut *self.alloc_cb.lock().unwrap(),
292                         ),
293                         C2Decoder::ConvertingDecoder(decoder) => {
294                             decoder.decode(job.timestamp, bitstream, &mut || {
295                                 self.auxiliary_frame_pool.as_mut().unwrap().alloc()
296                             })
297                         }
298                     }
299                 } else {
300                     // This generally indicates a drain signal. Drain signals can be associated
301                     // with specific C2Work objects, or they can be a standalone call to the
302                     // drain() function of the C2Component, so we have to accommodate both.
303                     Ok(0)
304                 };
305                 match decode_result {
306                     Ok(num_bytes) => {
307                         if num_bytes != job.input.len() {
308                             job.input = (&job.input[num_bytes..]).to_vec();
309                             (*self.work_queue.lock().unwrap()).push_front(job);
310                         } else if job.get_drain() != DrainMode::NoDrain {
311                             let flush_result = match &mut self.decoder {
312                                 C2Decoder::ImportingDecoder(decoder) => decoder.flush(),
313                                 C2Decoder::ConvertingDecoder(decoder) => decoder.flush(),
314                             };
315                             if let Err(_) = flush_result {
316                                 log::debug!("Error handling drain request!");
317                                 *self.state.lock().unwrap() = C2State::C2Error;
318                                 (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
319                             } else {
320                                 self.check_events();
321                                 if job.get_drain() == DrainMode::EOSDrain {
322                                     *self.state.lock().unwrap() = C2State::C2Stopped;
323                                     (*self.work_done_cb.lock().unwrap())(C2DecodeJob {
324                                         timestamp: job.timestamp,
325                                         drain: DrainMode::EOSDrain,
326                                         ..Default::default()
327                                     });
328                                 }
329                             }
330                             break;
331                         }
332                     }
333                     Err(DecodeError::NotEnoughOutputBuffers(_) | DecodeError::CheckEvents) => {
334                         (*self.work_queue.lock().unwrap()).push_front(job);
335                         break;
336                     }
337                     Err(e) => {
338                         log::debug!("Unhandled error message from decoder {e:?}");
339                         *self.state.lock().unwrap() = C2State::C2Error;
340                         (*self.error_cb.lock().unwrap())(C2Status::C2BadValue);
341                         break;
342                     }
343                 }
344                 possible_job = (*self.work_queue.lock().unwrap()).pop_front();
345             }
346             self.check_events();
347         }
348     }
349 }
350