• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use nix::errno::Errno;
6 use nix::sys::eventfd::EfdFlags;
7 use nix::sys::eventfd::EventFd;
8 
9 use thiserror::Error;
10 
11 use std::collections::VecDeque;
12 use std::marker::PhantomData;
13 use std::sync::atomic::AtomicU32;
14 use std::sync::Arc;
15 use std::sync::Mutex;
16 use std::thread;
17 use std::thread::JoinHandle;
18 use std::vec::Vec;
19 
20 use crate::decoder::StreamInfo;
21 use crate::video_frame::VideoFrame;
22 use crate::Fourcc;
23 
24 pub mod c2_decoder;
25 pub mod c2_encoder;
26 #[cfg(feature = "v4l2")]
27 pub mod c2_v4l2_decoder;
28 #[cfg(feature = "v4l2")]
29 pub mod c2_v4l2_encoder;
30 #[cfg(feature = "vaapi")]
31 pub mod c2_vaapi_decoder;
32 #[cfg(feature = "vaapi")]
33 pub mod c2_vaapi_encoder;
34 
35 #[derive(Debug, Default, PartialEq, Eq, Copy, Clone)]
36 pub enum DrainMode {
37     // Not draining
38     #[default]
39     NoDrain = -1,
40     // Drain the C2 component and signal an EOS. Currently we also change the state to stop.
41     EOSDrain = 0,
42     // Drain the C2 component, but keep accepting new jobs in the queue immediately after.
43     NoEOSDrain = 1,
44 }
45 
46 #[derive(Debug)]
47 pub struct C2DecodeJob<V: VideoFrame> {
48     // Compressed input data
49     // TODO: Use VideoFrame for input too
50     pub input: Vec<u8>,
51     // Decompressed output frame. Note that this needs to be reference counted because we may still
52     // use this frame as a reference frame even while we're displaying it.
53     pub output: Option<Arc<V>>,
54     pub timestamp: u64,
55     pub drain: DrainMode,
56     // TODO: Add output delay and color aspect support as needed.
57 }
58 
59 impl<V> Job for C2DecodeJob<V>
60 where
61     V: VideoFrame,
62 {
63     type Frame = V;
64 
set_drain(&mut self, drain: DrainMode)65     fn set_drain(&mut self, drain: DrainMode) {
66         self.drain = drain;
67     }
68 
get_drain(&self) -> DrainMode69     fn get_drain(&self) -> DrainMode {
70         self.drain
71     }
72 }
73 
74 impl<V: VideoFrame> Default for C2DecodeJob<V> {
default() -> Self75     fn default() -> Self {
76         Self { input: vec![], output: None, timestamp: 0, drain: DrainMode::NoDrain }
77     }
78 }
79 
80 pub trait Job: Send + 'static {
81     type Frame: VideoFrame;
82 
set_drain(&mut self, drain: DrainMode) -> ()83     fn set_drain(&mut self, drain: DrainMode) -> ();
get_drain(&self) -> DrainMode84     fn get_drain(&self) -> DrainMode;
85 }
86 
87 #[derive(Debug)]
88 pub struct C2EncodeJob<V: VideoFrame> {
89     pub input: Option<V>,
90     // TODO: Use VideoFrame for output too
91     pub output: Vec<u8>,
92     // In microseconds.
93     pub timestamp: u64,
94     // TODO: only support CBR right now, follow up with VBR support.
95     pub bitrate: u64,
96     // Framerate is actually negotiated, so the encoder can change this value
97     // based on the timestamps of the frames it receives.
98     pub framerate: Arc<AtomicU32>,
99     pub drain: DrainMode,
100 }
101 
102 impl<V: VideoFrame> Default for C2EncodeJob<V> {
default() -> Self103     fn default() -> Self {
104         Self {
105             input: None,
106             output: vec![],
107             timestamp: 0,
108             bitrate: 0,
109             framerate: Arc::new(AtomicU32::new(0)),
110             drain: DrainMode::NoDrain,
111         }
112     }
113 }
114 
115 impl<V> Job for C2EncodeJob<V>
116 where
117     V: VideoFrame,
118 {
119     type Frame = V;
120 
set_drain(&mut self, drain: DrainMode)121     fn set_drain(&mut self, drain: DrainMode) {
122         self.drain = drain;
123     }
124 
get_drain(&self) -> DrainMode125     fn get_drain(&self) -> DrainMode {
126         self.drain
127     }
128 }
129 
130 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
131 pub enum C2State {
132     C2Running,
133     C2Stopped,
134     // Note that on state C2Error, stop() must be called before we can start()
135     // again.
136     C2Error,
137 }
138 
139 // This is not a very "Rust-y" way of doing error handling, but it will
140 // hopefully make the FFI easier to write. Numerical values taken from
141 // frameworks/av/media/codec2/core/include/C2.h
142 // TODO: Improve error handling by adding more statuses.
143 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
144 pub enum C2Status {
145     C2Ok = 0,
146     C2BadState = 1,  // EPERM
147     C2BadValue = 22, // EINVAL
148 }
149 
150 // J should be either C2DecodeJob or C2EncodeJob.
151 pub trait C2Worker<J>
152 where
153     J: Send + Job + 'static,
154 {
155     type Options: Clone + Send + 'static;
156 
new( input_fourcc: Fourcc, output_fourcc: Fourcc, awaiting_job_event: Arc<EventFd>, error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>, work_queue: Arc<Mutex<VecDeque<J>>>, state: Arc<Mutex<C2State>>, framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>, alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>, options: Self::Options, ) -> Result<Self, String> where Self: Sized157     fn new(
158         input_fourcc: Fourcc,
159         output_fourcc: Fourcc,
160         awaiting_job_event: Arc<EventFd>,
161         error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
162         work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>,
163         work_queue: Arc<Mutex<VecDeque<J>>>,
164         state: Arc<Mutex<C2State>>,
165         framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
166         alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>,
167         options: Self::Options,
168     ) -> Result<Self, String>
169     where
170         Self: Sized;
171 
process_loop(&mut self)172     fn process_loop(&mut self);
173 }
174 
175 #[derive(Debug, Error)]
176 pub enum C2WrapperError {
177     #[error("failed to create EventFd for awaiting job event: {0}")]
178     AwaitingJobEventFd(Errno),
179 }
180 
181 // Note that we do not guarantee thread safety in C2CrosCodecsWrapper.
182 pub struct C2Wrapper<J, W>
183 where
184     J: Send + Default + Job + 'static,
185     W: C2Worker<J>,
186 {
187     awaiting_job_event: Arc<EventFd>,
188     input_fourcc: Fourcc,
189     output_fourcc: Fourcc,
190     error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
191     work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>,
192     work_queue: Arc<Mutex<VecDeque<J>>>,
193     state: Arc<Mutex<C2State>>,
194     framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
195     alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>,
196     options: <W as C2Worker<J>>::Options,
197     worker_thread: Option<JoinHandle<()>>,
198     // The instance of V actually lives in the thread creation closure, not
199     // this struct.
200     _phantom: PhantomData<W>,
201 }
202 
203 impl<J, W> C2Wrapper<J, W>
204 where
205     J: Send + Default + Job + 'static,
206     W: C2Worker<J>,
207 {
new( input_fourcc: Fourcc, output_fourcc: Fourcc, error_cb: impl FnMut(C2Status) + Send + 'static, work_done_cb: impl FnMut(J) + Send + 'static, framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static, alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static, options: <W as C2Worker<J>>::Options, ) -> Self208     pub fn new(
209         input_fourcc: Fourcc,
210         output_fourcc: Fourcc,
211         error_cb: impl FnMut(C2Status) + Send + 'static,
212         work_done_cb: impl FnMut(J) + Send + 'static,
213         framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static,
214         alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static,
215         options: <W as C2Worker<J>>::Options,
216     ) -> Self {
217         Self {
218             awaiting_job_event: Arc::new(
219                 EventFd::from_flags(EfdFlags::EFD_SEMAPHORE)
220                     .map_err(C2WrapperError::AwaitingJobEventFd)
221                     .unwrap(),
222             ),
223             input_fourcc: input_fourcc,
224             output_fourcc: output_fourcc,
225             error_cb: Arc::new(Mutex::new(error_cb)),
226             work_done_cb: Arc::new(Mutex::new(work_done_cb)),
227             work_queue: Arc::new(Mutex::new(VecDeque::new())),
228             state: Arc::new(Mutex::new(C2State::C2Stopped)),
229             framepool_hint_cb: Arc::new(Mutex::new(framepool_hint_cb)),
230             alloc_cb: Arc::new(Mutex::new(alloc_cb)),
231             options: options,
232             worker_thread: None,
233             _phantom: Default::default(),
234         }
235     }
236 
237     // This isn't part of C2, but it's convenient to check if our worker thread
238     // is still running.
is_alive(&self) -> bool239     pub fn is_alive(&self) -> bool {
240         match &self.worker_thread {
241             Some(worker_thread) => !worker_thread.is_finished(),
242             None => false,
243         }
244     }
245 
246     // Start the decoder/encoder.
247     // State will be C2Running after this call.
start(&mut self) -> C2Status248     pub fn start(&mut self) -> C2Status {
249         {
250             let mut state = self.state.lock().unwrap();
251             if *state != C2State::C2Stopped {
252                 (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
253                 return C2Status::C2BadState;
254             }
255             *state = C2State::C2Running;
256         }
257 
258         let input_fourcc = self.input_fourcc.clone();
259         let output_fourcc = self.output_fourcc.clone();
260         let error_cb = self.error_cb.clone();
261         let work_done_cb = self.work_done_cb.clone();
262         let work_queue = self.work_queue.clone();
263         let state = self.state.clone();
264         let options = self.options.clone();
265         let awaiting_job_event = self.awaiting_job_event.clone();
266         let framepool_hint_cb = self.framepool_hint_cb.clone();
267         let alloc_cb = self.alloc_cb.clone();
268         self.worker_thread = Some(thread::spawn(move || {
269             let worker = W::new(
270                 input_fourcc,
271                 output_fourcc,
272                 awaiting_job_event,
273                 error_cb.clone(),
274                 work_done_cb,
275                 work_queue,
276                 state.clone(),
277                 framepool_hint_cb,
278                 alloc_cb,
279                 options,
280             );
281             match worker {
282                 Ok(mut worker) => worker.process_loop(),
283                 Err(msg) => {
284                     log::debug!("Error instantiating C2Worker {}", msg);
285                     *state.lock().unwrap() = C2State::C2Error;
286                     (*error_cb.lock().unwrap())(C2Status::C2BadValue);
287                 }
288             };
289         }));
290 
291         C2Status::C2Ok
292     }
293 
294     // Stop the decoder/encoder and abandon in-flight work.
295     // C2's reset() function is equivalent for our purposes.
296     // Note that in event of error, stop() must be called before we can start()
297     // again. This is to ensure we clear out the work queue.
298     // State will be C2Stopped after this call.
stop(&mut self) -> C2Status299     pub fn stop(&mut self) -> C2Status {
300         *self.state.lock().unwrap() = C2State::C2Stopped;
301 
302         self.work_queue.lock().unwrap().drain(..);
303 
304         self.awaiting_job_event.write(1).unwrap();
305 
306         let mut worker_thread: Option<JoinHandle<()>> = None;
307         std::mem::swap(&mut worker_thread, &mut self.worker_thread);
308         self.worker_thread = match worker_thread {
309             Some(worker_thread) => {
310                 let _ = worker_thread.join();
311                 None
312             }
313             None => None,
314         };
315 
316         C2Status::C2Ok
317     }
318 
319     // Add work to the work queue.
320     // State must be C2Running or this function is invalid.
321     // State will remain C2Running.
queue(&mut self, work_items: Vec<J>) -> C2Status322     pub fn queue(&mut self, work_items: Vec<J>) -> C2Status {
323         if *self.state.lock().unwrap() != C2State::C2Running {
324             (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
325             return C2Status::C2BadState;
326         }
327 
328         self.work_queue.lock().unwrap().extend(work_items.into_iter());
329 
330         self.awaiting_job_event.write(1).unwrap();
331 
332         C2Status::C2Ok
333     }
334 
335     // Flush work from the queue and return it as |flushed_work|.
336     // State will not change after this call.
337     // TODO: Support different flush modes.
flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status338     pub fn flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status {
339         if *self.state.lock().unwrap() != C2State::C2Running {
340             (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
341             return C2Status::C2BadState;
342         }
343 
344         {
345             let mut work_queue = self.work_queue.lock().unwrap();
346             let mut tmp = work_queue.drain(..).collect::<Vec<J>>();
347             flushed_work.append(&mut tmp);
348 
349             // Note that we don't just call drain() because we want to guarantee atomicity with respect
350             // to the work_queue eviction.
351             let mut drain_job: J = Default::default();
352             drain_job.set_drain(DrainMode::NoEOSDrain);
353             work_queue.push_back(drain_job);
354         }
355 
356         C2Status::C2Ok
357     }
358 
359     // Signal to the decoder/encoder that it does not need to wait for
360     // additional work to begin processing. This is an unusual name for this
361     // function, but it is the convention that C2 uses.
362     // State must be C2Running or this function is invalid.
363     // State will remain C2Running until the last frames drain, at which point
364     // the state will change to C2Stopped.
365     // TODO: Support different drain modes.
drain(&mut self, mode: DrainMode) -> C2Status366     pub fn drain(&mut self, mode: DrainMode) -> C2Status {
367         if *self.state.lock().unwrap() != C2State::C2Running {
368             (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
369             return C2Status::C2BadState;
370         }
371 
372         let mut drain_job: J = Default::default();
373         drain_job.set_drain(mode);
374         self.work_queue.lock().unwrap().push_back(drain_job);
375 
376         self.awaiting_job_event.write(1).unwrap();
377 
378         C2Status::C2Ok
379     }
380 }
381 
382 // Instead of C2's release() function, we implement Drop and use RAII to
383 // accomplish the same thing
384 impl<J, W> Drop for C2Wrapper<J, W>
385 where
386     J: Send + Default + Job + 'static,
387     W: C2Worker<J>,
388 {
drop(&mut self)389     fn drop(&mut self) {
390         self.stop();
391     }
392 }
393