// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 use std::collections::BTreeSet;
6 use std::collections::VecDeque;
7 
8 use thiserror::Error;
9 
10 use crate::encoder::CodedBitstreamBuffer;
11 use crate::encoder::EncodeError;
12 use crate::encoder::EncodeResult;
13 use crate::encoder::FrameMetadata;
14 use crate::encoder::Tunings;
15 use crate::encoder::VideoEncoder;
16 
17 pub mod h264;
18 pub mod h265;
19 pub mod vp8;
20 pub mod vp9;
21 
22 #[derive(Debug, Error)]
23 pub enum StatefulBackendError {
24     #[error("invalid internal state. This is likely a bug.")]
25     InvalidInternalState,
26     #[error(transparent)]
27     Other(#[from] anyhow::Error),
28 }
29 
30 pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>;
31 
/// Unique identifier of the [`BackendRequest`]
///
/// A transparent wrapper around `usize`. Identifiers are cheap to copy, totally ordered
/// (so they can live in ordered collections) and hashable (so they can also key hash-based
/// collections).
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BackendRequestId(usize);
36 
37 /// Request package that is offered to [`StatefulVideoEncoderBackend`] for processing
38 pub struct BackendRequest<Handle> {
39     /// Request's unique identifier
40     pub request_id: BackendRequestId,
41     /// Frame's metadata
42     pub meta: FrameMetadata,
43     /// Frame's handle
44     pub handle: Handle,
45     /// Tunings set for the request
46     pub tunings: Tunings,
47 }
48 
49 pub struct BackendOutput {
50     /// Request's unique identifier corresponding to [`BackendRequest`]
51     pub request_id: BackendRequestId,
52     /// Result of the request. [`CodedBitstreamBuffer`] containing encoded frame
53     pub buffer: CodedBitstreamBuffer,
54 }
55 
56 /// Generic trait for stateful encoder backends
57 pub trait StatefulVideoEncoderBackend<Handle> {
58     /// Try to submit encode request to the backend. The backend may not be able to accept the
59     /// request eg. if there are not enough available resources or backend desires to finish
60     /// previous request first. The function shall not be blocking.
61     /// If backend accepts the request for processing it shall take the `request` (take ownership of
62     /// [`BackendRequest`] and set ref mut to [`None`].
consume_request( &mut self, request: &mut Option<BackendRequest<Handle>>, ) -> StatefulBackendResult<()>63     fn consume_request(
64         &mut self,
65         request: &mut Option<BackendRequest<Handle>>,
66     ) -> StatefulBackendResult<()>;
67 
68     /// Function shall block, until the backend can accept request with [`consume_request`] or
69     /// will finished processing of some [`BackendRequest`] and [`poll`] can be used to
70     /// fetch is result.
71     ///
72     /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
73     /// [`poll`]: StatefulVideoEncoderBackend::poll
sync(&mut self) -> StatefulBackendResult<()>74     fn sync(&mut self) -> StatefulBackendResult<()>;
75 
76     /// Blocking function, until the backend finishes processing all [`BackendRequest`], that the
77     /// backend has accepted and all outputs of those requests are returned.
drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>78     fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>;
79 
80     /// If the processing of any [`BackendRequest`] is finished then the function should yield it's
81     /// corresponding [`BackendOutput`].
82     ///
83     /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>84     fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>;
85 }
86 
87 pub struct StatefulEncoder<Handle, Backend>
88 where
89     Backend: StatefulVideoEncoderBackend<Handle>,
90 {
91     /// Pending queue of frames to encoded by the backend
92     queue: VecDeque<BackendRequest<Handle>>,
93 
94     /// Unique request identifier continue
95     request_counter: usize,
96 
97     /// Latest [`Tunings`], that will be cloned in to request
98     tunings: Tunings,
99 
100     /// Processed encoded bitstream queue for client to poll
101     coded_queue: VecDeque<CodedBitstreamBuffer>,
102 
103     /// Currently processed requests by the backend
104     processing: BTreeSet<BackendRequestId>,
105 
106     // [`StatefulVideoEncoderBackend`] instance to delegate [`BackendRequest`] to
107     backend: Backend,
108 }
109 
110 impl<Handle, Backend> StatefulEncoder<Handle, Backend>
111 where
112     Backend: StatefulVideoEncoderBackend<Handle>,
113 {
114     /// Utility function that creates an new [`StatefulEncoder`] with [`Tunings`] and
115     /// [`StatefulVideoEncoderBackend`] instance.
116     #[allow(dead_code)]
create(tunings: Tunings, backend: Backend) -> Self117     fn create(tunings: Tunings, backend: Backend) -> Self {
118         Self {
119             queue: Default::default(),
120             request_counter: 0,
121             tunings,
122             coded_queue: Default::default(),
123             processing: Default::default(),
124             backend,
125         }
126     }
127 
128     /// Handles the [`BackendOutput`] from the backend, ie add to the queue for client to poll.
handle_output(&mut self, output: BackendOutput) -> EncodeResult<()>129     fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> {
130         log::debug!(
131             "Backend yieled output buffer for request id={:?} timestamp={} bytes={}",
132             output.request_id,
133             output.buffer.metadata.timestamp,
134             output.buffer.bitstream.len()
135         );
136         if !self.processing.remove(&output.request_id) {
137             log::warn!("Coded buffer returned for non existing or already processed request id={:?} timestamp={}",
138                 output.request_id,
139                 output.buffer.metadata.timestamp,
140             );
141         }
142         self.coded_queue.push_back(output.buffer);
143         Ok(())
144     }
145 
146     /// Poll the backend for outputs and handles them
poll_backend(&mut self) -> EncodeResult<()>147     fn poll_backend(&mut self) -> EncodeResult<()> {
148         while let Some(output) = self.backend.poll()? {
149             self.handle_output(output)?;
150         }
151 
152         Ok(())
153     }
154 
155     /// Performs essential processing. Poll the backend for outputs and tries to submit requests to
156     /// backends.
process(&mut self) -> EncodeResult<()>157     fn process(&mut self) -> EncodeResult<()> {
158         log::debug!(
159             "Pending requests: {}, currently processed: {:?}, pending coded buffer: {}",
160             self.queue.len(),
161             self.processing,
162             self.coded_queue.len()
163         );
164 
165         if !self.processing.is_empty() {
166             self.poll_backend()?;
167         }
168 
169         while let Some(request) = self.queue.pop_front() {
170             let request_id = request.request_id;
171             let timestamp = request.meta.timestamp;
172             let mut request = Some(request);
173 
174             log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}");
175             self.backend.consume_request(&mut request)?;
176 
177             if let Some(request) = request {
178                 log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}");
179                 self.queue.push_front(request);
180                 break;
181             } else {
182                 log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}");
183                 self.processing.insert(request_id);
184             }
185         }
186 
187         Ok(())
188     }
189 
190     /// [`StatefulVideoEncoderBackend`]'s instance
backend(&mut self) -> &Backend191     pub fn backend(&mut self) -> &Backend {
192         &self.backend
193     }
194 }
195 
196 impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend>
197 where
198     Backend: StatefulVideoEncoderBackend<Handle>,
199 {
tune(&mut self, tunings: Tunings) -> EncodeResult<()>200     fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
201         self.tunings = tunings;
202         Ok(())
203     }
204 
encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError>205     fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> {
206         let request_id = BackendRequestId(self.request_counter);
207         self.request_counter = self.request_counter.wrapping_add(1);
208 
209         log::trace!("Got new request id={request_id:?} timestamp={}", meta.timestamp);
210 
211         let request = BackendRequest { request_id, meta, handle, tunings: self.tunings.clone() };
212 
213         self.queue.push_back(request);
214         self.process()?;
215 
216         Ok(())
217     }
218 
poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>>219     fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
220         if !self.queue.is_empty() || !self.processing.is_empty() {
221             self.process()?;
222         }
223 
224         if let Some(buffer) = self.coded_queue.pop_front() {
225             log::debug!("Returning coded buffer timestamp={}", buffer.metadata.timestamp);
226             return Ok(Some(buffer));
227         }
228         Ok(None)
229     }
230 
drain(&mut self) -> EncodeResult<()>231     fn drain(&mut self) -> EncodeResult<()> {
232         log::debug!(
233             "Got drain request. Pending in queue: {}. Currently processed: {:?}",
234             self.queue.len(),
235             self.processing
236         );
237 
238         while !self.queue.is_empty() {
239             self.process()?;
240 
241             if !self.queue.is_empty() {
242                 self.backend.sync()?;
243             }
244         }
245 
246         if self.processing.is_empty() {
247             log::debug!("Skipping drain request to backend, everything is drained");
248         }
249 
250         log::debug!("Sending drain request to backend");
251         for output in self.backend.drain()? {
252             self.handle_output(output)?;
253         }
254 
255         Ok(())
256     }
257 }
258