• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * FIFO pseudo-muxer
3  * Copyright (c) 2016 Jan Sebechlebsky
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
29 #include "avformat.h"
30 #include "internal.h"
31 
/* Default capacity of the message queue ("queue_size" option). */
#define FIFO_DEFAULT_QUEUE_SIZE              60
/* Default limit of recovery attempts; 0 means the limit is not enforced. */
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
/* Default pause between recovery attempts: 5 seconds, in microseconds. */
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000
35 
36 typedef struct FifoContext {
37     const AVClass *class;
38     AVFormatContext *avf;
39 
40     char *format;
41     AVDictionary *format_options;
42 
43     int queue_size;
44     AVThreadMessageQueue *queue;
45 
46     pthread_t writer_thread;
47 
48     /* Return value of last write_trailer_call */
49     int write_trailer_ret;
50 
51     /* Time to wait before next recovery attempt
52      * This can refer to the time in processed stream,
53      * or real time. */
54     int64_t recovery_wait_time;
55 
56     /* Maximal number of unsuccessful successive recovery attempts */
57     int max_recovery_attempts;
58 
59     /* Whether to attempt recovery from failure */
60     int attempt_recovery;
61 
62     /* If >0 stream time will be used when waiting
63      * for the recovery attempt instead of real time */
64     int recovery_wait_streamtime;
65 
66     /* If >0 recovery will be attempted regardless of error code
67      * (except AVERROR_EXIT, so exit request is never ignored) */
68     int recover_any_error;
69 
70     /* Whether to drop packets in case the queue is full. */
71     int drop_pkts_on_overflow;
72 
73     /* Whether to wait for keyframe when recovering
74      * from failure or queue overflow */
75     int restart_with_keyframe;
76 
77     pthread_mutex_t overflow_flag_lock;
78     int overflow_flag_lock_initialized;
79     /* Value > 0 signals queue overflow */
80     volatile uint8_t overflow_flag;
81 
82     atomic_int_least64_t queue_duration;
83     int64_t last_sent_dts;
84     int64_t timeshift;
85 } FifoContext;
86 
87 typedef struct FifoThreadContext {
88     AVFormatContext *avf;
89 
90     /* Timestamp of last failure.
91      * This is either pts in case stream time is used,
92      * or microseconds as returned by av_getttime_relative() */
93     int64_t last_recovery_ts;
94 
95     /* Number of current recovery process
96      * Value > 0 means we are in recovery process */
97     int recovery_nr;
98 
99     /* If > 0 all frames will be dropped until keyframe is received */
100     uint8_t drop_until_keyframe;
101 
102     /* Value > 0 means that the previous write_header call was successful
103      * so finalization by calling write_trailer and ff_io_close must be done
104      * before exiting / reinitialization of underlying muxer */
105     uint8_t header_written;
106 
107     int64_t last_received_dts;
108 } FifoThreadContext;
109 
/* Kind of request carried by a FifoMessage. */
typedef enum FifoMessageType {
    FIFO_NOOP,          /* nothing to do, only wakes up the writer thread */
    FIFO_WRITE_HEADER,  /* write the header of the underlying muxer */
    FIFO_WRITE_PACKET,  /* write the packet attached to the message */
    FIFO_FLUSH_OUTPUT   /* flush the underlying muxer */
} FifoMessageType;
116 
117 typedef struct FifoMessage {
118     FifoMessageType type;
119     AVPacket pkt;
120 } FifoMessage;
121 
/* Open the output of the underlying muxer and write its header.
 *
 * A private copy of the user supplied muxer options is used, so that the
 * original dictionary stays intact for later recovery attempts.
 * ctx->header_written is set when avformat_write_header() returns 0.
 *
 * Returns the result of avformat_write_header() on success, a negative
 * AVERROR code on failure, or AVERROR(EINVAL) if some options were not
 * consumed by the underlying muxer. */
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    AVDictionary *format_options = NULL;
    int ret, i;

    ret = av_dict_copy(&format_options, fifo->format_options, 0);
    if (ret < 0)
        goto end; /* the copy may be partially filled, it has to be freed */

    ret = ff_format_output_open(avf2, avf->url, &format_options);
    if (ret < 0) {
        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
               av_err2str(ret));
        goto end;
    }

    /* Reset the per-stream dts before the header is (re)written */
    for (i = 0; i < avf2->nb_streams; i++)
        avf2->streams[i]->cur_dts = 0;

    ret = avformat_write_header(avf2, &format_options);
    if (!ret)
        ctx->header_written = 1;

    /* Entries left in the dictionary were not recognized
     * by the underlying muxer */
    if (format_options) {
        AVDictionaryEntry *entry = NULL;
        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
        ret = AVERROR(EINVAL);
    }

end:
    av_dict_free(&format_options);
    return ret;
}
160 
/* Flush the underlying muxer by writing a NULL packet to it.
 * Returns the result of av_write_frame(). */
static int fifo_thread_flush_output(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;

    return av_write_frame(fifo->avf, NULL);
}
169 
/* Compute how far the dts of pkt advanced since the previous packet.
 *
 * The packet dts is rescaled from its stream time base to AV_TIME_BASE_Q
 * and stored in *last_dts. Returns the difference to the previously stored
 * value, or 0 if no value was stored yet (AV_NOPTS_VALUE). */
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
{
    const AVRational tb = avf->streams[pkt->stream_index]->time_base;
    const int64_t cur   = av_rescale_q(pkt->dts, tb, AV_TIME_BASE_Q);
    const int64_t prev  = *last_dts;

    *last_dts = cur;

    if (prev == AV_NOPTS_VALUE)
        return 0;
    return cur - prev;
}
178 
/* Write one packet to the underlying muxer.
 *
 * With timeshift enabled the duration represented by the packet is first
 * subtracted from the queue duration. While drop_until_keyframe is set,
 * non-keyframe packets are unreferenced and silently dropped.
 *
 * The packet is unreferenced on success; on failure it is left untouched
 * so that the caller can retry. Returns the result of av_write_frame(),
 * or 0 when the packet was dropped. */
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    int stream, err;

    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) {
        int64_t consumed = next_duration(avf, pkt, &ctx->last_received_dts);
        atomic_fetch_sub_explicit(&fifo->queue_duration, consumed, memory_order_relaxed);
    }

    if (ctx->drop_until_keyframe) {
        if (!(pkt->flags & AV_PKT_FLAG_KEY)) {
            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
            av_packet_unref(pkt);
            return 0;
        }
        ctx->drop_until_keyframe = 0;
        av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
    }

    /* Convert timestamps from the fifo stream time base to the time base
     * of the matching stream of the underlying muxer */
    stream = pkt->stream_index;
    av_packet_rescale_ts(pkt, avf->streams[stream]->time_base,
                         avf2->streams[stream]->time_base);

    err = av_write_frame(avf2, pkt);
    if (err < 0)
        return err;

    av_packet_unref(pkt);
    return err;
}
211 
/* Finalize the underlying muxer: write its trailer and close its I/O.
 * Does nothing and returns 0 if no header was written.
 * Otherwise returns the result of av_write_trailer(). */
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    int trailer_ret = 0;

    if (ctx->header_written) {
        trailer_ret = av_write_trailer(avf2);
        ff_format_io_close(avf2, &avf2->pb);
    }

    return trailer_ret;
}
227 
/* Execute the request carried by msg in the writer thread.
 *
 * FIFO_NOOP returns 0 immediately. For every other message the header of
 * the underlying muxer is written first if that did not happen yet.
 * Returns the result of the executed operation (negative AVERROR code on
 * failure). */
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
{
    int status = AVERROR(EINVAL);

    if (msg->type == FIFO_NOOP)
        return 0;

    if (!ctx->header_written) {
        status = fifo_thread_write_header(ctx);
        if (status < 0)
            return status;
    }

    if (msg->type == FIFO_WRITE_HEADER) {
        /* A header request must have been served by the call above */
        av_assert0(status >= 0);
        return status;
    }
    if (msg->type == FIFO_WRITE_PACKET)
        return fifo_thread_write_packet(ctx, &msg->pkt);
    if (msg->type == FIFO_FLUSH_OUTPUT)
        return fifo_thread_flush_output(ctx);

    /* Unknown message type */
    av_assert0(0);
    return AVERROR(EINVAL);
}
254 
is_recoverable(const FifoContext * fifo,int err_no)255 static int is_recoverable(const FifoContext *fifo, int err_no) {
256     if (!fifo->attempt_recovery)
257         return 0;
258 
259     if (fifo->recover_any_error)
260         return err_no != AVERROR_EXIT;
261 
262     switch (err_no) {
263     case AVERROR(EINVAL):
264     case AVERROR(ENOSYS):
265     case AVERROR_EOF:
266     case AVERROR_EXIT:
267     case AVERROR_PATCHWELCOME:
268         return 0;
269     default:
270         return 1;
271     }
272 }
273 
free_message(void * msg)274 static void free_message(void *msg)
275 {
276     FifoMessage *fifo_msg = msg;
277 
278     if (fifo_msg->type == FIFO_WRITE_PACKET)
279         av_packet_unref(&fifo_msg->pkt);
280 }
281 
/* Book-keeping after a failed (but recoverable) recovery attempt.
 *
 * Records the time of the failure, either as the pts of pkt (stream time)
 * or as the current relative real time.
 *
 * Returns err_no once the configured maximum of recovery attempts has been
 * reached, AVERROR(EAGAIN) otherwise to request another attempt. */
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
                                                int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
           av_err2str(err_no));

    if (fifo->recovery_wait_streamtime) {
        if (pkt->pts == AV_NOPTS_VALUE)
            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
                   " timestamp, recovery will be attempted immediately\n");
        ctx->last_recovery_ts = pkt->pts;
    } else {
        ctx->last_recovery_ts = av_gettime_relative();
    }

    /* max_recovery_attempts == 0 means there is no limit */
    if (fifo->max_recovery_attempts &&
        ctx->recovery_nr >= fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_ERROR,
               "Maximal number of %d recovery attempts reached.\n",
               fifo->max_recovery_attempts);
        ret = err_no;
    } else {
        ret = AVERROR(EAGAIN);
    }

    return ret;
}
313 
/* Perform a single recovery attempt for the message msg that failed
 * with err_no.
 *
 * The underlying muxer is finalized if its header had been written, then,
 * once the configured wait time has elapsed, the message is dispatched
 * again (which rewrites the header first).
 *
 * Returns 0 if the recovery succeeded, AVERROR(EAGAIN) if the wait time
 * has not elapsed yet or the attempt failed but may be repeated, and
 * another negative AVERROR code if the error is final. The message is
 * freed when the error is not recoverable. */
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVPacket *pkt = &msg->pkt;
    int64_t time_since_recovery;
    int ret;

    if (!is_recoverable(fifo, err_no)) {
        ret = err_no;
        goto fail;
    }

    if (ctx->header_written) {
        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
        ctx->header_written = 0;
    }

    if (!ctx->recovery_nr) {
        /* First attempt: no waiting, just reset the failure timestamp */
        ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
                                AV_NOPTS_VALUE : 0;
    } else {
        if (fifo->recovery_wait_streamtime) {
            /* The elapsed stream time can only be computed when both the
             * timestamp of the last failure and the pts of the current
             * packet are known; subtracting AV_NOPTS_VALUE would overflow. */
            if (ctx->last_recovery_ts != AV_NOPTS_VALUE &&
                pkt->pts != AV_NOPTS_VALUE) {
                AVRational tb = avf->streams[pkt->stream_index]->time_base;
                time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
                                                   tb, AV_TIME_BASE_Q);
            } else {
                /* Enforce recovery immediately */
                time_since_recovery = fifo->recovery_wait_time;
            }
        } else {
            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
        }

        if (time_since_recovery < fifo->recovery_wait_time)
            return AVERROR(EAGAIN);
    }

    ctx->recovery_nr++;

    if (fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
               ctx->recovery_nr, fifo->max_recovery_attempts);
    } else {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
               ctx->recovery_nr);
    }

    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
        ctx->drop_until_keyframe = 1;

    ret = fifo_thread_dispatch_message(ctx, msg);
    if (ret < 0) {
        if (is_recoverable(fifo, ret)) {
            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
        } else {
            goto fail;
        }
    } else {
        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
        ctx->recovery_nr = 0;
    }

    return 0;

fail:
    free_message(msg);
    return ret;
}
384 
/* Drive the recovery of the message msg that failed with err_no.
 *
 * Without drop_pkts_on_overflow the attempts are repeated (sleeping in
 * slices of at most 10 ms while real time is used for waiting) until one
 * of them returns something other than AVERROR(EAGAIN). With
 * drop_pkts_on_overflow a single attempt is made; if it asks for a retry
 * the packet of the message is dropped and 0 is returned.
 *
 * Returns 0 or a negative AVERROR code. */
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    for (;;) {
        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
            int64_t elapsed   = av_gettime_relative() - ctx->last_recovery_ts;
            int64_t remaining = FFMAX(0, fifo->recovery_wait_time - elapsed);
            if (remaining)
                av_usleep(FFMIN(10000, remaining));
        }

        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
        if (ret != AVERROR(EAGAIN))
            return ret;
        if (fifo->drop_pkts_on_overflow)
            break;
    }

    /* A retry was requested but packets may be dropped: give up on this
     * message instead of blocking the queue */
    if (msg->type == FIFO_WRITE_PACKET)
        av_packet_unref(&msg->pkt);

    return 0;
}
410 
fifo_consumer_thread(void * data)411 static void *fifo_consumer_thread(void *data)
412 {
413     AVFormatContext *avf = data;
414     FifoContext *fifo = avf->priv_data;
415     AVThreadMessageQueue *queue = fifo->queue;
416     FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
417     int ret;
418 
419     FifoThreadContext fifo_thread_ctx;
420     memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
421     fifo_thread_ctx.avf = avf;
422     fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
423 
424     while (1) {
425         uint8_t just_flushed = 0;
426 
427         if (!fifo_thread_ctx.recovery_nr)
428             ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
429 
430         if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
431             int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
432             if (rec_ret < 0) {
433                 av_thread_message_queue_set_err_send(queue, rec_ret);
434                 break;
435             }
436         }
437 
438         /* If the queue is full at the moment when fifo_write_packet
439          * attempts to insert new message (packet) to the queue,
440          * it sets the fifo->overflow_flag to 1 and drops packet.
441          * Here in consumer thread, the flag is checked and if it is
442          * set, the queue is flushed and flag cleared. */
443         pthread_mutex_lock(&fifo->overflow_flag_lock);
444         if (fifo->overflow_flag) {
445             av_thread_message_flush(queue);
446             if (fifo->restart_with_keyframe)
447                 fifo_thread_ctx.drop_until_keyframe = 1;
448             fifo->overflow_flag = 0;
449             just_flushed = 1;
450         }
451         pthread_mutex_unlock(&fifo->overflow_flag_lock);
452 
453         if (just_flushed)
454             av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
455 
456         if (fifo->timeshift)
457             while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
458                 av_usleep(10000);
459 
460         ret = av_thread_message_queue_recv(queue, &msg, 0);
461         if (ret < 0) {
462             av_thread_message_queue_set_err_send(queue, ret);
463             break;
464         }
465     }
466 
467     fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
468 
469     return NULL;
470 }
471 
/* Allocate the context of the underlying muxer and mirror the setup of the
 * fifo muxer into it: interrupt callback, max_delay, metadata, opaque,
 * I/O callbacks, flags and one stream per fifo stream with copied encoding
 * parameters.
 *
 * The new context is stored in fifo->avf as soon as it is allocated.
 * Returns 0 on success, a negative AVERROR code otherwise. */
static int fifo_mux_init(AVFormatContext *avf, ff_const59 AVOutputFormat *oformat,
                         const char *filename)
{
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *mux;
    unsigned idx;
    int err;

    err = avformat_alloc_output_context2(&mux, oformat, NULL, filename);
    if (err < 0)
        return err;

    fifo->avf = mux;

    mux->interrupt_callback = avf->interrupt_callback;
    mux->max_delay          = avf->max_delay;

    err = av_dict_copy(&mux->metadata, avf->metadata, 0);
    if (err < 0)
        return err;

    mux->opaque   = avf->opaque;
    mux->io_close = avf->io_close;
    mux->io_open  = avf->io_open;
    mux->flags    = avf->flags;

    for (idx = 0; idx < avf->nb_streams; idx++) {
        AVStream *out_st = avformat_new_stream(mux, NULL);
        if (!out_st)
            return AVERROR(ENOMEM);

        err = ff_stream_encode_params_copy(out_st, avf->streams[idx]);
        if (err < 0)
            return err;
    }

    return 0;
}
507 
/* Muxer init callback: validate the options, pick and set up the
 * underlying muxer, and create the message queue and the overflow lock.
 *
 * Returns 0 on success, a negative AVERROR code otherwise. */
static int fifo_init(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    ff_const59 AVOutputFormat *oformat;
    int ret = 0;

    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
               " only when drop_pkts_on_overflow is also turned on\n");
        return AVERROR(EINVAL);
    }
    atomic_init(&fifo->queue_duration, 0);
    fifo->last_sent_dts = AV_NOPTS_VALUE;

    oformat = av_guess_format(fifo->format, avf->url, NULL);
    if (!oformat) {
        ret = AVERROR_MUXER_NOT_FOUND;
        return ret;
    }

    ret = fifo_mux_init(avf, oformat, avf->url);
    if (ret < 0)
        return ret;

    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
                                        sizeof(FifoMessage));
    if (ret < 0)
        return ret;

    av_thread_message_queue_set_free_func(fifo->queue, free_message);

    /* pthread_mutex_init() reports failure with a positive error number,
     * never with a negative value */
    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
    if (ret)
        return AVERROR(ret);
    fifo->overflow_flag_lock_initialized = 1;

    return 0;
}
546 
/* Muxer write_header callback: start the writer thread.
 * Returns 0 on success or the AVERROR code matching the error number
 * reported by pthread_create(). */
static int fifo_write_header(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    int err = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);

    if (!err)
        return 0;

    av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
           av_err2str(AVERROR(err)));
    return AVERROR(err);
}
561 
/* Muxer write_packet callback: enqueue a packet (or a flush request when
 * pkt is NULL) for the writer thread.
 *
 * A new reference to the packet is placed in the queue. With
 * drop_pkts_on_overflow the send does not block; if the queue is full the
 * overflow flag is raised, the packet is dropped and 0 is returned.
 * With timeshift enabled the queue duration is increased by the duration
 * the packet represents.
 *
 * Returns a non-negative value on success, a negative AVERROR code
 * otherwise. */
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
    FifoContext *fifo = avf->priv_data;
    FifoMessage msg = { .type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT };
    unsigned send_flags = fifo->drop_pkts_on_overflow ? AV_THREAD_MESSAGE_NONBLOCK : 0;
    int ret;

    if (pkt) {
        ret = av_packet_ref(&msg.pkt, pkt);
        if (ret < 0)
            return ret;
    }

    ret = av_thread_message_queue_send(fifo->queue, &msg, send_flags);
    if (ret >= 0) {
        /* The queue owns the packet reference now */
        if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) {
            int64_t queued = next_duration(avf, pkt, &fifo->last_sent_dts);
            atomic_fetch_add_explicit(&fifo->queue_duration, queued, memory_order_relaxed);
        }
        return ret;
    }

    if (ret == AVERROR(EAGAIN)) {
        int newly_raised = 0;

        /* The queue is full: raise fifo->overflow_flag so that the writer
         * thread knows the queue has to be flushed. */
        pthread_mutex_lock(&fifo->overflow_flag_lock);
        if (!fifo->overflow_flag) {
            fifo->overflow_flag = 1;
            newly_raised = 1;
        }
        pthread_mutex_unlock(&fifo->overflow_flag_lock);

        if (newly_raised)
            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
        ret = 0;
    }

    /* The message was not queued, drop the reference taken above */
    if (pkt)
        av_packet_unref(&msg.pkt);
    return ret;
}
605 
/* Muxer write_trailer callback: let the writer thread drain the queue and
 * wait for it to finish.
 *
 * The receive side of the queue is put into the AVERROR_EOF state; the
 * writer thread leaves its loop once its receive call fails. With
 * timeshift enabled, real time is added to the queue duration and no-op
 * messages are sent for up to the timeshift interval so that the writer
 * thread keeps releasing delayed packets; afterwards the queue duration is
 * set to INT64_MAX so the writer thread no longer waits.
 *
 * Returns the result of the trailer written by the writer thread, or the
 * AVERROR code matching a pthread_join() failure. */
static int fifo_write_trailer(AVFormatContext *avf)
{
    FifoContext *fifo= avf->priv_data;
    int ret;

    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
    if (fifo->timeshift) {
        int64_t now = av_gettime_relative();
        int64_t elapsed = 0;
        FifoMessage msg = {FIFO_NOOP};
        do {
            int64_t delay = av_gettime_relative() - now;
            if (delay < 0) { // Discontinuity?
                delay = 10000;
                now = av_gettime_relative();
            } else {
                now += delay;
            }
            atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
            elapsed += delay;
            if (elapsed > fifo->timeshift)
                break;
            av_usleep(10000);
            ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
        } while (ret >= 0 || ret == AVERROR(EAGAIN));
        atomic_store(&fifo->queue_duration, INT64_MAX);
    }

    /* pthread_join() reports failure with a positive error number,
     * never with a negative value */
    ret = pthread_join(fifo->writer_thread, NULL);
    if (ret) {
        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
               av_err2str(AVERROR(ret)));
        return AVERROR(ret);
    }

    ret = fifo->write_trailer_ret;
    return ret;
}
644 
/* Muxer deinit callback: free the context of the underlying muxer and the
 * message queue, and destroy the overflow lock if it was initialized. */
static void fifo_deinit(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;

    avformat_free_context(fifo->avf);
    av_thread_message_queue_free(&fifo->queue);

    if (!fifo->overflow_flag_lock_initialized)
        return;
    pthread_mutex_destroy(&fifo->overflow_flag_lock);
}
654 
655 #define OFFSET(x) offsetof(FifoContext, x)
656 static const AVOption options[] = {
657         {"fifo_format", "Target muxer", OFFSET(format),
658          AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
659 
660         {"queue_size", "Size of fifo queue", OFFSET(queue_size),
661          AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
662 
663         {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
664          AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
665 
666         {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
667          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
668 
669         {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
670          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
671 
672         {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
673         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
674 
675         {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
676          AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
677 
678         {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
679          AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
680 
681         {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
682          OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
683 
684         {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
685          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
686 
687         {"timeshift", "Delay fifo output", OFFSET(timeshift),
688          AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
689 
690         {NULL},
691 };
692 
693 static const AVClass fifo_muxer_class = {
694     .class_name = "Fifo muxer",
695     .item_name  = av_default_item_name,
696     .option     = options,
697     .version    = LIBAVUTIL_VERSION_INT,
698 };
699 
700 AVOutputFormat ff_fifo_muxer = {
701     .name           = "fifo",
702     .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
703     .priv_data_size = sizeof(FifoContext),
704     .init           = fifo_init,
705     .write_header   = fifo_write_header,
706     .write_packet   = fifo_write_packet,
707     .write_trailer  = fifo_write_trailer,
708     .deinit         = fifo_deinit,
709     .priv_class     = &fifo_muxer_class,
710     .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
711 };
712