• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::{
2     decoder::{
3         stateful::{CaptureThreadResponse, DecoderCommand, DecoderEvent, DrainError},
4         DecoderEventCallback, FormatChangedCallback, FormatChangedReply,
5     },
6     device::{
7         poller::{DeviceEvent, PollEvent, Poller, Waker},
8         queue::{
9             self, direction::Capture, handles_provider::HandlesProvider, BuffersAllocated,
10             CaptureQueueable, GetCaptureBufferByIndex, GetFreeCaptureBuffer, Queue, QueueInit,
11         },
12         AllocatedQueue, Device, Stream, TryDequeue,
13     },
14     ioctl::{self, SelectionTarget},
15 };
16 
17 use std::{
18     io,
19     sync::{mpsc, Arc},
20     task::Wake,
21 };
22 
23 use log::{debug, error, trace, warn};
24 use thiserror::Error;
25 
26 /// Check if `device` has a dynamic resolution change event pending.
27 ///
28 /// Dequeues all pending V4L2 events and returns `true` if a
29 /// SRC_CHANGE_EVENT (indicating a format change on the CAPTURE queue) was
30 /// detected. This consumes all the event, meaning that if this method
31 /// returned `true` once it will return `false` until a new resolution
32 /// change happens in the stream.
is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError>33 fn is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError> {
34     let mut drc_pending = false;
35 
36     loop {
37         // TODO what if we used an iterator here?
38         let event = match ioctl::dqevent(device) {
39             Ok(event) => event,
40             Err(ioctl::DqEventError::NotReady) => return Ok(drc_pending),
41             Err(e) => return Err(e),
42         };
43 
44         match event {
45             ioctl::Event::SrcChangeEvent(changes) => {
46                 if changes.contains(ioctl::SrcChanges::RESOLUTION) {
47                     debug!("Received resolution change event");
48                     drc_pending = true;
49                 }
50             }
51             ioctl::Event::Eos => {
52                 debug!("Received EOS event");
53             }
54         }
55     }
56 }
57 
58 enum CaptureQueue<P: HandlesProvider> {
59     AwaitingResolution {
60         capture_queue: Queue<Capture, QueueInit>,
61     },
62     Decoding {
63         capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
64         provider: P,
65         cap_buffer_waker: Arc<Waker>,
66         // TODO not super elegant...
67         blocking_drain_in_progress: bool,
68     },
69 }
70 
71 pub(super) struct CaptureThread<P, DecoderEventCb, FormatChangedCb>
72 where
73     P: HandlesProvider,
74     DecoderEventCb: DecoderEventCallback<P>,
75     FormatChangedCb: FormatChangedCallback<P>,
76 {
77     device: Arc<Device>,
78     capture_queue: CaptureQueue<P>,
79     pub(super) poller: Poller,
80 
81     event_cb: DecoderEventCb,
82     set_capture_format_cb: FormatChangedCb,
83 
84     // Waker signaled when the main thread has commands pending for us.
85     pub(super) command_waker: Arc<Waker>,
86     // Receiver we read commands from when `command_waker` is signaled.
87     command_receiver: mpsc::Receiver<DecoderCommand>,
88     // Sender we use to send status messages after receiving commands from the
89     // main thread.
90     response_sender: mpsc::Sender<CaptureThreadResponse>,
91 }
92 
93 #[derive(Debug, Error)]
94 enum UpdateCaptureError {
95     #[error("error while enabling poller events: {0}")]
96     PollerEvents(io::Error),
97     #[error("error while removing CAPTURE waker: {0}")]
98     RemoveWaker(io::Error),
99     #[error("error while stopping CAPTURE queue: {0}")]
100     Streamoff(#[from] ioctl::StreamOffError),
101     #[error("error while freeing CAPTURE buffers: {0}")]
102     FreeBuffers(#[from] ioctl::ReqbufsError),
103     #[error("error while obtaining CAPTURE format: {0}")]
104     GFmt(#[from] ioctl::GFmtError),
105     #[error("error while obtaining selection target from CAPTURE queue: {0}")]
106     GSelection(#[from] ioctl::GSelectionError),
107     #[error("error while running the CAPTURE format callback: {0}")]
108     Callback(#[from] anyhow::Error),
109     #[error("error while requesting CAPTURE buffers: {0}")]
110     RequestBuffers(#[from] queue::RequestBuffersError),
111     #[error("error while adding the CAPTURE buffer waker: {0}")]
112     AddWaker(io::Error),
113     #[error("error while streaming CAPTURE queue: {0}")]
114     StreamOn(#[from] ioctl::StreamOnError),
115 }
116 
117 const CAPTURE_READY: u32 = 1;
118 const COMMAND_WAITING: u32 = 2;
119 
120 #[derive(Debug, Error)]
121 enum ProcessEventsError {
122     #[error("error while dequeueing event")]
123     DqEvent(#[from] ioctl::DqEventError),
124     #[error("error while requesting buffers")]
125     RequestBuffers(#[from] queue::RequestBuffersError),
126     #[error("error while updating CAPTURE format")]
127     UpdateCapture(#[from] UpdateCaptureError),
128 }
129 
130 impl<P, DecoderEventCb, FormatChangedCb> CaptureThread<P, DecoderEventCb, FormatChangedCb>
131 where
132     P: HandlesProvider,
133     DecoderEventCb: DecoderEventCallback<P>,
134     FormatChangedCb: FormatChangedCallback<P>,
135     for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
136         GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
137 {
new( device: &Arc<Device>, capture_queue: Queue<Capture, QueueInit>, event_cb: DecoderEventCb, set_capture_format_cb: FormatChangedCb, command_receiver: mpsc::Receiver<DecoderCommand>, response_sender: mpsc::Sender<CaptureThreadResponse>, ) -> io::Result<Self>138     pub(super) fn new(
139         device: &Arc<Device>,
140         capture_queue: Queue<Capture, QueueInit>,
141         event_cb: DecoderEventCb,
142         set_capture_format_cb: FormatChangedCb,
143         command_receiver: mpsc::Receiver<DecoderCommand>,
144         response_sender: mpsc::Sender<CaptureThreadResponse>,
145     ) -> io::Result<Self> {
146         // Start by only listening to V4L2 events in order to catch the initial
147         // resolution change, and to the stop waker in case the user had a
148         // change of heart about decoding something now.
149         let mut poller = Poller::new(Arc::clone(device))?;
150         poller.enable_event(DeviceEvent::V4L2Event)?;
151         let command_waker = poller.add_waker(COMMAND_WAITING)?;
152 
153         let decoder_thread = CaptureThread {
154             device: Arc::clone(device),
155             capture_queue: CaptureQueue::AwaitingResolution { capture_queue },
156             poller,
157             event_cb,
158             set_capture_format_cb,
159             command_waker,
160             command_receiver,
161             response_sender,
162         };
163 
164         Ok(decoder_thread)
165     }
166 
send_response(&self, response: CaptureThreadResponse)167     fn send_response(&self, response: CaptureThreadResponse) {
168         trace!("Sending response: {:?}", response);
169 
170         self.response_sender.send(response).unwrap();
171     }
172 
drain(&mut self, blocking: bool)173     fn drain(&mut self, blocking: bool) {
174         trace!("Processing Drain({}) command", blocking);
175         let response = match &mut self.capture_queue {
176             // We cannot initiate the flush sequence before receiving the initial
177             // resolution.
178             CaptureQueue::AwaitingResolution { .. } => {
179                 Some(CaptureThreadResponse::DrainDone(Err(DrainError::TryAgain)))
180             }
181             CaptureQueue::Decoding {
182                 blocking_drain_in_progress,
183                 ..
184             } => {
185                 // We can receive the LAST buffer, send the STOP command
186                 // and exit the loop once the buffer with the LAST tag is received.
187                 ioctl::decoder_cmd::<_, ()>(&*self.device, ioctl::DecoderCmd::stop()).unwrap();
188                 if blocking {
189                     // If we are blocking, we will send the answer when the drain
190                     // is completed.
191                     *blocking_drain_in_progress = true;
192                     None
193                 } else {
194                     // If not blocking, send the response now so the client can keep going.
195                     Some(CaptureThreadResponse::DrainDone(Ok(false)))
196                 }
197             }
198         };
199 
200         if let Some(response) = response {
201             self.send_response(response);
202         }
203     }
204 
flush(&mut self)205     fn flush(&mut self) {
206         trace!("Processing flush command");
207         match &mut self.capture_queue {
208             CaptureQueue::AwaitingResolution { .. } => {}
209             CaptureQueue::Decoding {
210                 capture_queue,
211                 blocking_drain_in_progress,
212                 ..
213             } => {
214                 // Stream the capture queue off and back on, dropping any queued
215                 // buffer, and making the decoder ready to work again if it was
216                 // halted.
217                 capture_queue.stream_off().unwrap();
218                 capture_queue.stream_on().unwrap();
219                 *blocking_drain_in_progress = false;
220             }
221         }
222 
223         self.send_response(CaptureThreadResponse::FlushDone(Ok(())));
224         self.enqueue_capture_buffers()
225     }
226 
enqueue_capture_buffers(&mut self)227     fn enqueue_capture_buffers(&mut self) {
228         trace!("Queueing available CAPTURE buffers");
229         let (capture_queue, provider, cap_buffer_waker) = match &mut self.capture_queue {
230             // Capture queue is not set up yet, no buffers to queue.
231             CaptureQueue::AwaitingResolution { .. } => return,
232             CaptureQueue::Decoding {
233                 capture_queue,
234                 provider,
235                 cap_buffer_waker,
236                 ..
237             } => (capture_queue, provider, cap_buffer_waker),
238         };
239 
240         // Requeue all available CAPTURE buffers.
241         'enqueue: while let Some(handles) = provider.get_handles(cap_buffer_waker) {
242             // TODO potential problem: the handles will be dropped if no V4L2 buffer
243             // is available. There is no guarantee that the provider will get them back
244             // in this case (e.g. with the C FFI).
245             let buffer = match provider.get_suitable_buffer_for(&handles, capture_queue) {
246                 Ok(buffer) => buffer,
247                 // It is possible that we run out of V4L2 buffers if there are more handles than
248                 // buffers allocated. One example of this scenario is the `MmapProvider` which has
249                 // an infinite number of handles. Break out of the loop when this happens - we will
250                 // be called again the next time a CAPTURE buffer becomes available.
251                 Err(queue::handles_provider::GetSuitableBufferError::TryGetFree(
252                     queue::GetFreeBufferError::NoFreeBuffer,
253                 )) => {
254                     break 'enqueue;
255                 }
256                 Err(e) => {
257                     error!("Could not find suitable buffer for handles: {}", e);
258                     warn!("Handles potentially lost due to no V4L2 buffer being available");
259                     break 'enqueue;
260                 }
261             };
262             match buffer.queue_with_handles(handles) {
263                 Ok(()) => (),
264                 Err(e) => error!("Error while queueing CAPTURE buffer: {}", e),
265             }
266         }
267     }
268 
process_v4l2_event(mut self) -> Self269     fn process_v4l2_event(mut self) -> Self {
270         trace!("Processing V4L2 event");
271         match self.capture_queue {
272             CaptureQueue::AwaitingResolution { .. } => {
273                 if is_drc_event_pending(&self.device).unwrap() {
274                     self = self.update_capture_format().unwrap()
275                 }
276             }
277             CaptureQueue::Decoding { .. } => unreachable!(),
278         }
279 
280         self
281     }
282 
update_capture_format(mut self) -> Result<Self, UpdateCaptureError>283     fn update_capture_format(mut self) -> Result<Self, UpdateCaptureError> {
284         debug!("Updating CAPTURE format");
285         // First reset the capture queue to the `Init` state if needed.
286         let mut capture_queue = match self.capture_queue {
287             // Initial resolution
288             CaptureQueue::AwaitingResolution { capture_queue } => {
289                 // Stop listening to V4L2 events. We will check them when we get
290                 // a buffer with the LAST flag.
291                 self.poller
292                     .disable_event(DeviceEvent::V4L2Event)
293                     .map_err(Into::<io::Error>::into)
294                     .map_err(UpdateCaptureError::PollerEvents)?;
295                 // Listen to CAPTURE buffers being ready to dequeue, as we will
296                 // be streaming soon.
297                 self.poller
298                     .enable_event(DeviceEvent::CaptureReady)
299                     .map_err(Into::<io::Error>::into)
300                     .map_err(UpdateCaptureError::PollerEvents)?;
301                 capture_queue
302             }
303             // Dynamic resolution change
304             CaptureQueue::Decoding { capture_queue, .. } => {
305                 // Remove the waker for the previous buffers pool, as we will
306                 // get a new set of buffers.
307                 self.poller
308                     .remove_waker(CAPTURE_READY)
309                     .map_err(UpdateCaptureError::RemoveWaker)?;
310                 // Deallocate the queue and return it to the `Init` state. Good
311                 // as new!
312                 capture_queue.stream_off()?;
313                 capture_queue.free_buffers()?.queue
314             }
315         };
316 
317         // Now get the parameters of the new format and build our new CAPTURE
318         // queue.
319 
320         // TODO use the proper control to get the right value.
321         let min_num_buffers = 4usize;
322         debug!("Stream requires {} capture buffers", min_num_buffers);
323 
324         let visible_rect = capture_queue.get_selection(SelectionTarget::Compose)?;
325         debug!(
326             "Visible rectangle: ({}, {}), {}x{}",
327             visible_rect.left, visible_rect.top, visible_rect.width, visible_rect.height
328         );
329 
330         // Let the client adjust the new format and give us the handles provider.
331         let FormatChangedReply {
332             provider,
333             mem_type,
334             num_buffers,
335         } = (self.set_capture_format_cb)(
336             capture_queue.change_format()?,
337             visible_rect,
338             min_num_buffers,
339         )?;
340 
341         debug!("Client requires {} capture buffers", num_buffers);
342 
343         // Allocate the new CAPTURE buffers and get ourselves a new waker for
344         // returning buffers.
345         let capture_queue =
346             capture_queue.request_buffers_generic::<P::HandleType>(mem_type, num_buffers as u32)?;
347         let cap_buffer_waker = self
348             .poller
349             .add_waker(CAPTURE_READY)
350             .map_err(UpdateCaptureError::AddWaker)?;
351 
352         // Ready to decode - signal the waker so we immediately enqueue buffers
353         // and start streaming.
354         cap_buffer_waker.wake_by_ref();
355         capture_queue.stream_on()?;
356 
357         Ok(Self {
358             capture_queue: CaptureQueue::Decoding {
359                 capture_queue,
360                 provider,
361                 cap_buffer_waker,
362                 blocking_drain_in_progress: false,
363             },
364             ..self
365         })
366     }
367 
368     /// Attempt to dequeue and process a single CAPTURE buffer.
369     ///
370     /// If a buffer can be dequeued, then the following processing takes place:
371     /// * Invoke the event callback with a `FrameDecoded` event containing the
372     ///   dequeued buffer,
373     /// * If the buffer has the LAST flag set:
374     ///   * If a resolution change event is pending, start the resolution change
375     ///     procedure,
376     ///   * If a resolution change event is not pending, invoke the event
377     ///     callback with an 'EndOfStream` event,
378     ///   * If a blocking drain was in progress, complete it.
dequeue_capture_buffer(mut self) -> Self379     fn dequeue_capture_buffer(mut self) -> Self {
380         trace!("Dequeueing decoded CAPTURE buffers");
381         let (capture_queue, cap_buffer_waker, blocking_drain_in_progress) =
382             match &mut self.capture_queue {
383                 CaptureQueue::AwaitingResolution { .. } => unreachable!(),
384                 CaptureQueue::Decoding {
385                     capture_queue,
386                     cap_buffer_waker,
387                     blocking_drain_in_progress,
388                     ..
389                 } => (capture_queue, cap_buffer_waker, blocking_drain_in_progress),
390             };
391 
392         let mut cap_buf = match capture_queue.try_dequeue() {
393             Ok(cap_buf) => cap_buf,
394             Err(e) => {
395                 warn!(
396                     "Expected a CAPTURE buffer but none available, possible driver bug: {}",
397                     e
398                 );
399                 return self;
400             }
401         };
402 
403         let is_last = cap_buf.data.is_last();
404 
405         // Add a drop callback to the dequeued buffer so we
406         // re-queue it as soon as it is dropped.
407         let cap_waker = Arc::clone(cap_buffer_waker);
408         cap_buf.add_drop_callback(move |_dqbuf| {
409             // Intentionally ignore the result here.
410             cap_waker.wake();
411         });
412 
413         // Pass buffers to the client
414         (self.event_cb)(DecoderEvent::FrameDecoded(cap_buf));
415 
416         if is_last {
417             debug!("CAPTURE buffer marked with LAST flag");
418             if is_drc_event_pending(&self.device).unwrap() {
419                 debug!("DRC event pending, updating CAPTURE format");
420                 self = self.update_capture_format().unwrap()
421             }
422             // No DRC event pending, this is the end of the stream.
423             // We need to stop and restart the CAPTURE queue, otherwise
424             // it will keep signaling buffers as ready and dequeueing
425             // them will return `EPIPE`.
426             else {
427                 debug!("No DRC event pending, restarting capture queue");
428                 // We are supposed to be able to run the START command
429                 // instead, but with vicodec the CAPTURE queue reports
430                 // as ready in subsequent polls() and DQBUF returns
431                 // -EPIPE...
432                 capture_queue.stream_off().unwrap();
433                 capture_queue.stream_on().unwrap();
434                 (self.event_cb)(DecoderEvent::EndOfStream);
435                 if *blocking_drain_in_progress {
436                     debug!("Signaling end of blocking drain");
437                     *blocking_drain_in_progress = false;
438                     self.send_response(CaptureThreadResponse::DrainDone(Ok(true)));
439                 }
440             }
441         }
442 
443         self
444     }
445 
run(mut self) -> Self446     pub(super) fn run(mut self) -> Self {
447         'mainloop: loop {
448             if let CaptureQueue::Decoding { capture_queue, .. } = &self.capture_queue {
449                 match capture_queue.num_queued_buffers() {
450                     // If there are no buffers on the CAPTURE queue, poll() will return
451                     // immediately with EPOLLERR and we would loop indefinitely.
452                     // Prevent this by temporarily disabling polling the CAPTURE queue
453                     // in such cases.
454                     0 => {
455                         self.poller
456                             .disable_event(DeviceEvent::CaptureReady)
457                             .unwrap();
458                     }
459                     // If device polling was disabled and we have buffers queued, we
460                     // can reenable it as poll will now wait for a CAPTURE buffer to
461                     // be ready for dequeue.
462                     _ => {
463                         self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
464                     }
465                 }
466             }
467 
468             trace!("Polling...");
469             let events = match self.poller.poll(None) {
470                 Ok(events) => events,
471                 Err(e) => {
472                     error!("Polling failure, exiting capture thread: {}", e);
473                     break 'mainloop;
474                 }
475             };
476             for event in events {
477                 self = match event {
478                     PollEvent::Device(DeviceEvent::V4L2Event) => self.process_v4l2_event(),
479                     PollEvent::Device(DeviceEvent::CaptureReady) => self.dequeue_capture_buffer(),
480                     PollEvent::Waker(CAPTURE_READY) => {
481                         self.enqueue_capture_buffers();
482                         self
483                     }
484                     PollEvent::Waker(COMMAND_WAITING) => {
485                         loop {
486                             let command =
487                                 match self.command_receiver.recv_timeout(Default::default()) {
488                                     Ok(command) => command,
489                                     Err(mpsc::RecvTimeoutError::Timeout) => break,
490                                     Err(e) => {
491                                         error!("Error while reading decoder command: {}", e);
492                                         break;
493                                     }
494                                 };
495                             match command {
496                                 DecoderCommand::Drain(blocking) => self.drain(blocking),
497                                 DecoderCommand::Flush => self.flush(),
498                                 DecoderCommand::Stop => {
499                                     trace!("Processing stop command");
500                                     break 'mainloop;
501                                 }
502                             }
503                         }
504                         self
505                     }
506                     _ => panic!("Unexpected event!"),
507                 }
508             }
509         }
510 
511         // Return the decoder to the awaiting resolution state.
512         match self.capture_queue {
513             CaptureQueue::AwaitingResolution { .. } => self,
514             CaptureQueue::Decoding { capture_queue, .. } => Self {
515                 capture_queue: CaptureQueue::AwaitingResolution {
516                     capture_queue: {
517                         capture_queue.stream_off().unwrap();
518                         capture_queue.free_buffers().unwrap().queue
519                     },
520                 },
521                 poller: {
522                     let mut poller = self.poller;
523                     poller.disable_event(DeviceEvent::CaptureReady).unwrap();
524                     poller.enable_event(DeviceEvent::V4L2Event).unwrap();
525                     poller.remove_waker(CAPTURE_READY).unwrap();
526                     poller
527                 },
528                 ..self
529             },
530         }
531     }
532 }
533