1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::collections::BTreeSet; 6 use std::collections::VecDeque; 7 8 use thiserror::Error; 9 10 use crate::encoder::CodedBitstreamBuffer; 11 use crate::encoder::EncodeError; 12 use crate::encoder::EncodeResult; 13 use crate::encoder::FrameMetadata; 14 use crate::encoder::Tunings; 15 use crate::encoder::VideoEncoder; 16 17 pub mod h264; 18 pub mod h265; 19 pub mod vp8; 20 pub mod vp9; 21 22 #[derive(Debug, Error)] 23 pub enum StatefulBackendError { 24 #[error("invalid internal state. This is likely a bug.")] 25 InvalidInternalState, 26 #[error(transparent)] 27 Other(#[from] anyhow::Error), 28 } 29 30 pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>; 31 32 /// Unique identifier of the [`BackendRequest`] 33 #[repr(transparent)] 34 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] 35 pub struct BackendRequestId(usize); 36 37 /// Request package that is offered to [`StatefulVideoEncoderBackend`] for processing 38 pub struct BackendRequest<Handle> { 39 /// Request's unique identifier 40 pub request_id: BackendRequestId, 41 /// Frame's metadata 42 pub meta: FrameMetadata, 43 /// Frame's handle 44 pub handle: Handle, 45 /// Tunings set for the request 46 pub tunings: Tunings, 47 } 48 49 pub struct BackendOutput { 50 /// Request's unique identifier corresponding to [`BackendRequest`] 51 pub request_id: BackendRequestId, 52 /// Result of the request. [`CodedBitstreamBuffer`] containing encoded frame 53 pub buffer: CodedBitstreamBuffer, 54 } 55 56 /// Generic trait for stateful encoder backends 57 pub trait StatefulVideoEncoderBackend<Handle> { 58 /// Try to submit encode request to the backend. The backend may not be able to accept the 59 /// request eg. if there are not enough available resources or backend desires to finish 60 /// previous request first. The function shall not be blocking. 61 /// If backend accepts the request for processing it shall take the `request` (take ownership of 62 /// [`BackendRequest`] and set ref mut to [`None`]. consume_request( &mut self, request: &mut Option<BackendRequest<Handle>>, ) -> StatefulBackendResult<()>63 fn consume_request( 64 &mut self, 65 request: &mut Option<BackendRequest<Handle>>, 66 ) -> StatefulBackendResult<()>; 67 68 /// Function shall block, until the backend can accept request with [`consume_request`] or 69 /// will finished processing of some [`BackendRequest`] and [`poll`] can be used to 70 /// fetch is result. 71 /// 72 /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request 73 /// [`poll`]: StatefulVideoEncoderBackend::poll sync(&mut self) -> StatefulBackendResult<()>74 fn sync(&mut self) -> StatefulBackendResult<()>; 75 76 /// Blocking function, until the backend finishes processing all [`BackendRequest`], that the 77 /// backend has accepted and all outputs of those requests are returned. drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>78 fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>; 79 80 /// If the processing of any [`BackendRequest`] is finished then the function should yield it's 81 /// corresponding [`BackendOutput`]. 82 /// 83 /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>84 fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>; 85 } 86 87 pub struct StatefulEncoder<Handle, Backend> 88 where 89 Backend: StatefulVideoEncoderBackend<Handle>, 90 { 91 /// Pending queue of frames to encoded by the backend 92 queue: VecDeque<BackendRequest<Handle>>, 93 94 /// Unique request identifier continue 95 request_counter: usize, 96 97 /// Latest [`Tunings`], that will be cloned in to request 98 tunings: Tunings, 99 100 /// Processed encoded bitstream queue for client to poll 101 coded_queue: VecDeque<CodedBitstreamBuffer>, 102 103 /// Currently processed requests by the backend 104 processing: BTreeSet<BackendRequestId>, 105 106 // [`StatefulVideoEncoderBackend`] instance to delegate [`BackendRequest`] to 107 backend: Backend, 108 } 109 110 impl<Handle, Backend> StatefulEncoder<Handle, Backend> 111 where 112 Backend: StatefulVideoEncoderBackend<Handle>, 113 { 114 /// Utility function that creates an new [`StatefulEncoder`] with [`Tunings`] and 115 /// [`StatefulVideoEncoderBackend`] instance. 116 #[allow(dead_code)] create(tunings: Tunings, backend: Backend) -> Self117 fn create(tunings: Tunings, backend: Backend) -> Self { 118 Self { 119 queue: Default::default(), 120 request_counter: 0, 121 tunings, 122 coded_queue: Default::default(), 123 processing: Default::default(), 124 backend, 125 } 126 } 127 128 /// Handles the [`BackendOutput`] from the backend, ie add to the queue for client to poll. handle_output(&mut self, output: BackendOutput) -> EncodeResult<()>129 fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> { 130 log::debug!( 131 "Backend yieled output buffer for request id={:?} timestamp={} bytes={}", 132 output.request_id, 133 output.buffer.metadata.timestamp, 134 output.buffer.bitstream.len() 135 ); 136 if !self.processing.remove(&output.request_id) { 137 log::warn!("Coded buffer returned for non existing or already processed request id={:?} timestamp={}", 138 output.request_id, 139 output.buffer.metadata.timestamp, 140 ); 141 } 142 self.coded_queue.push_back(output.buffer); 143 Ok(()) 144 } 145 146 /// Poll the backend for outputs and handles them poll_backend(&mut self) -> EncodeResult<()>147 fn poll_backend(&mut self) -> EncodeResult<()> { 148 while let Some(output) = self.backend.poll()? { 149 self.handle_output(output)?; 150 } 151 152 Ok(()) 153 } 154 155 /// Performs essential processing. Poll the backend for outputs and tries to submit requests to 156 /// backends. process(&mut self) -> EncodeResult<()>157 fn process(&mut self) -> EncodeResult<()> { 158 log::debug!( 159 "Pending requests: {}, currently processed: {:?}, pending coded buffer: {}", 160 self.queue.len(), 161 self.processing, 162 self.coded_queue.len() 163 ); 164 165 if !self.processing.is_empty() { 166 self.poll_backend()?; 167 } 168 169 while let Some(request) = self.queue.pop_front() { 170 let request_id = request.request_id; 171 let timestamp = request.meta.timestamp; 172 let mut request = Some(request); 173 174 log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}"); 175 self.backend.consume_request(&mut request)?; 176 177 if let Some(request) = request { 178 log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}"); 179 self.queue.push_front(request); 180 break; 181 } else { 182 log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}"); 183 self.processing.insert(request_id); 184 } 185 } 186 187 Ok(()) 188 } 189 190 /// [`StatefulVideoEncoderBackend`]'s instance backend(&mut self) -> &Backend191 pub fn backend(&mut self) -> &Backend { 192 &self.backend 193 } 194 } 195 196 impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend> 197 where 198 Backend: StatefulVideoEncoderBackend<Handle>, 199 { tune(&mut self, tunings: Tunings) -> EncodeResult<()>200 fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> { 201 self.tunings = tunings; 202 Ok(()) 203 } 204 encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError>205 fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> { 206 let request_id = BackendRequestId(self.request_counter); 207 self.request_counter = self.request_counter.wrapping_add(1); 208 209 log::trace!("Got new request id={request_id:?} timestamp={}", meta.timestamp); 210 211 let request = BackendRequest { request_id, meta, handle, tunings: self.tunings.clone() }; 212 213 self.queue.push_back(request); 214 self.process()?; 215 216 Ok(()) 217 } 218 poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>>219 fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> { 220 if !self.queue.is_empty() || !self.processing.is_empty() { 221 self.process()?; 222 } 223 224 if let Some(buffer) = self.coded_queue.pop_front() { 225 log::debug!("Returning coded buffer timestamp={}", buffer.metadata.timestamp); 226 return Ok(Some(buffer)); 227 } 228 Ok(None) 229 } 230 drain(&mut self) -> EncodeResult<()>231 fn drain(&mut self) -> EncodeResult<()> { 232 log::debug!( 233 "Got drain request. Pending in queue: {}. Currently processed: {:?}", 234 self.queue.len(), 235 self.processing 236 ); 237 238 while !self.queue.is_empty() { 239 self.process()?; 240 241 if !self.queue.is_empty() { 242 self.backend.sync()?; 243 } 244 } 245 246 if self.processing.is_empty() { 247 log::debug!("Skipping drain request to backend, everything is drained"); 248 } 249 250 log::debug!("Sending drain request to backend"); 251 for output in self.backend.drain()? { 252 self.handle_output(output)?; 253 } 254 255 Ok(()) 256 } 257 } 258