1 /*
2 * FIFO pseudo-muxer
3 * Copyright (c) 2016 Jan Sebechlebsky
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
29 #include "avformat.h"
30 #include "internal.h"
31 #include "mux.h"
32
/* Default capacity of the message queue, counted in messages. */
#define FIFO_DEFAULT_QUEUE_SIZE 60
/* Default limit of successive recovery attempts; 0 disables the limit. */
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
/* Default delay between two recovery attempts: 5 seconds, in microseconds. */
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000
36
37 typedef struct FifoContext {
38 const AVClass *class;
39 AVFormatContext *avf;
40
41 char *format;
42 AVDictionary *format_options;
43
44 int queue_size;
45 AVThreadMessageQueue *queue;
46
47 pthread_t writer_thread;
48
49 /* Return value of last write_trailer_call */
50 int write_trailer_ret;
51
52 /* Time to wait before next recovery attempt
53 * This can refer to the time in processed stream,
54 * or real time. */
55 int64_t recovery_wait_time;
56
57 /* Maximal number of unsuccessful successive recovery attempts */
58 int max_recovery_attempts;
59
60 /* Whether to attempt recovery from failure */
61 int attempt_recovery;
62
63 /* If >0 stream time will be used when waiting
64 * for the recovery attempt instead of real time */
65 int recovery_wait_streamtime;
66
67 /* If >0 recovery will be attempted regardless of error code
68 * (except AVERROR_EXIT, so exit request is never ignored) */
69 int recover_any_error;
70
71 /* Whether to drop packets in case the queue is full. */
72 int drop_pkts_on_overflow;
73
74 /* Whether to wait for keyframe when recovering
75 * from failure or queue overflow */
76 int restart_with_keyframe;
77
78 pthread_mutex_t overflow_flag_lock;
79 int overflow_flag_lock_initialized;
80 /* Value > 0 signals queue overflow */
81 volatile uint8_t overflow_flag;
82
83 atomic_int_least64_t queue_duration;
84 int64_t last_sent_dts;
85 int64_t timeshift;
86 } FifoContext;
87
88 typedef struct FifoThreadContext {
89 AVFormatContext *avf;
90
91 /* Timestamp of last failure.
92 * This is either pts in case stream time is used,
93 * or microseconds as returned by av_getttime_relative() */
94 int64_t last_recovery_ts;
95
96 /* Number of current recovery process
97 * Value > 0 means we are in recovery process */
98 int recovery_nr;
99
100 /* If > 0 all frames will be dropped until keyframe is received */
101 uint8_t drop_until_keyframe;
102
103 /* Value > 0 means that the previous write_header call was successful
104 * so finalization by calling write_trailer and ff_io_close must be done
105 * before exiting / reinitialization of underlying muxer */
106 uint8_t header_written;
107
108 int64_t last_received_dts;
109 } FifoThreadContext;
110
/* Kinds of messages sent to the consumer thread. */
typedef enum FifoMessageType {
    FIFO_NOOP         = 0, /* nothing to do */
    FIFO_WRITE_HEADER = 1, /* write the header of the underlying muxer */
    FIFO_WRITE_PACKET = 2, /* write the packet carried by the message */
    FIFO_FLUSH_OUTPUT = 3  /* flush the underlying muxer */
} FifoMessageType;
117
118 typedef struct FifoMessage {
119 FifoMessageType type;
120 AVPacket pkt;
121 } FifoMessage;
122
/**
 * Open the output and write the header of the underlying muxer.
 *
 * A private copy of the user supplied format options is handed to
 * ff_format_output_open() and avformat_write_header(). Every entry still
 * present in the copy afterwards is logged as unknown and the result becomes
 * AVERROR(EINVAL).
 *
 * @param ctx consumer thread state; header_written is set when
 *            avformat_write_header() succeeded
 * @return >= 0 on success, a negative AVERROR code on failure
 */
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    AVDictionary *format_options = NULL;
    int ret, i;

    ret = av_dict_copy(&format_options, fifo->format_options, 0);
    if (ret < 0)
        goto end;

    ret = ff_format_output_open(avf2, avf->url, &format_options);
    if (ret < 0) {
        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
               av_err2str(ret));
        goto end;
    }

    for (i = 0; i < avf2->nb_streams; i++)
        ffstream(avf2->streams[i])->cur_dts = 0;

    ret = avformat_write_header(avf2, &format_options);
    if (ret >= 0) {
        /* avformat_write_header() is documented to report success with a
         * non-negative value, not only with 0. */
        ctx->header_written = 1;
    } else {
        /* fifo_thread_write_trailer() does nothing unless the header was
         * written, so the freshly opened IO context has to be closed here;
         * otherwise the next attempt would open a new one on top of it. */
        ff_format_io_close(avf2, &avf2->pb);
    }

    // Check for options unrecognized by underlying muxer
    if (format_options) {
        AVDictionaryEntry *entry = NULL;
        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
        ret = AVERROR(EINVAL);
    }

end:
    av_dict_free(&format_options);
    return ret;
}
161
/**
 * Handle a FIFO_FLUSH_OUTPUT message by calling av_write_frame() on the
 * underlying muxer with a NULL packet.
 *
 * @return the value returned by av_write_frame()
 */
static int fifo_thread_flush_output(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;

    return av_write_frame(fifo->avf, NULL);
}
170
/**
 * Compute how far the dts of pkt lies past the previously seen dts.
 *
 * @param avf      context whose stream time bases describe pkt
 * @param pkt      packet to account for; its dts is rescaled to AV_TIME_BASE_Q
 * @param last_dts in: previous dts in AV_TIME_BASE units or AV_NOPTS_VALUE,
 *                 out: the rescaled dts of pkt
 * @return difference to the previous dts in AV_TIME_BASE units,
 *         0 if there was no previous dts
 */
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
{
    AVStream *stream = avf->streams[pkt->stream_index];
    int64_t cur_dts = av_rescale_q(pkt->dts, stream->time_base, AV_TIME_BASE_Q);
    int64_t delta = 0;

    if (*last_dts != AV_NOPTS_VALUE)
        delta = cur_dts - *last_dts;
    *last_dts = cur_dts;

    return delta;
}
179
/**
 * Write one packet to the underlying muxer.
 *
 * With timeshift enabled the packet's share is first subtracted from the
 * queue duration. While drop_until_keyframe is set, non-keyframe packets are
 * unreferenced and reported as written.
 *
 * @return >= 0 on success (the packet has been unreferenced), a negative
 *         AVERROR code on failure (the packet is kept, with its original
 *         timestamps restored, so that it can be written again)
 */
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    int64_t saved_pts, saved_dts, saved_duration;
    int stream, err;

    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) {
        int64_t consumed = next_duration(avf, pkt, &ctx->last_received_dts);
        atomic_fetch_sub_explicit(&fifo->queue_duration, consumed, memory_order_relaxed);
    }

    if (ctx->drop_until_keyframe) {
        if (!(pkt->flags & AV_PKT_FLAG_KEY)) {
            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
            av_packet_unref(pkt);
            return 0;
        }
        ctx->drop_until_keyframe = 0;
        av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
    }

    /* Remember the timestamps in the source time base. */
    saved_pts      = pkt->pts;
    saved_dts      = pkt->dts;
    saved_duration = pkt->duration;

    stream = pkt->stream_index;
    av_packet_rescale_ts(pkt, avf->streams[stream]->time_base,
                         avf2->streams[stream]->time_base);

    err = av_write_frame(avf2, pkt);
    if (err < 0) {
        /* Undo the rescaling so that a retry does not scale twice. */
        pkt->pts      = saved_pts;
        pkt->dts      = saved_dts;
        pkt->duration = saved_duration;
        return err;
    }

    av_packet_unref(pkt);
    return err;
}
222
/**
 * Finalize the underlying muxer: write its trailer and close its IO context.
 * Does nothing when no header has been written.
 *
 * @return 0 if there was nothing to finalize, otherwise the value returned
 *         by av_write_trailer()
 */
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;
    AVFormatContext *muxer = fifo->avf;
    int trailer_ret;

    if (!ctx->header_written)
        return 0;

    trailer_ret = av_write_trailer(muxer);
    ff_format_io_close(muxer, &muxer->pb);

    return trailer_ret;
}
238
/**
 * Execute one message in the consumer thread.
 *
 * FIFO_NOOP messages return immediately. For every other type the header of
 * the underlying muxer is written first if that has not happened yet.
 *
 * @return >= 0 on success, a negative AVERROR code on failure
 */
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
{
    int ret = AVERROR(EINVAL);

    if (msg->type == FIFO_NOOP)
        return 0;

    if (!ctx->header_written && (ret = fifo_thread_write_header(ctx)) < 0)
        return ret;

    if (msg->type == FIFO_WRITE_HEADER) {
        /* Only valid if the header has just been written above. */
        av_assert0(ret >= 0);
        return ret;
    }
    if (msg->type == FIFO_WRITE_PACKET)
        return fifo_thread_write_packet(ctx, &msg->pkt);
    if (msg->type == FIFO_FLUSH_OUTPUT)
        return fifo_thread_flush_output(ctx);

    /* Unknown message type. */
    av_assert0(0);
    return AVERROR(EINVAL);
}
265
is_recoverable(const FifoContext * fifo,int err_no)266 static int is_recoverable(const FifoContext *fifo, int err_no) {
267 if (!fifo->attempt_recovery)
268 return 0;
269
270 if (fifo->recover_any_error)
271 return err_no != AVERROR_EXIT;
272
273 switch (err_no) {
274 case AVERROR(EINVAL):
275 case AVERROR(ENOSYS):
276 case AVERROR_EOF:
277 case AVERROR_EXIT:
278 case AVERROR_PATCHWELCOME:
279 return 0;
280 default:
281 return 1;
282 }
283 }
284
free_message(void * msg)285 static void free_message(void *msg)
286 {
287 FifoMessage *fifo_msg = msg;
288
289 if (fifo_msg->type == FIFO_WRITE_PACKET)
290 av_packet_unref(&fifo_msg->pkt);
291 }
292
/**
 * Book-keeping after a recovery attempt failed with a recoverable error.
 *
 * Records the time of the failure (the pts of pkt when stream time is used,
 * real time otherwise) and checks the limit of recovery attempts.
 *
 * @param ctx    consumer thread state
 * @param pkt    packet of the message being processed
 * @param err_no error returned by the failed attempt
 * @return AVERROR(EAGAIN) if another attempt should be made later,
 *         err_no once max_recovery_attempts has been reached
 */
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
                                                int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
           av_err2str(err_no));

    if (fifo->recovery_wait_streamtime) {
        /* The message was missing its terminating newline, which glued the
         * next log line onto it. */
        if (pkt->pts == AV_NOPTS_VALUE)
            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
                   " timestamp, recovery will be attempted immediately\n");
        ctx->last_recovery_ts = pkt->pts;
    } else {
        ctx->last_recovery_ts = av_gettime_relative();
    }

    /* max_recovery_attempts == 0 means that there is no limit. */
    if (fifo->max_recovery_attempts &&
        ctx->recovery_nr >= fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_ERROR,
               "Maximal number of %d recovery attempts reached.\n",
               fifo->max_recovery_attempts);
        ret = err_no;
    } else {
        ret = AVERROR(EAGAIN);
    }

    return ret;
}
324
/**
 * Perform one recovery attempt for a message whose processing failed.
 *
 * The underlying muxer is finalized if its header was written, the waiting
 * time since the previous failed attempt is enforced, and the message is
 * dispatched again (which rewrites the header first).
 *
 * @param ctx    consumer thread state
 * @param msg    message being processed
 * @param err_no error that triggered the recovery
 * @return 0 if the message was processed successfully,
 *         AVERROR(EAGAIN) if the caller should try again later,
 *         any other negative AVERROR code if recovery is not possible;
 *         in that case the message has been freed
 */
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVPacket *pkt = &msg->pkt;
    int64_t time_since_recovery;
    int ret;

    if (!is_recoverable(fifo, err_no)) {
        ret = err_no;
        goto fail;
    }

    if (ctx->header_written) {
        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
        ctx->header_written = 0;
    }

    if (!ctx->recovery_nr) {
        /* First attempt: there is no previous failure to wait for. */
        ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
                                AV_NOPTS_VALUE : 0;
    } else {
        if (fifo->recovery_wait_streamtime) {
            /* The elapsed stream time can only be measured when both
             * timestamps are known. The test used to be inverted: it
             * subtracted AV_NOPTS_VALUE and never enforced the wait when a
             * valid timestamp was recorded. */
            if (ctx->last_recovery_ts != AV_NOPTS_VALUE &&
                pkt->pts != AV_NOPTS_VALUE) {
                AVRational tb = avf->streams[pkt->stream_index]->time_base;
                time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
                                                   tb, AV_TIME_BASE_Q);
            } else {
                /* Enforce recovery immediately */
                time_since_recovery = fifo->recovery_wait_time;
            }
        } else {
            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
        }

        if (time_since_recovery < fifo->recovery_wait_time)
            return AVERROR(EAGAIN);
    }

    ctx->recovery_nr++;

    if (fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
               ctx->recovery_nr, fifo->max_recovery_attempts);
    } else {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
               ctx->recovery_nr);
    }

    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
        ctx->drop_until_keyframe = 1;

    ret = fifo_thread_dispatch_message(ctx, msg);
    if (ret < 0) {
        if (is_recoverable(fifo, ret)) {
            ret = fifo_thread_process_recovery_failure(ctx, pkt, ret);
            if (ret == AVERROR(EAGAIN))
                return ret;
            /* Attempt limit reached: the consumer thread gives up, so the
             * message must be released here like for any other fatal error. */
        }
        goto fail;
    } else {
        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
        ctx->recovery_nr = 0;
    }

    return 0;

fail:
    free_message(msg);
    return ret;
}
395
/**
 * Drive the recovery of a failed message.
 *
 * Without drop_pkts_on_overflow the attempts are repeated until they stop
 * returning AVERROR(EAGAIN); when real time is used for waiting, the thread
 * sleeps for at most 10 ms before each attempt. With drop_pkts_on_overflow a
 * single attempt is made and AVERROR(EAGAIN) causes the message's packet to
 * be dropped.
 *
 * @return 0 if processing may continue, a negative AVERROR code otherwise
 */
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    for (;;) {
        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
            int64_t elapsed   = av_gettime_relative() - ctx->last_recovery_ts;
            int64_t remaining = FFMAX(0, fifo->recovery_wait_time - elapsed);
            if (remaining)
                av_usleep(FFMIN(10000, remaining));
        }

        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
        if (ret != AVERROR(EAGAIN))
            return ret;
        if (fifo->drop_pkts_on_overflow)
            break;
    }

    /* Still waiting and dropping is allowed: give up on this message. */
    if (msg->type == FIFO_WRITE_PACKET)
        av_packet_unref(&msg->pkt);

    return 0;
}
421
fifo_consumer_thread(void * data)422 static void *fifo_consumer_thread(void *data)
423 {
424 AVFormatContext *avf = data;
425 FifoContext *fifo = avf->priv_data;
426 AVThreadMessageQueue *queue = fifo->queue;
427 FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
428 int ret;
429
430 FifoThreadContext fifo_thread_ctx;
431 memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
432 fifo_thread_ctx.avf = avf;
433 fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
434
435 while (1) {
436 uint8_t just_flushed = 0;
437
438 if (!fifo_thread_ctx.recovery_nr)
439 ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
440
441 if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
442 int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
443 if (rec_ret < 0) {
444 av_thread_message_queue_set_err_send(queue, rec_ret);
445 break;
446 }
447 }
448
449 /* If the queue is full at the moment when fifo_write_packet
450 * attempts to insert new message (packet) to the queue,
451 * it sets the fifo->overflow_flag to 1 and drops packet.
452 * Here in consumer thread, the flag is checked and if it is
453 * set, the queue is flushed and flag cleared. */
454 pthread_mutex_lock(&fifo->overflow_flag_lock);
455 if (fifo->overflow_flag) {
456 av_thread_message_flush(queue);
457 if (fifo->restart_with_keyframe)
458 fifo_thread_ctx.drop_until_keyframe = 1;
459 fifo->overflow_flag = 0;
460 just_flushed = 1;
461 }
462 pthread_mutex_unlock(&fifo->overflow_flag_lock);
463
464 if (just_flushed)
465 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
466
467 if (fifo->timeshift)
468 while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
469 av_usleep(10000);
470
471 ret = av_thread_message_queue_recv(queue, &msg, 0);
472 if (ret < 0) {
473 av_thread_message_queue_set_err_send(queue, ret);
474 break;
475 }
476 }
477
478 fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
479
480 return NULL;
481 }
482
/**
 * Allocate the context of the underlying muxer and mirror the settings and
 * streams of the fifo muxer context into it.
 *
 * The new context is stored in the private data right after allocation.
 *
 * @param avf      fifo muxer context
 * @param oformat  output format of the underlying muxer
 * @param filename output url
 * @return 0 on success, a negative AVERROR code on failure
 */
static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
                         const char *filename)
{
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *muxer;
    unsigned int idx;
    int err;

    err = avformat_alloc_output_context2(&muxer, oformat, NULL, filename);
    if (err < 0)
        return err;

    fifo->avf = muxer;

    muxer->interrupt_callback = avf->interrupt_callback;
    muxer->max_delay          = avf->max_delay;

    err = av_dict_copy(&muxer->metadata, avf->metadata, 0);
    if (err < 0)
        return err;

    muxer->opaque    = avf->opaque;
    muxer->io_close  = avf->io_close;
    muxer->io_close2 = avf->io_close2;
    muxer->io_open   = avf->io_open;
    muxer->flags     = avf->flags;

    /* Create one output stream per input stream, with the same parameters. */
    for (idx = 0; idx < avf->nb_streams; idx++) {
        AVStream *out = avformat_new_stream(muxer, NULL);
        if (!out)
            return AVERROR(ENOMEM);

        err = ff_stream_encode_params_copy(out, avf->streams[idx]);
        if (err < 0)
            return err;
    }

    return 0;
}
519
/**
 * Muxer init callback: validate the options, set up the underlying muxer
 * context, the message queue and the overflow flag mutex.
 *
 * @return 0 on success, a negative AVERROR code on failure
 */
static int fifo_init(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    const AVOutputFormat *oformat;
    int ret = 0;

    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
               " only when drop_pkts_on_overflow is also turned on\n");
        return AVERROR(EINVAL);
    }
    atomic_init(&fifo->queue_duration, 0);
    fifo->last_sent_dts = AV_NOPTS_VALUE;

    oformat = av_guess_format(fifo->format, avf->url, NULL);
    if (!oformat) {
        ret = AVERROR_MUXER_NOT_FOUND;
        return ret;
    }

    ret = fifo_mux_init(avf, oformat, avf->url);
    if (ret < 0)
        return ret;

    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
                                        sizeof(FifoMessage));
    if (ret < 0)
        return ret;

    av_thread_message_queue_set_free_func(fifo->queue, free_message);

    /* pthread_mutex_init() reports failure with a positive error number,
     * so any non-zero result is an error. */
    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
    if (ret)
        return AVERROR(ret);
    /* Tells fifo_deinit() that the mutex has to be destroyed. */
    fifo->overflow_flag_lock_initialized = 1;

    return 0;
}
558
/**
 * Muxer write_header callback: start the consumer thread.
 *
 * @return 0 on success, a negative AVERROR code if the thread could not be
 *         created
 */
static int fifo_write_header(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    int err = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);

    if (!err)
        return 0;

    av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
           av_err2str(AVERROR(err)));
    return AVERROR(err);
}
573
/**
 * Muxer write_packet callback: queue a packet, or a flush request when pkt
 * is NULL, for the consumer thread.
 *
 * With drop_pkts_on_overflow the message is sent without blocking. If the
 * queue is full, the overflow flag is raised, the packet is dropped and
 * success is reported.
 *
 * @return >= 0 on success, a negative AVERROR code on failure
 */
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
    FifoContext *fifo = avf->priv_data;
    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
    unsigned send_flags = fifo->drop_pkts_on_overflow ?
                          AV_THREAD_MESSAGE_NONBLOCK : 0;
    int ret;

    if (pkt) {
        ret = av_packet_ref(&msg.pkt,pkt);
        if (ret < 0)
            return ret;
    }

    ret = av_thread_message_queue_send(fifo->queue, &msg, send_flags);
    if (ret >= 0) {
        /* The message is queued: account for its duration. */
        if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) {
            int64_t added = next_duration(avf, pkt, &fifo->last_sent_dts);
            atomic_fetch_add_explicit(&fifo->queue_duration, added, memory_order_relaxed);
        }
        return ret;
    }

    if (ret == AVERROR(EAGAIN)) {
        int first_overflow = 0;

        /* The queue is full: raise fifo->overflow_flag so that the
         * consumer thread knows the queue should be flushed. */
        pthread_mutex_lock(&fifo->overflow_flag_lock);
        if (!fifo->overflow_flag) {
            fifo->overflow_flag = 1;
            first_overflow = 1;
        }
        pthread_mutex_unlock(&fifo->overflow_flag_lock);

        /* Warn only when the flag goes from lowered to raised. */
        if (first_overflow)
            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
        ret = 0;
    }

    /* The message never reached the queue: drop our packet reference. */
    if (pkt)
        av_packet_unref(&msg.pkt);
    return ret;
}
617
/**
 * Muxer write_trailer callback: signal the end of the stream to the consumer
 * thread and wait for it to finish.
 *
 * With timeshift enabled, queue_duration is first advanced by the elapsed
 * real time while FIFO_NOOP messages are sent, until the timeshift has
 * elapsed or sending fails with something other than AVERROR(EAGAIN).
 * Afterwards queue_duration is set to INT64_MAX so that the consumer's
 * timeshift wait no longer holds it back.
 *
 * @return the result of the consumer thread's last trailer write, or a
 *         negative AVERROR code if joining the thread failed
 */
static int fifo_write_trailer(AVFormatContext *avf)
{
    FifoContext *fifo= avf->priv_data;
    int ret;

    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
    if (fifo->timeshift) {
        int64_t now = av_gettime_relative();
        int64_t elapsed = 0;
        FifoMessage msg = {FIFO_NOOP};
        do {
            int64_t delay = av_gettime_relative() - now;
            if (delay < 0) { // Discontinuity?
                delay = 10000;
                now = av_gettime_relative();
            } else {
                now += delay;
            }
            atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
            elapsed += delay;
            if (elapsed > fifo->timeshift)
                break;
            av_usleep(10000);
            ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
        } while (ret >= 0 || ret == AVERROR(EAGAIN));
        atomic_store(&fifo->queue_duration, INT64_MAX);
    }

    /* pthread_join() reports failure with a positive error number,
     * so any non-zero result is an error. */
    ret = pthread_join(fifo->writer_thread, NULL);
    if (ret) {
        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
               av_err2str(AVERROR(ret)));
        return AVERROR(ret);
    }

    ret = fifo->write_trailer_ret;
    return ret;
}
656
/**
 * Muxer deinit callback: free the underlying muxer context and the message
 * queue, and destroy the overflow flag mutex if it was initialized.
 */
static void fifo_deinit(AVFormatContext *avf)
{
    FifoContext *priv = avf->priv_data;

    avformat_free_context(priv->avf);
    av_thread_message_queue_free(&priv->queue);

    if (!priv->overflow_flag_lock_initialized)
        return;
    pthread_mutex_destroy(&priv->overflow_flag_lock);
}
666
667 #define OFFSET(x) offsetof(FifoContext, x)
668 static const AVOption options[] = {
669 {"fifo_format", "Target muxer", OFFSET(format),
670 AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
671
672 {"queue_size", "Size of fifo queue", OFFSET(queue_size),
673 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
674
675 {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
676 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
677
678 {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
679 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
680
681 {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
682 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
683
684 {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
685 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
686
687 {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
688 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
689
690 {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
691 AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
692
693 {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
694 OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
695
696 {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
697 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
698
699 {"timeshift", "Delay fifo output", OFFSET(timeshift),
700 AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
701
702 {NULL},
703 };
704
705 static const AVClass fifo_muxer_class = {
706 .class_name = "Fifo muxer",
707 .item_name = av_default_item_name,
708 .option = options,
709 .version = LIBAVUTIL_VERSION_INT,
710 };
711
712 const AVOutputFormat ff_fifo_muxer = {
713 .name = "fifo",
714 .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
715 .priv_data_size = sizeof(FifoContext),
716 .init = fifo_init,
717 .write_header = fifo_write_header,
718 .write_packet = fifo_write_packet,
719 .write_trailer = fifo_write_trailer,
720 .deinit = fifo_deinit,
721 .priv_class = &fifo_muxer_class,
722 .flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
723 };
724