• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Implementation of the the `Encoder` struct, which is responsible for translation between the
6 //! virtio protocols and LibVDA APIs.
7 
8 mod encoder;
9 mod libvda_encoder;
10 
11 pub use encoder::EncoderError;
12 pub use libvda_encoder::LibvdaEncoder;
13 
14 use base::{error, warn, Tube, WaitContext};
15 use std::collections::{BTreeMap, BTreeSet};
16 
17 use crate::virtio::resource_bridge::{self, BufferInfo, ResourceInfo, ResourceRequest};
18 use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
19 use crate::virtio::video::command::{QueueType, VideoCmd};
20 use crate::virtio::video::control::*;
21 use crate::virtio::video::device::VideoCmdResponseType;
22 use crate::virtio::video::device::{
23     AsyncCmdResponse, AsyncCmdTag, Device, Token, VideoEvtResponseType,
24 };
25 use crate::virtio::video::encoder::encoder::{
26     Encoder, EncoderEvent, EncoderSession, InputBufferId, OutputBufferId, SessionConfig,
27     VideoFramePlane,
28 };
29 use crate::virtio::video::error::*;
30 use crate::virtio::video::event::{EvtType, VideoEvt};
31 use crate::virtio::video::format::{Format, Level, PlaneFormat, Profile};
32 use crate::virtio::video::params::Params;
33 use crate::virtio::video::protocol;
34 use crate::virtio::video::response::CmdResponse;
35 
36 #[derive(Debug)]
37 struct QueuedInputResourceParams {
38     encoder_id: InputBufferId,
39     timestamp: u64,
40     in_queue: bool,
41 }
42 
43 struct InputResource {
44     resource_handle: u128,
45     planes: Vec<VideoFramePlane>,
46     queue_params: Option<QueuedInputResourceParams>,
47 }
48 
49 #[derive(Debug)]
50 struct QueuedOutputResourceParams {
51     encoder_id: OutputBufferId,
52     timestamp: u64,
53     in_queue: bool,
54 }
55 
56 struct OutputResource {
57     resource_handle: u128,
58     offset: u32,
59     queue_params: Option<QueuedOutputResourceParams>,
60 }
61 
/// Commands whose responses must be deferred until the encoder session
/// reports a matching event (RequireInputBuffers for GetParams, a flush
/// completion for Drain/QueueClear).
#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
enum PendingCommand {
    GetSrcParams,
    GetDstParams,
    Drain,
    SrcQueueClear,
    DstQueueClear,
}
70 
71 struct Stream<T: EncoderSession> {
72     id: u32,
73     src_params: Params,
74     dst_params: Params,
75     dst_bitrate: u32,
76     dst_profile: Profile,
77     dst_h264_level: Option<Level>,
78     frame_rate: u32,
79     force_keyframe: bool,
80 
81     encoder_session: Option<T>,
82     received_input_buffers_event: bool,
83 
84     src_resources: BTreeMap<u32, InputResource>,
85     encoder_input_buffer_ids: BTreeMap<InputBufferId, u32>,
86 
87     dst_resources: BTreeMap<u32, OutputResource>,
88     encoder_output_buffer_ids: BTreeMap<OutputBufferId, u32>,
89 
90     pending_commands: BTreeSet<PendingCommand>,
91     eos_notification_buffer: Option<OutputBufferId>,
92 }
93 
94 impl<T: EncoderSession> Stream<T> {
new<E: Encoder<Session = T>>( id: u32, desired_format: Format, encoder: &EncoderDevice<E>, ) -> VideoResult<Self>95     fn new<E: Encoder<Session = T>>(
96         id: u32,
97         desired_format: Format,
98         encoder: &EncoderDevice<E>,
99     ) -> VideoResult<Self> {
100         const MIN_BUFFERS: u32 = 1;
101         const MAX_BUFFERS: u32 = 342;
102         const DEFAULT_WIDTH: u32 = 640;
103         const DEFAULT_HEIGHT: u32 = 480;
104         const DEFAULT_BITRATE: u32 = 6000;
105         const DEFAULT_BUFFER_SIZE: u32 = 2097152; // 2MB; chosen empirically for 1080p video
106         const DEFAULT_FPS: u32 = 30;
107 
108         let mut src_params = Params {
109             min_buffers: MIN_BUFFERS,
110             max_buffers: MAX_BUFFERS,
111             ..Default::default()
112         };
113 
114         let cros_capabilities = &encoder.cros_capabilities;
115 
116         cros_capabilities
117             .populate_src_params(
118                 &mut src_params,
119                 Format::NV12,
120                 DEFAULT_WIDTH,
121                 DEFAULT_HEIGHT,
122                 0,
123             )
124             .map_err(|_| VideoError::InvalidArgument)?;
125 
126         let mut dst_params = Default::default();
127 
128         // In order to support requesting encoder params change, we must know the default frame
129         // rate, because VEA's request_encoding_params_change requires both framerate and
130         // bitrate to be specified.
131         cros_capabilities
132             .populate_dst_params(&mut dst_params, desired_format, DEFAULT_BUFFER_SIZE)
133             .map_err(|_| VideoError::InvalidArgument)?;
134         // `format` is an Option since for the decoder, it is not populated until decoding has
135         // started. for encoder, format should always be populated.
136         let dest_format = dst_params.format.ok_or(VideoError::InvalidArgument)?;
137 
138         let dst_profile = cros_capabilities
139             .get_default_profile(&dest_format)
140             .ok_or(VideoError::InvalidArgument)?;
141 
142         let dst_h264_level = if dest_format == Format::H264 {
143             Some(Level::H264_1_0)
144         } else {
145             None
146         };
147 
148         Ok(Self {
149             id,
150             src_params,
151             dst_params,
152             dst_bitrate: DEFAULT_BITRATE,
153             dst_profile,
154             dst_h264_level,
155             frame_rate: DEFAULT_FPS,
156             force_keyframe: false,
157             encoder_session: None,
158             received_input_buffers_event: false,
159             src_resources: Default::default(),
160             encoder_input_buffer_ids: Default::default(),
161             dst_resources: Default::default(),
162             encoder_output_buffer_ids: Default::default(),
163             pending_commands: Default::default(),
164             eos_notification_buffer: None,
165         })
166     }
167 
has_encode_session(&self) -> bool168     fn has_encode_session(&self) -> bool {
169         self.encoder_session.is_some()
170     }
171 
set_encode_session<U: Encoder<Session = T>>( &mut self, encoder: &mut U, wait_ctx: &WaitContext<Token>, ) -> VideoResult<()>172     fn set_encode_session<U: Encoder<Session = T>>(
173         &mut self,
174         encoder: &mut U,
175         wait_ctx: &WaitContext<Token>,
176     ) -> VideoResult<()> {
177         if self.encoder_session.is_some() {
178             error!(
179                 "stream {}: tried to add encode session when one already exists.",
180                 self.id
181             );
182             return Err(VideoError::InvalidOperation);
183         }
184 
185         let new_session = encoder
186             .start_session(SessionConfig {
187                 src_params: self.src_params.clone(),
188                 dst_params: self.dst_params.clone(),
189                 dst_profile: self.dst_profile,
190                 dst_bitrate: self.dst_bitrate,
191                 dst_h264_level: self.dst_h264_level.clone(),
192                 frame_rate: self.frame_rate,
193             })
194             .map_err(|_| VideoError::InvalidOperation)?;
195 
196         let event_pipe = new_session.event_pipe();
197 
198         wait_ctx
199             .add(event_pipe, Token::Event { id: self.id })
200             .map_err(|e| {
201                 error!(
202                     "stream {}: failed to add FD to poll context: {}",
203                     self.id, e
204                 );
205                 VideoError::InvalidOperation
206             })?;
207         self.encoder_session.replace(new_session);
208         self.received_input_buffers_event = false;
209         Ok(())
210     }
211 
clear_encode_session(&mut self, wait_ctx: &WaitContext<Token>) -> VideoResult<()>212     fn clear_encode_session(&mut self, wait_ctx: &WaitContext<Token>) -> VideoResult<()> {
213         if let Some(session) = self.encoder_session.take() {
214             let event_pipe = session.event_pipe();
215             wait_ctx.delete(event_pipe).map_err(|e| {
216                 error!(
217                     "stream: {}: failed to remove fd from poll context: {}",
218                     self.id, e
219                 );
220                 VideoError::InvalidOperation
221             })?;
222         }
223         Ok(())
224     }
225 
require_input_buffers( &mut self, input_count: u32, input_frame_width: u32, input_frame_height: u32, output_buffer_size: u32, ) -> Option<Vec<VideoEvtResponseType>>226     fn require_input_buffers(
227         &mut self,
228         input_count: u32,
229         input_frame_width: u32,
230         input_frame_height: u32,
231         output_buffer_size: u32,
232     ) -> Option<Vec<VideoEvtResponseType>> {
233         // TODO(alexlau): Does this always arrive after start_session,
234         // but before the first encode call?
235         // TODO(alexlau): set plane info from input_frame_width and input_frame_height
236         self.src_params.min_buffers = input_count;
237         self.src_params.max_buffers = 32;
238         self.src_params.frame_width = input_frame_width;
239         self.src_params.frame_height = input_frame_height;
240         self.dst_params.plane_formats[0].plane_size = output_buffer_size;
241         self.received_input_buffers_event = true;
242 
243         let mut responses = vec![];
244 
245         // Respond to any GetParams commands that were waiting.
246         if self.pending_commands.remove(&PendingCommand::GetSrcParams) {
247             responses.push(VideoEvtResponseType::AsyncCmd(
248                 AsyncCmdResponse::from_response(
249                     AsyncCmdTag::GetParams {
250                         stream_id: self.id,
251                         queue_type: QueueType::Input,
252                     },
253                     CmdResponse::GetParams {
254                         queue_type: QueueType::Input,
255                         params: self.src_params.clone(),
256                     },
257                 ),
258             ));
259         }
260         if self.pending_commands.remove(&PendingCommand::GetDstParams) {
261             responses.push(VideoEvtResponseType::AsyncCmd(
262                 AsyncCmdResponse::from_response(
263                     AsyncCmdTag::GetParams {
264                         stream_id: self.id,
265                         queue_type: QueueType::Output,
266                     },
267                     CmdResponse::GetParams {
268                         queue_type: QueueType::Output,
269                         params: self.dst_params.clone(),
270                     },
271                 ),
272             ));
273         }
274 
275         if responses.len() > 0 {
276             Some(responses)
277         } else {
278             None
279         }
280     }
281 
processed_input_buffer( &mut self, input_buffer_id: InputBufferId, ) -> Option<Vec<VideoEvtResponseType>>282     fn processed_input_buffer(
283         &mut self,
284         input_buffer_id: InputBufferId,
285     ) -> Option<Vec<VideoEvtResponseType>> {
286         let resource_id = *match self.encoder_input_buffer_ids.get(&input_buffer_id) {
287             Some(id) => id,
288             None => {
289                 warn!("Received processed input buffer event for input buffer id {}, but missing resource, ResourceDestroyAll?", input_buffer_id);
290                 return None;
291             }
292         };
293 
294         let resource = match self.src_resources.get_mut(&resource_id) {
295             Some(r) => r,
296             None => {
297                 error!(
298                     "Received processed input buffer event but missing resource with id {}",
299                     resource_id
300                 );
301                 return None;
302             }
303         };
304 
305         let queue_params = match resource.queue_params.take() {
306             Some(p) => p,
307             None => {
308                 error!(
309                     "Received processed input buffer event but resource with id {} was not queued.",
310                     resource_id
311                 );
312                 return None;
313             }
314         };
315 
316         if !queue_params.in_queue {
317             // A QueueClear command occurred after this buffer was queued.
318             return None;
319         }
320 
321         let tag = AsyncCmdTag::Queue {
322             stream_id: self.id,
323             queue_type: QueueType::Input,
324             resource_id,
325         };
326 
327         let resp = CmdResponse::ResourceQueue {
328             timestamp: queue_params.timestamp,
329             flags: 0,
330             size: 0,
331         };
332 
333         Some(vec![VideoEvtResponseType::AsyncCmd(
334             AsyncCmdResponse::from_response(tag, resp),
335         )])
336     }
337 
processed_output_buffer( &mut self, output_buffer_id: OutputBufferId, bytesused: u32, keyframe: bool, timestamp: u64, ) -> Option<Vec<VideoEvtResponseType>>338     fn processed_output_buffer(
339         &mut self,
340         output_buffer_id: OutputBufferId,
341         bytesused: u32,
342         keyframe: bool,
343         timestamp: u64,
344     ) -> Option<Vec<VideoEvtResponseType>> {
345         let resource_id = *match self.encoder_output_buffer_ids.get(&output_buffer_id) {
346             Some(id) => id,
347             None => {
348                 warn!("Received processed output buffer event for output buffer id {}, but missing resource, ResourceDestroyAll?", output_buffer_id);
349                 return None;
350             }
351         };
352 
353         let resource = match self.dst_resources.get_mut(&resource_id) {
354             Some(r) => r,
355             None => {
356                 error!(
357                     "Received processed output buffer event but missing resource with id {}",
358                     resource_id
359                 );
360                 return None;
361             }
362         };
363 
364         let queue_params = match resource.queue_params.take() {
365             Some(p) => p,
366             None => {
367                 error!("Received processed output buffer event but resource with id {} was not queued.", resource_id);
368                 return None;
369             }
370         };
371 
372         if !queue_params.in_queue {
373             // A QueueClear command occurred after this buffer was queued.
374             return None;
375         }
376 
377         let tag = AsyncCmdTag::Queue {
378             stream_id: self.id,
379             queue_type: QueueType::Output,
380             resource_id,
381         };
382 
383         let resp = CmdResponse::ResourceQueue {
384             timestamp,
385             // At the moment, a buffer is saved in `eos_notification_buffer`, and
386             // the EOS flag is populated and returned after a flush() command.
387             // TODO(b/149725148): Populate flags once libvda supports it.
388             flags: if keyframe {
389                 protocol::VIRTIO_VIDEO_BUFFER_FLAG_IFRAME
390             } else {
391                 0
392             },
393             size: bytesused,
394         };
395 
396         Some(vec![VideoEvtResponseType::AsyncCmd(
397             AsyncCmdResponse::from_response(tag, resp),
398         )])
399     }
400 
flush_response(&mut self, flush_done: bool) -> Option<Vec<VideoEvtResponseType>>401     fn flush_response(&mut self, flush_done: bool) -> Option<Vec<VideoEvtResponseType>> {
402         let command_response = if flush_done {
403             CmdResponse::NoData
404         } else {
405             error!("Flush could not be completed for stream {}", self.id);
406             VideoError::InvalidOperation.into()
407         };
408 
409         let mut async_responses = vec![];
410 
411         let eos_resource_id = match self.eos_notification_buffer {
412             Some(r) => r,
413             None => {
414                 error!(
415                     "No EOS resource available on successful flush response (stream id {})",
416                     self.id
417                 );
418                 return Some(vec![VideoEvtResponseType::Event(VideoEvt {
419                     typ: EvtType::Error,
420                     stream_id: self.id,
421                 })]);
422             }
423         };
424 
425         let eos_tag = AsyncCmdTag::Queue {
426             stream_id: self.id,
427             queue_type: QueueType::Output,
428             resource_id: eos_resource_id,
429         };
430 
431         let eos_response = CmdResponse::ResourceQueue {
432             timestamp: 0,
433             flags: protocol::VIRTIO_VIDEO_BUFFER_FLAG_EOS,
434             size: 0,
435         };
436 
437         async_responses.push(VideoEvtResponseType::AsyncCmd(
438             AsyncCmdResponse::from_response(eos_tag, eos_response),
439         ));
440 
441         if self.pending_commands.remove(&PendingCommand::Drain) {
442             async_responses.push(VideoEvtResponseType::AsyncCmd(
443                 AsyncCmdResponse::from_response(
444                     AsyncCmdTag::Drain { stream_id: self.id },
445                     command_response.clone(),
446                 ),
447             ));
448         }
449 
450         if self.pending_commands.remove(&PendingCommand::SrcQueueClear) {
451             async_responses.push(VideoEvtResponseType::AsyncCmd(
452                 AsyncCmdResponse::from_response(
453                     AsyncCmdTag::Clear {
454                         stream_id: self.id,
455                         queue_type: QueueType::Input,
456                     },
457                     command_response.clone(),
458                 ),
459             ));
460         }
461 
462         if self.pending_commands.remove(&PendingCommand::DstQueueClear) {
463             async_responses.push(VideoEvtResponseType::AsyncCmd(
464                 AsyncCmdResponse::from_response(
465                     AsyncCmdTag::Clear {
466                         stream_id: self.id,
467                         queue_type: QueueType::Output,
468                     },
469                     command_response,
470                 ),
471             ));
472         }
473 
474         if async_responses.is_empty() {
475             error!("Received flush response but there are no pending commands.");
476             None
477         } else {
478             Some(async_responses)
479         }
480     }
481 
482     #[allow(clippy::unnecessary_wraps)]
notify_error(&self, error: EncoderError) -> Option<Vec<VideoEvtResponseType>>483     fn notify_error(&self, error: EncoderError) -> Option<Vec<VideoEvtResponseType>> {
484         error!(
485             "Received encoder error event for stream {}: {}",
486             self.id, error
487         );
488         Some(vec![VideoEvtResponseType::Event(VideoEvt {
489             typ: EvtType::Error,
490             stream_id: self.id,
491         })])
492     }
493 }
494 
495 pub struct EncoderDevice<T: Encoder> {
496     cros_capabilities: encoder::EncoderCapabilities,
497     encoder: T,
498     streams: BTreeMap<u32, Stream<T::Session>>,
499 }
500 
get_resource_info(res_bridge: &Tube, uuid: u128) -> VideoResult<BufferInfo>501 fn get_resource_info(res_bridge: &Tube, uuid: u128) -> VideoResult<BufferInfo> {
502     match resource_bridge::get_resource_info(
503         res_bridge,
504         ResourceRequest::GetBuffer { id: uuid as u32 },
505     ) {
506         Ok(ResourceInfo::Buffer(buffer_info)) => Ok(buffer_info),
507         Ok(_) => Err(VideoError::InvalidArgument),
508         Err(e) => Err(VideoError::ResourceBridgeFailure(e)),
509     }
510 }
511 
512 impl<T: Encoder> EncoderDevice<T> {
new(encoder: T) -> encoder::Result<Self>513     pub fn new(encoder: T) -> encoder::Result<Self> {
514         Ok(Self {
515             cros_capabilities: encoder.query_capabilities()?,
516             encoder,
517             streams: Default::default(),
518         })
519     }
520 
521     #[allow(clippy::unnecessary_wraps)]
query_capabilities(&self, queue_type: QueueType) -> VideoResult<VideoCmdResponseType>522     fn query_capabilities(&self, queue_type: QueueType) -> VideoResult<VideoCmdResponseType> {
523         let descs = match queue_type {
524             QueueType::Input => self.cros_capabilities.input_format_descs.clone(),
525             QueueType::Output => self.cros_capabilities.output_format_descs.clone(),
526         };
527         Ok(VideoCmdResponseType::Sync(CmdResponse::QueryCapability(
528             descs,
529         )))
530     }
531 
stream_create( &mut self, stream_id: u32, desired_format: Format, ) -> VideoResult<VideoCmdResponseType>532     fn stream_create(
533         &mut self,
534         stream_id: u32,
535         desired_format: Format,
536     ) -> VideoResult<VideoCmdResponseType> {
537         if self.streams.contains_key(&stream_id) {
538             return Err(VideoError::InvalidStreamId(stream_id));
539         }
540         let new_stream = Stream::new(stream_id, desired_format, self)?;
541 
542         self.streams.insert(stream_id, new_stream);
543         Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
544     }
545 
stream_destroy(&mut self, stream_id: u32) -> VideoResult<VideoCmdResponseType>546     fn stream_destroy(&mut self, stream_id: u32) -> VideoResult<VideoCmdResponseType> {
547         let mut stream = self
548             .streams
549             .remove(&stream_id)
550             .ok_or(VideoError::InvalidStreamId(stream_id))?;
551         // TODO(alexlau): Handle resources that have been queued.
552         if let Some(session) = stream.encoder_session.take() {
553             if let Err(e) = self.encoder.stop_session(session) {
554                 error!("Failed to stop encode session {}: {}", stream_id, e);
555             }
556         }
557         Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
558     }
559 
stream_drain(&mut self, stream_id: u32) -> VideoResult<VideoCmdResponseType>560     fn stream_drain(&mut self, stream_id: u32) -> VideoResult<VideoCmdResponseType> {
561         let stream = self
562             .streams
563             .get_mut(&stream_id)
564             .ok_or(VideoError::InvalidStreamId(stream_id))?;
565         match stream.encoder_session {
566             Some(ref mut session) => {
567                 if stream.pending_commands.contains(&PendingCommand::Drain) {
568                     error!("A pending Drain command already exists.");
569                     return Err(VideoError::InvalidOperation);
570                 }
571                 stream.pending_commands.insert(PendingCommand::Drain);
572 
573                 if !stream
574                     .pending_commands
575                     .contains(&PendingCommand::SrcQueueClear)
576                     && !stream
577                         .pending_commands
578                         .contains(&PendingCommand::DstQueueClear)
579                 {
580                     // If a source or dest QueueClear is underway, a flush has
581                     // already been sent.
582                     if let Err(e) = session.flush() {
583                         error!("Flush failed for stream id {}: {}", stream_id, e);
584                     }
585                 }
586                 Ok(VideoCmdResponseType::Async(AsyncCmdTag::Drain {
587                     stream_id,
588                 }))
589             }
590             None => {
591                 // Return an OK response since nothing has been queued yet.
592                 Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
593             }
594         }
595     }
596 
resource_create( &mut self, wait_ctx: &WaitContext<Token>, resource_bridge: &Tube, stream_id: u32, queue_type: QueueType, resource_id: u32, plane_offsets: Vec<u32>, uuid: u128, ) -> VideoResult<VideoCmdResponseType>597     fn resource_create(
598         &mut self,
599         wait_ctx: &WaitContext<Token>,
600         resource_bridge: &Tube,
601         stream_id: u32,
602         queue_type: QueueType,
603         resource_id: u32,
604         plane_offsets: Vec<u32>,
605         uuid: u128,
606     ) -> VideoResult<VideoCmdResponseType> {
607         let stream = self
608             .streams
609             .get_mut(&stream_id)
610             .ok_or(VideoError::InvalidStreamId(stream_id))?;
611 
612         if !stream.has_encode_session() {
613             // No encode session would have been created upon the first
614             // QBUF if there was no previous S_FMT call.
615             stream.set_encode_session(&mut self.encoder, wait_ctx)?;
616         }
617 
618         let num_planes = plane_offsets.len();
619 
620         match queue_type {
621             QueueType::Input => {
622                 if num_planes != stream.src_params.plane_formats.len() {
623                     return Err(VideoError::InvalidParameter);
624                 }
625 
626                 if stream.src_resources.contains_key(&resource_id) {
627                     warn!("Replacing source resource with id {}", resource_id);
628                 }
629 
630                 let resource_info = get_resource_info(resource_bridge, uuid)?;
631 
632                 let planes: Vec<VideoFramePlane> = resource_info.planes[0..num_planes]
633                     .into_iter()
634                     .map(|plane_info| VideoFramePlane {
635                         offset: plane_info.offset as usize,
636                         stride: plane_info.stride as usize,
637                     })
638                     .collect();
639 
640                 stream.src_resources.insert(
641                     resource_id,
642                     InputResource {
643                         resource_handle: uuid,
644                         planes,
645                         queue_params: None,
646                     },
647                 );
648             }
649             QueueType::Output => {
650                 if num_planes != stream.dst_params.plane_formats.len() {
651                     return Err(VideoError::InvalidParameter);
652                 }
653 
654                 if stream.dst_resources.contains_key(&resource_id) {
655                     warn!("Replacing dest resource with id {}", resource_id);
656                 }
657 
658                 let offset = plane_offsets[0];
659                 stream.dst_resources.insert(
660                     resource_id,
661                     OutputResource {
662                         resource_handle: uuid,
663                         offset,
664                         queue_params: None,
665                     },
666                 );
667             }
668         }
669 
670         Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
671     }
672 
resource_queue( &mut self, resource_bridge: &Tube, stream_id: u32, queue_type: QueueType, resource_id: u32, timestamp: u64, data_sizes: Vec<u32>, ) -> VideoResult<VideoCmdResponseType>673     fn resource_queue(
674         &mut self,
675         resource_bridge: &Tube,
676         stream_id: u32,
677         queue_type: QueueType,
678         resource_id: u32,
679         timestamp: u64,
680         data_sizes: Vec<u32>,
681     ) -> VideoResult<VideoCmdResponseType> {
682         let stream = self
683             .streams
684             .get_mut(&stream_id)
685             .ok_or(VideoError::InvalidStreamId(stream_id))?;
686 
687         let encoder_session = match stream.encoder_session {
688             Some(ref mut e) => e,
689             None => {
690                 // The encoder session is created on the first ResourceCreate,
691                 // so it should exist here.
692                 error!("Encoder session did not exist at resource_queue.");
693                 return Err(VideoError::InvalidOperation);
694             }
695         };
696 
697         match queue_type {
698             QueueType::Input => {
699                 if data_sizes.len() != stream.src_params.plane_formats.len() {
700                     return Err(VideoError::InvalidParameter);
701                 }
702 
703                 let src_resource = stream.src_resources.get_mut(&resource_id).ok_or(
704                     VideoError::InvalidResourceId {
705                         stream_id,
706                         resource_id,
707                     },
708                 )?;
709 
710                 let resource_info =
711                     get_resource_info(resource_bridge, src_resource.resource_handle)?;
712 
713                 let force_keyframe = std::mem::replace(&mut stream.force_keyframe, false);
714 
715                 match encoder_session.encode(
716                     resource_info.file,
717                     &src_resource.planes,
718                     timestamp,
719                     force_keyframe,
720                 ) {
721                     Ok(input_buffer_id) => {
722                         if let Some(last_resource_id) = stream
723                             .encoder_input_buffer_ids
724                             .insert(input_buffer_id, resource_id)
725                         {
726                             error!(
727                                 "encoder input id {} was already mapped to resource id {}",
728                                 input_buffer_id, last_resource_id
729                             );
730                             return Err(VideoError::InvalidOperation);
731                         }
732                         let queue_params = QueuedInputResourceParams {
733                             encoder_id: input_buffer_id,
734                             timestamp,
735                             in_queue: true,
736                         };
737                         if let Some(last_queue_params) =
738                             src_resource.queue_params.replace(queue_params)
739                         {
740                             if last_queue_params.in_queue {
741                                 error!(
742                                     "resource {} was already queued ({:?})",
743                                     resource_id, last_queue_params
744                                 );
745                                 return Err(VideoError::InvalidOperation);
746                             }
747                         }
748                     }
749                     Err(e) => {
750                         // TODO(alexlau): Return the actual error
751                         error!("encode failed: {}", e);
752                         return Err(VideoError::InvalidOperation);
753                     }
754                 }
755                 Ok(VideoCmdResponseType::Async(AsyncCmdTag::Queue {
756                     stream_id,
757                     queue_type: QueueType::Input,
758                     resource_id,
759                 }))
760             }
761             QueueType::Output => {
762                 if data_sizes.len() != stream.dst_params.plane_formats.len() {
763                     return Err(VideoError::InvalidParameter);
764                 }
765 
766                 let dst_resource = stream.dst_resources.get_mut(&resource_id).ok_or(
767                     VideoError::InvalidResourceId {
768                         stream_id,
769                         resource_id,
770                     },
771                 )?;
772 
773                 let resource_info =
774                     get_resource_info(resource_bridge, dst_resource.resource_handle)?;
775 
776                 let mut buffer_size = data_sizes[0];
777 
778                 // It seems that data_sizes[0] is 0 here. For now, take the stride
779                 // from resource_info instead because we're always allocating <size> x 1
780                 // blobs..
781                 // TODO(alexlau): Figure out how to fix this.
782                 if buffer_size == 0 {
783                     buffer_size = resource_info.planes[0].offset + resource_info.planes[0].stride;
784                 }
785 
786                 // Stores an output buffer to notify EOS.
787                 // This is necessary because libvda is unable to indicate EOS along with returned buffers.
788                 // For now, when a `Flush()` completes, this saved resource will be returned as a zero-sized
789                 // buffer with the EOS flag.
790                 if stream.eos_notification_buffer.is_none() {
791                     stream.eos_notification_buffer = Some(resource_id);
792                     return Ok(VideoCmdResponseType::Async(AsyncCmdTag::Queue {
793                         stream_id,
794                         queue_type: QueueType::Output,
795                         resource_id,
796                     }));
797                 }
798 
799                 match encoder_session.use_output_buffer(
800                     resource_info.file,
801                     dst_resource.offset,
802                     buffer_size,
803                 ) {
804                     Ok(output_buffer_id) => {
805                         if let Some(last_resource_id) = stream
806                             .encoder_output_buffer_ids
807                             .insert(output_buffer_id, resource_id)
808                         {
809                             error!(
810                                 "encoder output id {} was already mapped to resource id {}",
811                                 output_buffer_id, last_resource_id
812                             );
813                         }
814                         let queue_params = QueuedOutputResourceParams {
815                             encoder_id: output_buffer_id,
816                             timestamp,
817                             in_queue: true,
818                         };
819                         if let Some(last_queue_params) =
820                             dst_resource.queue_params.replace(queue_params)
821                         {
822                             if last_queue_params.in_queue {
823                                 error!(
824                                     "resource {} was already queued ({:?})",
825                                     resource_id, last_queue_params
826                                 );
827                             }
828                         }
829                     }
830                     Err(e) => {
831                         error!("use_output_buffer failed: {}", e);
832                         return Err(VideoError::InvalidOperation);
833                     }
834                 }
835                 Ok(VideoCmdResponseType::Async(AsyncCmdTag::Queue {
836                     stream_id,
837                     queue_type: QueueType::Output,
838                     resource_id,
839                 }))
840             }
841         }
842     }
843 
resource_destroy_all(&mut self, stream_id: u32) -> VideoResult<VideoCmdResponseType>844     fn resource_destroy_all(&mut self, stream_id: u32) -> VideoResult<VideoCmdResponseType> {
845         let stream = self
846             .streams
847             .get_mut(&stream_id)
848             .ok_or(VideoError::InvalidStreamId(stream_id))?;
849         stream.src_resources.clear();
850         stream.encoder_input_buffer_ids.clear();
851         stream.dst_resources.clear();
852         stream.encoder_output_buffer_ids.clear();
853         stream.eos_notification_buffer.take();
854         Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
855     }
856 
queue_clear( &mut self, stream_id: u32, queue_type: QueueType, ) -> VideoResult<VideoCmdResponseType>857     fn queue_clear(
858         &mut self,
859         stream_id: u32,
860         queue_type: QueueType,
861     ) -> VideoResult<VideoCmdResponseType> {
862         // Unfortunately, there is no way to clear the queue with VEA.
863         // VDA has Reset() which also isn't done on a per-queue basis,
864         // but VEA has no such API.
865         // Doing a Flush() here and waiting for the flush response is also
866         // not an option, because the virtio-video driver expects a prompt
867         // response (search for "timed out waiting for queue clear" in
868         // virtio_video_enc.c).
869         // So for now, we do a Flush(), but also mark each currently
870         // queued resource as no longer `in_queue`, and skip them when they
871         // are returned.
872         // TODO(b/153406792): Support per-queue clearing.
873         let stream = self
874             .streams
875             .get_mut(&stream_id)
876             .ok_or(VideoError::InvalidStreamId(stream_id))?;
877 
878         match queue_type {
879             QueueType::Input => {
880                 for src_resource in stream.src_resources.values_mut() {
881                     if let Some(ref mut queue_params) = src_resource.queue_params {
882                         queue_params.in_queue = false;
883                     }
884                 }
885             }
886             QueueType::Output => {
887                 for dst_resource in stream.dst_resources.values_mut() {
888                     if let Some(ref mut queue_params) = dst_resource.queue_params {
889                         queue_params.in_queue = false;
890                     }
891                 }
892                 stream.eos_notification_buffer = None;
893             }
894         }
895         Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
896     }
897 
get_params( &mut self, stream_id: u32, queue_type: QueueType, ) -> VideoResult<VideoCmdResponseType>898     fn get_params(
899         &mut self,
900         stream_id: u32,
901         queue_type: QueueType,
902     ) -> VideoResult<VideoCmdResponseType> {
903         let stream = self
904             .streams
905             .get_mut(&stream_id)
906             .ok_or(VideoError::InvalidStreamId(stream_id))?;
907 
908         if stream.encoder_session.is_some() && !stream.received_input_buffers_event {
909             // If we haven't yet received an RequireInputBuffers
910             // event, we need to wait for that before replying so that
911             // the G_FMT response has the correct data.
912             let pending_command = match queue_type {
913                 QueueType::Input => PendingCommand::GetSrcParams,
914                 QueueType::Output => PendingCommand::GetDstParams,
915             };
916 
917             if !stream.pending_commands.insert(pending_command) {
918                 // There is already a G_FMT call waiting.
919                 error!("Pending get params call already exists.");
920                 return Err(VideoError::InvalidOperation);
921             }
922 
923             Ok(VideoCmdResponseType::Async(AsyncCmdTag::GetParams {
924                 stream_id,
925                 queue_type,
926             }))
927         } else {
928             let params = match queue_type {
929                 QueueType::Input => stream.src_params.clone(),
930                 QueueType::Output => stream.dst_params.clone(),
931             };
932             Ok(VideoCmdResponseType::Sync(CmdResponse::GetParams {
933                 queue_type,
934                 params,
935             }))
936         }
937     }
938 
    /// Handles `SET_PARAMS` (S_FMT) for one queue of the stream.
    ///
    /// Rejected with `InvalidOperation` once any resource has been queued on
    /// either queue (encoding has effectively started). On success the current
    /// encoder session, if any, is torn down and a fresh one is created so the
    /// new parameters take effect.
    fn set_params(
        &mut self,
        wait_ctx: &WaitContext<Token>,
        stream_id: u32,
        queue_type: QueueType,
        format: Option<Format>,
        frame_width: u32,
        frame_height: u32,
        frame_rate: u32,
        plane_formats: Vec<PlaneFormat>,
    ) -> VideoResult<VideoCmdResponseType> {
        let stream = self
            .streams
            .get_mut(&stream_id)
            .ok_or(VideoError::InvalidStreamId(stream_id))?;

        if stream.src_resources.len() > 0 || stream.dst_resources.len() > 0 {
            // Buffers have already been queued and encoding has already started.
            return Err(VideoError::InvalidOperation);
        }

        match queue_type {
            QueueType::Input => {
                // There should be at least a single plane.
                if plane_formats.is_empty() {
                    return Err(VideoError::InvalidArgument);
                }

                // Requested format wins; otherwise keep the current one, and
                // fall back to NV12 if nothing was ever set.
                let desired_format = format.or(stream.src_params.format).unwrap_or(Format::NV12);
                self.cros_capabilities
                    .populate_src_params(
                        &mut stream.src_params,
                        desired_format,
                        frame_width,
                        frame_height,
                        plane_formats[0].stride,
                    )
                    .map_err(VideoError::EncoderImpl)?;

                // Following the V4L2 standard the framerate requested on the
                // input queue should also be applied to the output queue.
                if frame_rate > 0 {
                    stream.frame_rate = frame_rate;
                }
            }
            QueueType::Output => {
                // Requested format wins; otherwise keep the current one, and
                // fall back to H264 if nothing was ever set.
                let desired_format = format.or(stream.dst_params.format).unwrap_or(Format::H264);

                // There should be exactly one output buffer.
                if plane_formats.len() != 1 {
                    return Err(VideoError::InvalidArgument);
                }

                self.cros_capabilities
                    .populate_dst_params(
                        &mut stream.dst_params,
                        desired_format,
                        plane_formats[0].plane_size,
                    )
                    .map_err(VideoError::EncoderImpl)?;

                if frame_rate > 0 {
                    stream.frame_rate = frame_rate;
                }

                // Format is always populated for encoder.
                let new_format = stream
                    .dst_params
                    .format
                    .ok_or(VideoError::InvalidArgument)?;

                // If the selected profile no longer corresponds to the selected coded format,
                // reset it.
                stream.dst_profile = self
                    .cros_capabilities
                    .get_default_profile(&new_format)
                    .ok_or(VideoError::InvalidArgument)?;

                // A level is only tracked for H.264 output.
                if new_format == Format::H264 {
                    stream.dst_h264_level = Some(Level::H264_1_0);
                } else {
                    stream.dst_h264_level = None;
                }
            }
        }

        // An encoder session has to be created immediately upon a SetParams
        // (S_FMT) call, because we need to receive the RequireInputBuffers
        // callback which has output buffer size info, in order to populate
        // dst_params to have the correct size on subsequent GetParams (G_FMT) calls.
        if stream.encoder_session.is_some() {
            stream.clear_encode_session(wait_ctx)?;
            if !stream.received_input_buffers_event {
                // This could happen if two SetParams calls are occurring at the same time.
                // For example, the user calls SetParams for the input queue on one thread,
                // and a new encode session is created. Then on another thread, SetParams
                // is called for the output queue before the first SetParams call has returned.
                // At this point, there is a new EncodeSession being created that has not
                // yet received a RequireInputBuffers event.
                // Even if we clear the encoder session and recreate it, this case
                // is handled because stream.pending_commands will still contain
                // the waiting GetParams responses, which will then receive fresh data once
                // the new session's RequireInputBuffers event happens.
                warn!("New encoder session being created while waiting for RequireInputBuffers.")
            }
        }
        stream.set_encode_session(&mut self.encoder, wait_ctx)?;
        Ok(VideoCmdResponseType::Sync(CmdResponse::NoData))
    }
1048 
query_control(&self, query_ctrl_type: QueryCtrlType) -> VideoResult<VideoCmdResponseType>1049     fn query_control(&self, query_ctrl_type: QueryCtrlType) -> VideoResult<VideoCmdResponseType> {
1050         let query_ctrl_response = match query_ctrl_type {
1051             QueryCtrlType::Profile(format) => match self.cros_capabilities.get_profiles(&format) {
1052                 Some(profiles) => QueryCtrlResponse::Profile(profiles.clone()),
1053                 None => {
1054                     return Err(VideoError::UnsupportedControl(CtrlType::Profile));
1055                 }
1056             },
1057             QueryCtrlType::Level(format) => {
1058                 match format {
1059                     Format::H264 => QueryCtrlResponse::Level(vec![
1060                         Level::H264_1_0,
1061                         Level::H264_1_1,
1062                         Level::H264_1_2,
1063                         Level::H264_1_3,
1064                         Level::H264_2_0,
1065                         Level::H264_2_1,
1066                         Level::H264_2_2,
1067                         Level::H264_3_0,
1068                         Level::H264_3_1,
1069                         Level::H264_3_2,
1070                         Level::H264_4_0,
1071                         Level::H264_4_1,
1072                         Level::H264_4_2,
1073                         Level::H264_5_0,
1074                         Level::H264_5_1,
1075                     ]),
1076                     _ => {
1077                         // Levels are only supported for H264.
1078                         return Err(VideoError::UnsupportedControl(CtrlType::Level));
1079                     }
1080                 }
1081             }
1082         };
1083 
1084         Ok(VideoCmdResponseType::Sync(CmdResponse::QueryControl(
1085             query_ctrl_response,
1086         )))
1087     }
1088 
get_control( &self, stream_id: u32, ctrl_type: CtrlType, ) -> VideoResult<VideoCmdResponseType>1089     fn get_control(
1090         &self,
1091         stream_id: u32,
1092         ctrl_type: CtrlType,
1093     ) -> VideoResult<VideoCmdResponseType> {
1094         let stream = self
1095             .streams
1096             .get(&stream_id)
1097             .ok_or(VideoError::InvalidStreamId(stream_id))?;
1098         let ctrl_val = match ctrl_type {
1099             CtrlType::Bitrate => CtrlVal::Bitrate(stream.dst_bitrate),
1100             CtrlType::Profile => CtrlVal::Profile(stream.dst_profile),
1101             CtrlType::Level => {
1102                 let format = stream
1103                     .dst_params
1104                     .format
1105                     .ok_or(VideoError::InvalidArgument)?;
1106                 match format {
1107                     Format::H264 => CtrlVal::Level(stream.dst_h264_level.ok_or_else(|| {
1108                         error!("H264 level not set");
1109                         VideoError::InvalidArgument
1110                     })?),
1111                     _ => {
1112                         return Err(VideoError::UnsupportedControl(ctrl_type));
1113                     }
1114                 }
1115             }
1116             // Button controls should not be queried.
1117             CtrlType::ForceKeyframe => return Err(VideoError::UnsupportedControl(ctrl_type)),
1118         };
1119         Ok(VideoCmdResponseType::Sync(CmdResponse::GetControl(
1120             ctrl_val,
1121         )))
1122     }
1123 
set_control( &mut self, stream_id: u32, ctrl_val: CtrlVal, ) -> VideoResult<VideoCmdResponseType>1124     fn set_control(
1125         &mut self,
1126         stream_id: u32,
1127         ctrl_val: CtrlVal,
1128     ) -> VideoResult<VideoCmdResponseType> {
1129         let stream = self
1130             .streams
1131             .get_mut(&stream_id)
1132             .ok_or(VideoError::InvalidStreamId(stream_id))?;
1133         match ctrl_val {
1134             CtrlVal::Bitrate(bitrate) => {
1135                 if let Some(ref mut encoder_session) = stream.encoder_session {
1136                     if let Err(e) =
1137                         encoder_session.request_encoding_params_change(bitrate, stream.frame_rate)
1138                     {
1139                         error!(
1140                             "failed to dynamically request encoding params change: {}",
1141                             e
1142                         );
1143                         return Err(VideoError::InvalidOperation);
1144                     }
1145                 }
1146                 stream.dst_bitrate = bitrate;
1147             }
1148             CtrlVal::Profile(profile) => {
1149                 if stream.encoder_session.is_some() {
1150                     // TODO(alexlau): If no resources have yet been queued,
1151                     // should the encoder session be recreated with the new
1152                     // desired level?
1153                     error!("set control called for profile but encoder session already exists.");
1154                     return Err(VideoError::InvalidOperation);
1155                 }
1156                 let format = stream
1157                     .dst_params
1158                     .format
1159                     .ok_or(VideoError::InvalidArgument)?;
1160                 if format != profile.to_format() {
1161                     error!(
1162                         "specified profile does not correspond to the selected format ({})",
1163                         format
1164                     );
1165                     return Err(VideoError::InvalidOperation);
1166                 }
1167                 stream.dst_profile = profile;
1168             }
1169             CtrlVal::Level(level) => {
1170                 if stream.encoder_session.is_some() {
1171                     // TODO(alexlau): If no resources have yet been queued,
1172                     // should the encoder session be recreated with the new
1173                     // desired level?
1174                     error!("set control called for level but encoder session already exists.");
1175                     return Err(VideoError::InvalidOperation);
1176                 }
1177                 let format = stream
1178                     .dst_params
1179                     .format
1180                     .ok_or(VideoError::InvalidArgument)?;
1181                 if format != Format::H264 {
1182                     error!(
1183                         "set control called for level but format is not H264 ({})",
1184                         format
1185                     );
1186                     return Err(VideoError::InvalidOperation);
1187                 }
1188                 stream.dst_h264_level = Some(level);
1189             }
1190             CtrlVal::ForceKeyframe() => {
1191                 stream.force_keyframe = true;
1192             }
1193         }
1194         Ok(VideoCmdResponseType::Sync(CmdResponse::SetControl))
1195     }
1196 }
1197 
impl<T: Encoder> Device for EncoderDevice<T> {
    /// Decodes one virtio-video command and dispatches it to the matching
    /// handler method, converting a handler error into a synchronous error
    /// response. The second tuple element (unsolicited event responses) is
    /// always `None` for the encoder device.
    fn process_cmd(
        &mut self,
        req: VideoCmd,
        wait_ctx: &WaitContext<Token>,
        resource_bridge: &Tube,
    ) -> (
        VideoCmdResponseType,
        Option<(u32, Vec<VideoEvtResponseType>)>,
    ) {
        let cmd_response = match req {
            VideoCmd::QueryCapability { queue_type } => self.query_capabilities(queue_type),
            VideoCmd::StreamCreate {
                stream_id,
                coded_format: desired_format,
            } => self.stream_create(stream_id, desired_format),
            VideoCmd::StreamDestroy { stream_id } => self.stream_destroy(stream_id),
            VideoCmd::StreamDrain { stream_id } => self.stream_drain(stream_id),
            VideoCmd::ResourceCreate {
                stream_id,
                queue_type,
                resource_id,
                plane_offsets,
                uuid,
            } => self.resource_create(
                wait_ctx,
                resource_bridge,
                stream_id,
                queue_type,
                resource_id,
                plane_offsets,
                uuid,
            ),
            VideoCmd::ResourceQueue {
                stream_id,
                queue_type,
                resource_id,
                timestamp,
                data_sizes,
            } => self.resource_queue(
                resource_bridge,
                stream_id,
                queue_type,
                resource_id,
                timestamp,
                data_sizes,
            ),
            VideoCmd::ResourceDestroyAll { stream_id, .. } => self.resource_destroy_all(stream_id),
            VideoCmd::QueueClear {
                stream_id,
                queue_type,
            } => self.queue_clear(stream_id, queue_type),
            VideoCmd::GetParams {
                stream_id,
                queue_type,
            } => self.get_params(stream_id, queue_type),
            VideoCmd::SetParams {
                stream_id,
                queue_type,
                params:
                    Params {
                        format,
                        frame_width,
                        frame_height,
                        frame_rate,
                        plane_formats,
                        ..
                    },
            } => self.set_params(
                wait_ctx,
                stream_id,
                queue_type,
                format,
                frame_width,
                frame_height,
                frame_rate,
                plane_formats,
            ),
            VideoCmd::QueryControl { query_ctrl_type } => self.query_control(query_ctrl_type),
            VideoCmd::GetControl {
                stream_id,
                ctrl_type,
            } => self.get_control(stream_id, ctrl_type),
            VideoCmd::SetControl {
                stream_id,
                ctrl_val,
            } => self.set_control(stream_id, ctrl_val),
        };
        // Errors are reported to the guest as a synchronous error response
        // rather than propagated.
        let cmd_ret = match cmd_response {
            Ok(r) => r,
            Err(e) => {
                error!("returning error response: {}", &e);
                VideoCmdResponseType::Sync(e.into())
            }
        };
        (cmd_ret, None)
    }

    /// Reads a single event from the encoder session belonging to `stream_id`
    /// and dispatches it to the stream's event handler, returning any
    /// responses to send back to the guest. Returns `None` (and logs) when the
    /// stream or its session is missing, or the event cannot be read.
    fn process_event(
        &mut self,
        _desc_map: &mut AsyncCmdDescMap,
        stream_id: u32,
    ) -> Option<Vec<VideoEvtResponseType>> {
        let stream = match self.streams.get_mut(&stream_id) {
            Some(s) => s,
            None => {
                // TODO: remove fd from poll context?
                error!("Received event for missing stream id {}", stream_id);
                return None;
            }
        };

        let encoder_session = match stream.encoder_session {
            Some(ref mut s) => s,
            None => {
                error!(
                    "Received event for missing encoder session of stream id {}",
                    stream_id
                );
                return None;
            }
        };

        let event = match encoder_session.read_event() {
            Ok(e) => e,
            Err(e) => {
                error!("Failed to read event for stream id {}: {}", stream_id, e);
                return None;
            }
        };

        // Each event maps 1:1 onto a stream handler method.
        match event {
            EncoderEvent::RequireInputBuffers {
                input_count,
                input_frame_width,
                input_frame_height,
                output_buffer_size,
            } => stream.require_input_buffers(
                input_count,
                input_frame_width,
                input_frame_height,
                output_buffer_size,
            ),
            EncoderEvent::ProcessedInputBuffer {
                id: input_buffer_id,
            } => stream.processed_input_buffer(input_buffer_id),
            EncoderEvent::ProcessedOutputBuffer {
                id: output_buffer_id,
                bytesused,
                keyframe,
                timestamp,
            } => stream.processed_output_buffer(output_buffer_id, bytesused, keyframe, timestamp),
            EncoderEvent::FlushResponse { flush_done } => stream.flush_response(flush_done),
            EncoderEvent::NotifyError { error } => stream.notify_error(error),
        }
    }
}
1355