1 use crate::{
2 decoder::{
3 stateful::{CaptureThreadResponse, DecoderCommand, DecoderEvent, DrainError},
4 DecoderEventCallback, FormatChangedCallback, FormatChangedReply,
5 },
6 device::{
7 poller::{DeviceEvent, PollEvent, Poller, Waker},
8 queue::{
9 self, direction::Capture, handles_provider::HandlesProvider, BuffersAllocated,
10 CaptureQueueable, GetCaptureBufferByIndex, GetFreeCaptureBuffer, Queue, QueueInit,
11 },
12 AllocatedQueue, Device, Stream, TryDequeue,
13 },
14 ioctl::{self, SelectionTarget},
15 };
16
17 use std::{
18 io,
19 sync::{mpsc, Arc},
20 task::Wake,
21 };
22
23 use log::{debug, error, trace, warn};
24 use thiserror::Error;
25
26 /// Check if `device` has a dynamic resolution change event pending.
27 ///
28 /// Dequeues all pending V4L2 events and returns `true` if a
29 /// SRC_CHANGE_EVENT (indicating a format change on the CAPTURE queue) was
30 /// detected. This consumes all the event, meaning that if this method
31 /// returned `true` once it will return `false` until a new resolution
32 /// change happens in the stream.
is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError>33 fn is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError> {
34 let mut drc_pending = false;
35
36 loop {
37 // TODO what if we used an iterator here?
38 let event = match ioctl::dqevent(device) {
39 Ok(event) => event,
40 Err(ioctl::DqEventError::NotReady) => return Ok(drc_pending),
41 Err(e) => return Err(e),
42 };
43
44 match event {
45 ioctl::Event::SrcChangeEvent(changes) => {
46 if changes.contains(ioctl::SrcChanges::RESOLUTION) {
47 debug!("Received resolution change event");
48 drc_pending = true;
49 }
50 }
51 ioctl::Event::Eos => {
52 debug!("Received EOS event");
53 }
54 }
55 }
56 }
57
58 enum CaptureQueue<P: HandlesProvider> {
59 AwaitingResolution {
60 capture_queue: Queue<Capture, QueueInit>,
61 },
62 Decoding {
63 capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
64 provider: P,
65 cap_buffer_waker: Arc<Waker>,
66 // TODO not super elegant...
67 blocking_drain_in_progress: bool,
68 },
69 }
70
71 pub(super) struct CaptureThread<P, DecoderEventCb, FormatChangedCb>
72 where
73 P: HandlesProvider,
74 DecoderEventCb: DecoderEventCallback<P>,
75 FormatChangedCb: FormatChangedCallback<P>,
76 {
77 device: Arc<Device>,
78 capture_queue: CaptureQueue<P>,
79 pub(super) poller: Poller,
80
81 event_cb: DecoderEventCb,
82 set_capture_format_cb: FormatChangedCb,
83
84 // Waker signaled when the main thread has commands pending for us.
85 pub(super) command_waker: Arc<Waker>,
86 // Receiver we read commands from when `command_waker` is signaled.
87 command_receiver: mpsc::Receiver<DecoderCommand>,
88 // Sender we use to send status messages after receiving commands from the
89 // main thread.
90 response_sender: mpsc::Sender<CaptureThreadResponse>,
91 }
92
93 #[derive(Debug, Error)]
94 enum UpdateCaptureError {
95 #[error("error while enabling poller events: {0}")]
96 PollerEvents(io::Error),
97 #[error("error while removing CAPTURE waker: {0}")]
98 RemoveWaker(io::Error),
99 #[error("error while stopping CAPTURE queue: {0}")]
100 Streamoff(#[from] ioctl::StreamOffError),
101 #[error("error while freeing CAPTURE buffers: {0}")]
102 FreeBuffers(#[from] ioctl::ReqbufsError),
103 #[error("error while obtaining CAPTURE format: {0}")]
104 GFmt(#[from] ioctl::GFmtError),
105 #[error("error while obtaining selection target from CAPTURE queue: {0}")]
106 GSelection(#[from] ioctl::GSelectionError),
107 #[error("error while running the CAPTURE format callback: {0}")]
108 Callback(#[from] anyhow::Error),
109 #[error("error while requesting CAPTURE buffers: {0}")]
110 RequestBuffers(#[from] queue::RequestBuffersError),
111 #[error("error while adding the CAPTURE buffer waker: {0}")]
112 AddWaker(io::Error),
113 #[error("error while streaming CAPTURE queue: {0}")]
114 StreamOn(#[from] ioctl::StreamOnError),
115 }
116
117 const CAPTURE_READY: u32 = 1;
118 const COMMAND_WAITING: u32 = 2;
119
120 #[derive(Debug, Error)]
121 enum ProcessEventsError {
122 #[error("error while dequeueing event")]
123 DqEvent(#[from] ioctl::DqEventError),
124 #[error("error while requesting buffers")]
125 RequestBuffers(#[from] queue::RequestBuffersError),
126 #[error("error while updating CAPTURE format")]
127 UpdateCapture(#[from] UpdateCaptureError),
128 }
129
130 impl<P, DecoderEventCb, FormatChangedCb> CaptureThread<P, DecoderEventCb, FormatChangedCb>
131 where
132 P: HandlesProvider,
133 DecoderEventCb: DecoderEventCallback<P>,
134 FormatChangedCb: FormatChangedCallback<P>,
135 for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
136 GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
137 {
new( device: &Arc<Device>, capture_queue: Queue<Capture, QueueInit>, event_cb: DecoderEventCb, set_capture_format_cb: FormatChangedCb, command_receiver: mpsc::Receiver<DecoderCommand>, response_sender: mpsc::Sender<CaptureThreadResponse>, ) -> io::Result<Self>138 pub(super) fn new(
139 device: &Arc<Device>,
140 capture_queue: Queue<Capture, QueueInit>,
141 event_cb: DecoderEventCb,
142 set_capture_format_cb: FormatChangedCb,
143 command_receiver: mpsc::Receiver<DecoderCommand>,
144 response_sender: mpsc::Sender<CaptureThreadResponse>,
145 ) -> io::Result<Self> {
146 // Start by only listening to V4L2 events in order to catch the initial
147 // resolution change, and to the stop waker in case the user had a
148 // change of heart about decoding something now.
149 let mut poller = Poller::new(Arc::clone(device))?;
150 poller.enable_event(DeviceEvent::V4L2Event)?;
151 let command_waker = poller.add_waker(COMMAND_WAITING)?;
152
153 let decoder_thread = CaptureThread {
154 device: Arc::clone(device),
155 capture_queue: CaptureQueue::AwaitingResolution { capture_queue },
156 poller,
157 event_cb,
158 set_capture_format_cb,
159 command_waker,
160 command_receiver,
161 response_sender,
162 };
163
164 Ok(decoder_thread)
165 }
166
send_response(&self, response: CaptureThreadResponse)167 fn send_response(&self, response: CaptureThreadResponse) {
168 trace!("Sending response: {:?}", response);
169
170 self.response_sender.send(response).unwrap();
171 }
172
drain(&mut self, blocking: bool)173 fn drain(&mut self, blocking: bool) {
174 trace!("Processing Drain({}) command", blocking);
175 let response = match &mut self.capture_queue {
176 // We cannot initiate the flush sequence before receiving the initial
177 // resolution.
178 CaptureQueue::AwaitingResolution { .. } => {
179 Some(CaptureThreadResponse::DrainDone(Err(DrainError::TryAgain)))
180 }
181 CaptureQueue::Decoding {
182 blocking_drain_in_progress,
183 ..
184 } => {
185 // We can receive the LAST buffer, send the STOP command
186 // and exit the loop once the buffer with the LAST tag is received.
187 ioctl::decoder_cmd::<_, ()>(&*self.device, ioctl::DecoderCmd::stop()).unwrap();
188 if blocking {
189 // If we are blocking, we will send the answer when the drain
190 // is completed.
191 *blocking_drain_in_progress = true;
192 None
193 } else {
194 // If not blocking, send the response now so the client can keep going.
195 Some(CaptureThreadResponse::DrainDone(Ok(false)))
196 }
197 }
198 };
199
200 if let Some(response) = response {
201 self.send_response(response);
202 }
203 }
204
flush(&mut self)205 fn flush(&mut self) {
206 trace!("Processing flush command");
207 match &mut self.capture_queue {
208 CaptureQueue::AwaitingResolution { .. } => {}
209 CaptureQueue::Decoding {
210 capture_queue,
211 blocking_drain_in_progress,
212 ..
213 } => {
214 // Stream the capture queue off and back on, dropping any queued
215 // buffer, and making the decoder ready to work again if it was
216 // halted.
217 capture_queue.stream_off().unwrap();
218 capture_queue.stream_on().unwrap();
219 *blocking_drain_in_progress = false;
220 }
221 }
222
223 self.send_response(CaptureThreadResponse::FlushDone(Ok(())));
224 self.enqueue_capture_buffers()
225 }
226
enqueue_capture_buffers(&mut self)227 fn enqueue_capture_buffers(&mut self) {
228 trace!("Queueing available CAPTURE buffers");
229 let (capture_queue, provider, cap_buffer_waker) = match &mut self.capture_queue {
230 // Capture queue is not set up yet, no buffers to queue.
231 CaptureQueue::AwaitingResolution { .. } => return,
232 CaptureQueue::Decoding {
233 capture_queue,
234 provider,
235 cap_buffer_waker,
236 ..
237 } => (capture_queue, provider, cap_buffer_waker),
238 };
239
240 // Requeue all available CAPTURE buffers.
241 'enqueue: while let Some(handles) = provider.get_handles(cap_buffer_waker) {
242 // TODO potential problem: the handles will be dropped if no V4L2 buffer
243 // is available. There is no guarantee that the provider will get them back
244 // in this case (e.g. with the C FFI).
245 let buffer = match provider.get_suitable_buffer_for(&handles, capture_queue) {
246 Ok(buffer) => buffer,
247 // It is possible that we run out of V4L2 buffers if there are more handles than
248 // buffers allocated. One example of this scenario is the `MmapProvider` which has
249 // an infinite number of handles. Break out of the loop when this happens - we will
250 // be called again the next time a CAPTURE buffer becomes available.
251 Err(queue::handles_provider::GetSuitableBufferError::TryGetFree(
252 queue::GetFreeBufferError::NoFreeBuffer,
253 )) => {
254 break 'enqueue;
255 }
256 Err(e) => {
257 error!("Could not find suitable buffer for handles: {}", e);
258 warn!("Handles potentially lost due to no V4L2 buffer being available");
259 break 'enqueue;
260 }
261 };
262 match buffer.queue_with_handles(handles) {
263 Ok(()) => (),
264 Err(e) => error!("Error while queueing CAPTURE buffer: {}", e),
265 }
266 }
267 }
268
process_v4l2_event(mut self) -> Self269 fn process_v4l2_event(mut self) -> Self {
270 trace!("Processing V4L2 event");
271 match self.capture_queue {
272 CaptureQueue::AwaitingResolution { .. } => {
273 if is_drc_event_pending(&self.device).unwrap() {
274 self = self.update_capture_format().unwrap()
275 }
276 }
277 CaptureQueue::Decoding { .. } => unreachable!(),
278 }
279
280 self
281 }
282
update_capture_format(mut self) -> Result<Self, UpdateCaptureError>283 fn update_capture_format(mut self) -> Result<Self, UpdateCaptureError> {
284 debug!("Updating CAPTURE format");
285 // First reset the capture queue to the `Init` state if needed.
286 let mut capture_queue = match self.capture_queue {
287 // Initial resolution
288 CaptureQueue::AwaitingResolution { capture_queue } => {
289 // Stop listening to V4L2 events. We will check them when we get
290 // a buffer with the LAST flag.
291 self.poller
292 .disable_event(DeviceEvent::V4L2Event)
293 .map_err(Into::<io::Error>::into)
294 .map_err(UpdateCaptureError::PollerEvents)?;
295 // Listen to CAPTURE buffers being ready to dequeue, as we will
296 // be streaming soon.
297 self.poller
298 .enable_event(DeviceEvent::CaptureReady)
299 .map_err(Into::<io::Error>::into)
300 .map_err(UpdateCaptureError::PollerEvents)?;
301 capture_queue
302 }
303 // Dynamic resolution change
304 CaptureQueue::Decoding { capture_queue, .. } => {
305 // Remove the waker for the previous buffers pool, as we will
306 // get a new set of buffers.
307 self.poller
308 .remove_waker(CAPTURE_READY)
309 .map_err(UpdateCaptureError::RemoveWaker)?;
310 // Deallocate the queue and return it to the `Init` state. Good
311 // as new!
312 capture_queue.stream_off()?;
313 capture_queue.free_buffers()?.queue
314 }
315 };
316
317 // Now get the parameters of the new format and build our new CAPTURE
318 // queue.
319
320 // TODO use the proper control to get the right value.
321 let min_num_buffers = 4usize;
322 debug!("Stream requires {} capture buffers", min_num_buffers);
323
324 let visible_rect = capture_queue.get_selection(SelectionTarget::Compose)?;
325 debug!(
326 "Visible rectangle: ({}, {}), {}x{}",
327 visible_rect.left, visible_rect.top, visible_rect.width, visible_rect.height
328 );
329
330 // Let the client adjust the new format and give us the handles provider.
331 let FormatChangedReply {
332 provider,
333 mem_type,
334 num_buffers,
335 } = (self.set_capture_format_cb)(
336 capture_queue.change_format()?,
337 visible_rect,
338 min_num_buffers,
339 )?;
340
341 debug!("Client requires {} capture buffers", num_buffers);
342
343 // Allocate the new CAPTURE buffers and get ourselves a new waker for
344 // returning buffers.
345 let capture_queue =
346 capture_queue.request_buffers_generic::<P::HandleType>(mem_type, num_buffers as u32)?;
347 let cap_buffer_waker = self
348 .poller
349 .add_waker(CAPTURE_READY)
350 .map_err(UpdateCaptureError::AddWaker)?;
351
352 // Ready to decode - signal the waker so we immediately enqueue buffers
353 // and start streaming.
354 cap_buffer_waker.wake_by_ref();
355 capture_queue.stream_on()?;
356
357 Ok(Self {
358 capture_queue: CaptureQueue::Decoding {
359 capture_queue,
360 provider,
361 cap_buffer_waker,
362 blocking_drain_in_progress: false,
363 },
364 ..self
365 })
366 }
367
368 /// Attempt to dequeue and process a single CAPTURE buffer.
369 ///
370 /// If a buffer can be dequeued, then the following processing takes place:
371 /// * Invoke the event callback with a `FrameDecoded` event containing the
372 /// dequeued buffer,
373 /// * If the buffer has the LAST flag set:
374 /// * If a resolution change event is pending, start the resolution change
375 /// procedure,
376 /// * If a resolution change event is not pending, invoke the event
377 /// callback with an 'EndOfStream` event,
378 /// * If a blocking drain was in progress, complete it.
dequeue_capture_buffer(mut self) -> Self379 fn dequeue_capture_buffer(mut self) -> Self {
380 trace!("Dequeueing decoded CAPTURE buffers");
381 let (capture_queue, cap_buffer_waker, blocking_drain_in_progress) =
382 match &mut self.capture_queue {
383 CaptureQueue::AwaitingResolution { .. } => unreachable!(),
384 CaptureQueue::Decoding {
385 capture_queue,
386 cap_buffer_waker,
387 blocking_drain_in_progress,
388 ..
389 } => (capture_queue, cap_buffer_waker, blocking_drain_in_progress),
390 };
391
392 let mut cap_buf = match capture_queue.try_dequeue() {
393 Ok(cap_buf) => cap_buf,
394 Err(e) => {
395 warn!(
396 "Expected a CAPTURE buffer but none available, possible driver bug: {}",
397 e
398 );
399 return self;
400 }
401 };
402
403 let is_last = cap_buf.data.is_last();
404
405 // Add a drop callback to the dequeued buffer so we
406 // re-queue it as soon as it is dropped.
407 let cap_waker = Arc::clone(cap_buffer_waker);
408 cap_buf.add_drop_callback(move |_dqbuf| {
409 // Intentionally ignore the result here.
410 cap_waker.wake();
411 });
412
413 // Pass buffers to the client
414 (self.event_cb)(DecoderEvent::FrameDecoded(cap_buf));
415
416 if is_last {
417 debug!("CAPTURE buffer marked with LAST flag");
418 if is_drc_event_pending(&self.device).unwrap() {
419 debug!("DRC event pending, updating CAPTURE format");
420 self = self.update_capture_format().unwrap()
421 }
422 // No DRC event pending, this is the end of the stream.
423 // We need to stop and restart the CAPTURE queue, otherwise
424 // it will keep signaling buffers as ready and dequeueing
425 // them will return `EPIPE`.
426 else {
427 debug!("No DRC event pending, restarting capture queue");
428 // We are supposed to be able to run the START command
429 // instead, but with vicodec the CAPTURE queue reports
430 // as ready in subsequent polls() and DQBUF returns
431 // -EPIPE...
432 capture_queue.stream_off().unwrap();
433 capture_queue.stream_on().unwrap();
434 (self.event_cb)(DecoderEvent::EndOfStream);
435 if *blocking_drain_in_progress {
436 debug!("Signaling end of blocking drain");
437 *blocking_drain_in_progress = false;
438 self.send_response(CaptureThreadResponse::DrainDone(Ok(true)));
439 }
440 }
441 }
442
443 self
444 }
445
run(mut self) -> Self446 pub(super) fn run(mut self) -> Self {
447 'mainloop: loop {
448 if let CaptureQueue::Decoding { capture_queue, .. } = &self.capture_queue {
449 match capture_queue.num_queued_buffers() {
450 // If there are no buffers on the CAPTURE queue, poll() will return
451 // immediately with EPOLLERR and we would loop indefinitely.
452 // Prevent this by temporarily disabling polling the CAPTURE queue
453 // in such cases.
454 0 => {
455 self.poller
456 .disable_event(DeviceEvent::CaptureReady)
457 .unwrap();
458 }
459 // If device polling was disabled and we have buffers queued, we
460 // can reenable it as poll will now wait for a CAPTURE buffer to
461 // be ready for dequeue.
462 _ => {
463 self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
464 }
465 }
466 }
467
468 trace!("Polling...");
469 let events = match self.poller.poll(None) {
470 Ok(events) => events,
471 Err(e) => {
472 error!("Polling failure, exiting capture thread: {}", e);
473 break 'mainloop;
474 }
475 };
476 for event in events {
477 self = match event {
478 PollEvent::Device(DeviceEvent::V4L2Event) => self.process_v4l2_event(),
479 PollEvent::Device(DeviceEvent::CaptureReady) => self.dequeue_capture_buffer(),
480 PollEvent::Waker(CAPTURE_READY) => {
481 self.enqueue_capture_buffers();
482 self
483 }
484 PollEvent::Waker(COMMAND_WAITING) => {
485 loop {
486 let command =
487 match self.command_receiver.recv_timeout(Default::default()) {
488 Ok(command) => command,
489 Err(mpsc::RecvTimeoutError::Timeout) => break,
490 Err(e) => {
491 error!("Error while reading decoder command: {}", e);
492 break;
493 }
494 };
495 match command {
496 DecoderCommand::Drain(blocking) => self.drain(blocking),
497 DecoderCommand::Flush => self.flush(),
498 DecoderCommand::Stop => {
499 trace!("Processing stop command");
500 break 'mainloop;
501 }
502 }
503 }
504 self
505 }
506 _ => panic!("Unexpected event!"),
507 }
508 }
509 }
510
511 // Return the decoder to the awaiting resolution state.
512 match self.capture_queue {
513 CaptureQueue::AwaitingResolution { .. } => self,
514 CaptureQueue::Decoding { capture_queue, .. } => Self {
515 capture_queue: CaptureQueue::AwaitingResolution {
516 capture_queue: {
517 capture_queue.stream_off().unwrap();
518 capture_queue.free_buffers().unwrap().queue
519 },
520 },
521 poller: {
522 let mut poller = self.poller;
523 poller.disable_event(DeviceEvent::CaptureReady).unwrap();
524 poller.enable_event(DeviceEvent::V4L2Event).unwrap();
525 poller.remove_waker(CAPTURE_READY).unwrap();
526 poller
527 },
528 ..self
529 },
530 }
531 }
532 }
533