• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * FIFO pseudo-muxer
3  * Copyright (c) 2016 Jan Sebechlebsky
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include "libavutil/avassert.h"
23 #include "libavutil/opt.h"
24 #include "libavutil/time.h"
25 #include "libavutil/thread.h"
26 #include "libavutil/threadmessage.h"
27 #include "avformat.h"
28 #include "internal.h"
29 
/* Default value of the "queue_size" option. */
#define FIFO_DEFAULT_QUEUE_SIZE              60
/* Default value of the "max_recovery_attempts" option. */
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
/* Default value of the "recovery_wait_time" option: 5 seconds in microseconds. */
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC (5 * 1000000)
33 
34 typedef struct FifoContext {
35     const AVClass *class;
36     AVFormatContext *avf;
37 
38     char *format;
39     AVDictionary *format_options;
40 
41     int queue_size;
42     AVThreadMessageQueue *queue;
43 
44     pthread_t writer_thread;
45 
46     /* Return value of last write_trailer_call */
47     int write_trailer_ret;
48 
49     /* Time to wait before next recovery attempt
50      * This can refer to the time in processed stream,
51      * or real time. */
52     int64_t recovery_wait_time;
53 
54     /* Maximal number of unsuccessful successive recovery attempts */
55     int max_recovery_attempts;
56 
57     /* Whether to attempt recovery from failure */
58     int attempt_recovery;
59 
60     /* If >0 stream time will be used when waiting
61      * for the recovery attempt instead of real time */
62     int recovery_wait_streamtime;
63 
64     /* If >0 recovery will be attempted regardless of error code
65      * (except AVERROR_EXIT, so exit request is never ignored) */
66     int recover_any_error;
67 
68     /* Whether to drop packets in case the queue is full. */
69     int drop_pkts_on_overflow;
70 
71     /* Whether to wait for keyframe when recovering
72      * from failure or queue overflow */
73     int restart_with_keyframe;
74 
75     pthread_mutex_t overflow_flag_lock;
76     int overflow_flag_lock_initialized;
77     /* Value > 0 signals queue overflow */
78     volatile uint8_t overflow_flag;
79 
80 } FifoContext;
81 
82 typedef struct FifoThreadContext {
83     AVFormatContext *avf;
84 
85     /* Timestamp of last failure.
86      * This is either pts in case stream time is used,
87      * or microseconds as returned by av_getttime_relative() */
88     int64_t last_recovery_ts;
89 
90     /* Number of current recovery process
91      * Value > 0 means we are in recovery process */
92     int recovery_nr;
93 
94     /* If > 0 all frames will be dropped until keyframe is received */
95     uint8_t drop_until_keyframe;
96 
97     /* Value > 0 means that the previous write_header call was successful
98      * so finalization by calling write_trailer and ff_io_close must be done
99      * before exiting / reinitialization of underlying muxer */
100     uint8_t header_written;
101 } FifoThreadContext;
102 
/* Kinds of requests handled by the writer thread. */
typedef enum FifoMessageType {
    FIFO_WRITE_HEADER, /* only make sure the header has been written */
    FIFO_WRITE_PACKET, /* write the packet carried by the message */
    FIFO_FLUSH_OUTPUT  /* pass a NULL packet to the underlying muxer */
} FifoMessageType;
108 
109 typedef struct FifoMessage {
110     FifoMessageType type;
111     AVPacket pkt;
112 } FifoMessage;
113 
/*
 * Open the output of the underlying muxer and write its header.
 *
 * A private copy of the user supplied muxer options is used, so that the
 * original dictionary stays intact for later recovery attempts.
 *
 * On success ctx->header_written is set. If the header was written but some
 * options were not consumed by the underlying muxer, each of them is logged
 * and AVERROR(EINVAL) is returned (header_written stays set so the caller
 * still finalizes the muxer).
 *
 * Returns 0 on success, a negative AVERROR code otherwise.
 */
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    AVDictionary *format_options = NULL;
    int ret, i;

    ret = av_dict_copy(&format_options, fifo->format_options, 0);
    if (ret < 0)
        goto end; /* a partially filled copy must still be freed */

    ret = ff_format_output_open(avf2, avf->url, &format_options);
    if (ret < 0) {
        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
               av_err2str(ret));
        goto end;
    }

    for (i = 0; i < avf2->nb_streams; i++)
        avf2->streams[i]->cur_dts = 0;

    ret = avformat_write_header(avf2, &format_options);
    if (ret < 0) {
        /* header_written stays 0, so fifo_thread_write_trailer() will not
         * close the output for us; do it here to avoid leaking it when the
         * next recovery attempt opens the output again. The real error code
         * is kept instead of being replaced by the option check below. */
        ff_format_io_close(avf2, &avf2->pb);
        goto end;
    }

    /* Any non-negative return value means the header has been written. */
    ctx->header_written = 1;
    ret = 0;

    // Check for options unrecognized by underlying muxer
    if (format_options) {
        AVDictionaryEntry *entry = NULL;
        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
        ret = AVERROR(EINVAL);
    }

end:
    av_dict_free(&format_options);
    return ret;
}
152 
/*
 * Handle a FIFO_FLUSH_OUTPUT message by passing a NULL packet to the
 * underlying muxer. Returns whatever av_write_frame() returns.
 */
static int fifo_thread_flush_output(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;

    return av_write_frame(fifo->avf, NULL);
}
161 
/*
 * Write one packet to the underlying muxer.
 *
 * While ctx->drop_until_keyframe is set, packets without AV_PKT_FLAG_KEY are
 * unreferenced and reported as written (return 0).
 *
 * The packet timestamps are rescaled from the time base of the fifo muxer
 * stream to the time base of the matching underlying muxer stream. On
 * success the packet is unreferenced. On failure the packet is kept, because
 * the caller may dispatch the very same message again during recovery; its
 * timestamps are restored so that a retry does not rescale them twice and so
 * that pkt->pts stays expressed in the fifo muxer stream time base.
 *
 * Returns the result of av_write_frame(), or 0 for a dropped packet.
 */
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    AVRational src_tb, dst_tb;
    int64_t orig_pts, orig_dts, orig_duration;
    int ret, s_idx;

    if (ctx->drop_until_keyframe) {
        if (pkt->flags & AV_PKT_FLAG_KEY) {
            ctx->drop_until_keyframe = 0;
            av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
        } else {
            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
            av_packet_unref(pkt);
            return 0;
        }
    }

    /* Remember the timing fields that av_packet_rescale_ts() modifies. */
    orig_pts      = pkt->pts;
    orig_dts      = pkt->dts;
    orig_duration = pkt->duration;

    s_idx = pkt->stream_index;
    src_tb = avf->streams[s_idx]->time_base;
    dst_tb = avf2->streams[s_idx]->time_base;
    av_packet_rescale_ts(pkt, src_tb, dst_tb);

    ret = av_write_frame(avf2, pkt);
    if (ret >= 0) {
        av_packet_unref(pkt);
    } else {
        /* The packet will be retried: undo the rescaling. */
        pkt->pts      = orig_pts;
        pkt->dts      = orig_dts;
        pkt->duration = orig_duration;
    }
    return ret;
}
191 
/*
 * Finalize the underlying muxer if its header has been written: write the
 * trailer and close its I/O context.
 *
 * Returns 0 when there is nothing to finalize, otherwise the result of
 * av_write_trailer().
 */
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;
    AVFormatContext *muxer = fifo->avf;
    int status = 0;

    if (ctx->header_written) {
        status = av_write_trailer(muxer);
        ff_format_io_close(muxer, &muxer->pb);
    }

    return status;
}
207 
/*
 * Execute one queued message.
 *
 * The header of the underlying muxer is written first if that has not been
 * done yet; a failure there is returned immediately. Afterwards the message
 * is handled according to its type. A FIFO_WRITE_HEADER message has no
 * further work and yields the result of the header write.
 *
 * Returns a negative AVERROR code on failure.
 */
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
{
    int header_ret = AVERROR(EINVAL);

    if (!ctx->header_written) {
        header_ret = fifo_thread_write_header(ctx);
        if (header_ret < 0)
            return header_ret;
    }

    if (msg->type == FIFO_WRITE_PACKET)
        return fifo_thread_write_packet(ctx, &msg->pkt);

    if (msg->type == FIFO_FLUSH_OUTPUT)
        return fifo_thread_flush_output(ctx);

    if (msg->type == FIFO_WRITE_HEADER) {
        /* Only valid when the header was written by this very call. */
        av_assert0(header_ret >= 0);
        return header_ret;
    }

    /* Unknown message type */
    av_assert0(0);
    return AVERROR(EINVAL);
}
231 
is_recoverable(const FifoContext * fifo,int err_no)232 static int is_recoverable(const FifoContext *fifo, int err_no) {
233     if (!fifo->attempt_recovery)
234         return 0;
235 
236     if (fifo->recover_any_error)
237         return err_no != AVERROR_EXIT;
238 
239     switch (err_no) {
240     case AVERROR(EINVAL):
241     case AVERROR(ENOSYS):
242     case AVERROR_EOF:
243     case AVERROR_EXIT:
244     case AVERROR_PATCHWELCOME:
245         return 0;
246     default:
247         return 1;
248     }
249 }
250 
free_message(void * msg)251 static void free_message(void *msg)
252 {
253     FifoMessage *fifo_msg = msg;
254 
255     if (fifo_msg->type == FIFO_WRITE_PACKET)
256         av_packet_unref(&fifo_msg->pkt);
257 }
258 
/*
 * Book-keeping after a failed recovery attempt.
 *
 * Records the time of the failure (the packet pts when stream time is used,
 * the relative real time otherwise) and checks the attempt limit.
 *
 * Returns err_no once max_recovery_attempts (if non-zero) has been reached,
 * AVERROR(EAGAIN) otherwise to request another attempt later.
 */
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
                                                int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
           av_err2str(err_no));

    if (fifo->recovery_wait_streamtime) {
        if (pkt->pts == AV_NOPTS_VALUE)
            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
                   " timestamp, recovery will be attempted immediately\n");
        ctx->last_recovery_ts = pkt->pts;
    } else {
        ctx->last_recovery_ts = av_gettime_relative();
    }

    if (fifo->max_recovery_attempts &&
        ctx->recovery_nr >= fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_ERROR,
               "Maximal number of %d recovery attempts reached.\n",
               fifo->max_recovery_attempts);
        ret = err_no;
    } else {
        ret = AVERROR(EAGAIN);
    }

    return ret;
}
290 
/*
 * Perform one recovery attempt for the message msg that failed with err_no.
 *
 * If the error is not recoverable the message is freed and the error is
 * returned. Otherwise the underlying muxer is finalized (if its header had
 * been written) and, once the configured wait time since the last failed
 * attempt has elapsed, the message is dispatched again.
 *
 * Returns 0 when recovery succeeded, AVERROR(EAGAIN) when the attempt has to
 * be repeated later (the message is kept), or another negative AVERROR code
 * on a fatal error (the message is freed).
 */
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVPacket *pkt = &msg->pkt;
    int64_t time_since_recovery;
    int ret;

    if (!is_recoverable(fifo, err_no)) {
        ret = err_no;
        goto fail;
    }

    if (ctx->header_written) {
        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
        ctx->header_written = 0;
    }

    if (!ctx->recovery_nr) {
        /* First attempt of this recovery: no waiting. */
        ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
                                AV_NOPTS_VALUE : 0;
    } else {
        if (fifo->recovery_wait_streamtime) {
            /* The elapsed stream time can only be computed when both the
             * timestamp of the last failure and the current one are known. */
            if (ctx->last_recovery_ts != AV_NOPTS_VALUE &&
                pkt->pts != AV_NOPTS_VALUE) {
                AVRational tb = avf->streams[pkt->stream_index]->time_base;
                time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
                                                   tb, AV_TIME_BASE_Q);
            } else {
                /* Enforce recovery immediately */
                time_since_recovery = fifo->recovery_wait_time;
            }
        } else {
            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
        }

        if (time_since_recovery < fifo->recovery_wait_time)
            return AVERROR(EAGAIN);
    }

    ctx->recovery_nr++;

    if (fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
               ctx->recovery_nr, fifo->max_recovery_attempts);
    } else {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
               ctx->recovery_nr);
    }

    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
        ctx->drop_until_keyframe = 1;

    ret = fifo_thread_dispatch_message(ctx, msg);
    if (ret < 0) {
        if (is_recoverable(fifo, ret)) {
            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
        } else {
            goto fail;
        }
    } else {
        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
        ctx->recovery_nr = 0;
    }

    return 0;

fail:
    free_message(msg);
    return ret;
}
361 
/*
 * Drive recovery for the message msg after the error err_no.
 *
 * Without drop_pkts_on_overflow the attempt is repeated until it stops
 * returning AVERROR(EAGAIN); when real time is used for waiting, the thread
 * sleeps in slices of at most 10000 microseconds between attempts. With
 * drop_pkts_on_overflow only one attempt is made; if it asks for a retry,
 * the packet of the message is dropped and 0 is returned.
 *
 * Returns 0 or a negative AVERROR code from the recovery attempt.
 */
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    FifoContext *fifo = ctx->avf->priv_data;
    int status;

    for (;;) {
        if (ctx->recovery_nr > 0 && !fifo->recovery_wait_streamtime) {
            int64_t elapsed   = av_gettime_relative() - ctx->last_recovery_ts;
            int64_t remaining = FFMAX(0, fifo->recovery_wait_time - elapsed);
            if (remaining)
                av_usleep(FFMIN(10000, remaining));
        }

        status = fifo_thread_attempt_recovery(ctx, msg, err_no);
        if (status != AVERROR(EAGAIN) || fifo->drop_pkts_on_overflow)
            break;
    }

    if (status == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
        /* Not blocking: give up on this message and carry on. */
        if (msg->type == FIFO_WRITE_PACKET)
            av_packet_unref(&msg->pkt);
        status = 0;
    }

    return status;
}
387 
fifo_consumer_thread(void * data)388 static void *fifo_consumer_thread(void *data)
389 {
390     AVFormatContext *avf = data;
391     FifoContext *fifo = avf->priv_data;
392     AVThreadMessageQueue *queue = fifo->queue;
393     FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
394     int ret;
395 
396     FifoThreadContext fifo_thread_ctx;
397     memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
398     fifo_thread_ctx.avf = avf;
399 
400     while (1) {
401         uint8_t just_flushed = 0;
402 
403         if (!fifo_thread_ctx.recovery_nr)
404             ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
405 
406         if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
407             int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
408             if (rec_ret < 0) {
409                 av_thread_message_queue_set_err_send(queue, rec_ret);
410                 break;
411             }
412         }
413 
414         /* If the queue is full at the moment when fifo_write_packet
415          * attempts to insert new message (packet) to the queue,
416          * it sets the fifo->overflow_flag to 1 and drops packet.
417          * Here in consumer thread, the flag is checked and if it is
418          * set, the queue is flushed and flag cleared. */
419         pthread_mutex_lock(&fifo->overflow_flag_lock);
420         if (fifo->overflow_flag) {
421             av_thread_message_flush(queue);
422             if (fifo->restart_with_keyframe)
423                 fifo_thread_ctx.drop_until_keyframe = 1;
424             fifo->overflow_flag = 0;
425             just_flushed = 1;
426         }
427         pthread_mutex_unlock(&fifo->overflow_flag_lock);
428 
429         if (just_flushed)
430             av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
431 
432         ret = av_thread_message_queue_recv(queue, &msg, 0);
433         if (ret < 0) {
434             av_thread_message_queue_set_err_send(queue, ret);
435             break;
436         }
437     }
438 
439     fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
440 
441     return NULL;
442 }
443 
/*
 * Allocate the context of the underlying muxer and mirror the relevant
 * settings of the fifo muxer context into it: interrupt callback, max_delay,
 * metadata, opaque, I/O callbacks, flags, and one stream per input stream
 * with copied encoding parameters.
 *
 * The new context is stored in fifo->avf as soon as it is allocated, so it
 * stays reachable through the private data when a later step fails.
 *
 * Returns 0 on success, a negative AVERROR code otherwise.
 */
static int fifo_mux_init(AVFormatContext *avf, ff_const59 AVOutputFormat *oformat,
                         const char *filename)
{
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *muxer;
    int err, idx;

    err = avformat_alloc_output_context2(&muxer, oformat, NULL, filename);
    if (err < 0)
        return err;

    fifo->avf = muxer;

    muxer->interrupt_callback = avf->interrupt_callback;
    muxer->max_delay          = avf->max_delay;

    err = av_dict_copy(&muxer->metadata, avf->metadata, 0);
    if (err < 0)
        return err;

    muxer->opaque   = avf->opaque;
    muxer->io_close = avf->io_close;
    muxer->io_open  = avf->io_open;
    muxer->flags    = avf->flags;

    idx = 0;
    while (idx < avf->nb_streams) {
        AVStream *out_st = avformat_new_stream(muxer, NULL);
        if (!out_st)
            return AVERROR(ENOMEM);

        err = ff_stream_encode_params_copy(out_st, avf->streams[idx]);
        if (err < 0)
            return err;

        ++idx;
    }

    return 0;
}
479 
/*
 * Muxer init callback: validate the options, set up the underlying muxer
 * context, allocate the message queue and initialize the overflow flag lock.
 *
 * Returns 0 on success, a negative AVERROR code otherwise.
 */
static int fifo_init(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    ff_const59 AVOutputFormat *oformat;
    int ret = 0;

    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
               " only when drop_pkts_on_overflow is also turned on\n");
        return AVERROR(EINVAL);
    }

    oformat = av_guess_format(fifo->format, avf->url, NULL);
    if (!oformat) {
        ret = AVERROR_MUXER_NOT_FOUND;
        return ret;
    }

    ret = fifo_mux_init(avf, oformat, avf->url);
    if (ret < 0)
        return ret;

    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
                                        sizeof(FifoMessage));
    if (ret < 0)
        return ret;

    av_thread_message_queue_set_free_func(fifo->queue, free_message);

    /* pthread functions report failure with a positive error number. */
    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
    if (ret)
        return AVERROR(ret);
    fifo->overflow_flag_lock_initialized = 1;

    return 0;
}
516 
/*
 * Muxer write_header callback: start the writer thread, which runs
 * fifo_consumer_thread() with the fifo muxer context as argument.
 *
 * Returns 0 on success, or the pthread_create() error converted with AVERROR().
 */
static int fifo_write_header(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    int err = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);

    if (!err)
        return 0;

    av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
           av_err2str(AVERROR(err)));
    return AVERROR(err);
}
531 
/*
 * Muxer write_packet callback: queue a packet for the writer thread, or a
 * flush request when pkt is NULL.
 *
 * The message holds its own reference to the packet. With
 * drop_pkts_on_overflow the send is non-blocking; if the queue is full the
 * overflow flag is raised, the packet is dropped and 0 is returned.
 *
 * Returns a negative AVERROR code on failure, otherwise the non-negative
 * result of the send (0 for a dropped packet).
 */
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
    FifoContext *fifo = avf->priv_data;
    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
    unsigned send_flags = fifo->drop_pkts_on_overflow ?
                          AV_THREAD_MESSAGE_NONBLOCK : 0;
    int ret;

    if (pkt) {
        ret = av_packet_ref(&msg.pkt, pkt);
        if (ret < 0)
            return ret;
    }

    ret = av_thread_message_queue_send(fifo->queue, &msg, send_flags);
    if (ret >= 0)
        return ret; /* the queue now holds the packet reference */

    if (ret == AVERROR(EAGAIN)) {
        uint8_t newly_set = 0;

        /* The queue is full: raise fifo->overflow_flag so that
         * the consumer thread knows it has to flush the queue. */
        pthread_mutex_lock(&fifo->overflow_flag_lock);
        if (!fifo->overflow_flag)
            fifo->overflow_flag = newly_set = 1;
        pthread_mutex_unlock(&fifo->overflow_flag_lock);

        if (newly_set)
            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
        ret = 0;
    }

    /* The message was not queued: release our packet reference. */
    if (pkt)
        av_packet_unref(&msg.pkt);
    return ret;
}
572 
/*
 * Muxer write_trailer callback: signal end of stream to the writer thread by
 * setting AVERROR_EOF on the receiving side of the queue, then wait for the
 * thread to finish.
 *
 * Returns the result of the last write_trailer call made by the writer
 * thread, or the pthread_join() error converted with AVERROR().
 */
static int fifo_write_trailer(AVFormatContext *avf)
{
    FifoContext *fifo= avf->priv_data;
    int ret;

    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);

    /* pthread functions report failure with a positive error number. */
    ret = pthread_join(fifo->writer_thread, NULL);
    if (ret) {
        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
               av_err2str(AVERROR(ret)));
        return AVERROR(ret);
    }

    ret = fifo->write_trailer_ret;
    return ret;
}
590 
/*
 * Muxer deinit callback: free the underlying muxer context and the message
 * queue, and destroy the overflow flag lock if it was initialized.
 */
static void fifo_deinit(AVFormatContext *avf)
{
    FifoContext *priv = avf->priv_data;

    avformat_free_context(priv->avf);
    av_thread_message_queue_free(&priv->queue);

    if (!priv->overflow_flag_lock_initialized)
        return;

    pthread_mutex_destroy(&priv->overflow_flag_lock);
}
600 
601 #define OFFSET(x) offsetof(FifoContext, x)
602 static const AVOption options[] = {
603         {"fifo_format", "Target muxer", OFFSET(format),
604          AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
605 
606         {"queue_size", "Size of fifo queue", OFFSET(queue_size),
607          AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
608 
609         {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
610          AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
611 
612         {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
613          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
614 
615         {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
616          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
617 
618         {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
619         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
620 
621         {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
622          AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
623 
624         {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
625          AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
626 
627         {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
628          OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
629 
630         {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
631          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
632 
633         {NULL},
634 };
635 
636 static const AVClass fifo_muxer_class = {
637     .class_name = "Fifo muxer",
638     .item_name  = av_default_item_name,
639     .option     = options,
640     .version    = LIBAVUTIL_VERSION_INT,
641 };
642 
643 AVOutputFormat ff_fifo_muxer = {
644     .name           = "fifo",
645     .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
646     .priv_data_size = sizeof(FifoContext),
647     .init           = fifo_init,
648     .write_header   = fifo_write_header,
649     .write_packet   = fifo_write_packet,
650     .write_trailer  = fifo_write_trailer,
651     .deinit         = fifo_deinit,
652     .priv_class     = &fifo_muxer_class,
653     .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
654 };
655