• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * FIFO pseudo-muxer
3  * Copyright (c) 2016 Jan Sebechlebsky
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
29 #include "avformat.h"
30 #include "internal.h"
31 #include "mux.h"
32 
/* Default value of the "queue_size" option (number of queued messages) */
#define FIFO_DEFAULT_QUEUE_SIZE              60
/* Default value of the "max_recovery_attempts" option; the limit on
 * recovery attempts is only enforced when the option is nonzero */
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
/* Default value of the "recovery_wait_time" option, in microseconds */
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
36 
/* Private data of the fifo muxer, shared between the thread calling the
 * muxer callbacks and the consumer thread (fifo_consumer_thread()). */
typedef struct FifoContext {
    const AVClass *class;
    /* Context of the underlying (target) muxer, allocated in fifo_mux_init() */
    AVFormatContext *avf;

    /* Name of the target muxer ("fifo_format" option) */
    char *format;
    /* Options handed to the underlying muxer ("format_opts" option) */
    AVDictionary *format_options;

    /* Capacity of the message queue ("queue_size" option) */
    int queue_size;
    /* Queue of FifoMessage items sent to the consumer thread */
    AVThreadMessageQueue *queue;

    /* Consumer thread; created in fifo_write_header(),
     * joined in fifo_write_trailer() */
    pthread_t writer_thread;

    /* Return value of the last fifo_thread_write_trailer() call */
    int write_trailer_ret;

    /* Time to wait before next recovery attempt
     * This can refer to the time in processed stream,
     * or real time. */
    int64_t recovery_wait_time;

    /* Maximal number of unsuccessful successive recovery attempts
     * (the limit is not enforced when this is 0) */
    int max_recovery_attempts;

    /* Whether to attempt recovery from failure */
    int attempt_recovery;

    /* If >0 stream time will be used when waiting
     * for the recovery attempt instead of real time */
    int recovery_wait_streamtime;

    /* If >0 recovery will be attempted regardless of error code
     * (except AVERROR_EXIT, so exit request is never ignored) */
    int recover_any_error;

    /* Whether to drop packets in case the queue is full. */
    int drop_pkts_on_overflow;

    /* Whether to wait for keyframe when recovering
     * from failure or queue overflow */
    int restart_with_keyframe;

    /* Protects overflow_flag */
    pthread_mutex_t overflow_flag_lock;
    /* Nonzero once overflow_flag_lock has been initialized,
     * so that fifo_deinit() knows whether to destroy it */
    int overflow_flag_lock_initialized;
    /* Value > 0 signals queue overflow */
    volatile uint8_t overflow_flag;

    /* Duration of the queued packets in AV_TIME_BASE units: increased by
     * fifo_write_packet(), decreased by the consumer thread.
     * Only maintained when timeshift is nonzero. */
    atomic_int_least64_t queue_duration;
    /* dts (in AV_TIME_BASE units) of the last packet accounted for in
     * fifo_write_packet(), AV_NOPTS_VALUE before the first one */
    int64_t last_sent_dts;
    /* "timeshift" option: when nonzero, the consumer thread waits until
     * queue_duration reaches this value before receiving a message */
    int64_t timeshift;
} FifoContext;
87 
/* State private to the consumer thread (lives on its stack). */
typedef struct FifoThreadContext {
    /* Context of the fifo muxer itself (not of the underlying muxer) */
    AVFormatContext *avf;

    /* Timestamp of last failure.
     * This is either pts in case stream time is used,
     * or microseconds as returned by av_gettime_relative() */
    int64_t last_recovery_ts;

    /* Number of current recovery process
     * Value > 0 means we are in recovery process */
    int recovery_nr;

    /* If > 0 all frames will be dropped until keyframe is received */
    uint8_t drop_until_keyframe;

    /* Value > 0 means that the previous write_header call was successful
     * so finalization by calling write_trailer and ff_io_close must be done
     * before exiting / reinitialization of underlying muxer */
    uint8_t header_written;

    /* dts (in AV_TIME_BASE units) of the last packet accounted for by the
     * consumer thread, AV_NOPTS_VALUE before the first one */
    int64_t last_received_dts;
} FifoThreadContext;
110 
/* Kind of work item passed to the consumer thread. */
typedef enum FifoMessageType {
    /* Nothing to do; fifo_thread_dispatch_message() returns 0 immediately */
    FIFO_NOOP,
    /* Write the header of the underlying muxer */
    FIFO_WRITE_HEADER,
    /* Write the packet carried in FifoMessage.pkt */
    FIFO_WRITE_PACKET,
    /* Flush the underlying muxer (sent when fifo_write_packet() gets a NULL packet) */
    FIFO_FLUSH_OUTPUT
} FifoMessageType;
117 
/* Element stored in the thread message queue. */
typedef struct FifoMessage {
    /* What the consumer thread should do with this message */
    FifoMessageType type;
    /* Packet reference; only released by free_message() for FIFO_WRITE_PACKET */
    AVPacket pkt;
} FifoMessage;
122 
/**
 * Open the output of the underlying muxer and write its header.
 *
 * A private copy of the user supplied format options is used, so that the
 * original dictionary stays intact for later re-initializations.
 *
 * @param ctx consumer thread state; header_written is set when
 *            avformat_write_header() returns 0
 * @return result of avformat_write_header() when all options were consumed,
 *         AVERROR(EINVAL) when options are left over after writing the
 *         header, or the negative error code of the failing setup step
 */
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
    AVFormatContext *fifo_avf = ctx->avf;
    FifoContext *fifo = fifo_avf->priv_data;
    AVFormatContext *target = fifo->avf;
    AVDictionary *opts = NULL;
    unsigned int idx;
    int ret;

    ret = av_dict_copy(&opts, fifo->format_options, 0);
    if (ret >= 0) {
        ret = ff_format_output_open(target, fifo_avf->url, &opts);
        if (ret < 0) {
            av_log(fifo_avf, AV_LOG_ERROR, "Error opening %s: %s\n", fifo_avf->url,
                   av_err2str(ret));
        } else {
            /* Reset the current dts of every output stream before (re)starting */
            for (idx = 0; idx < target->nb_streams; idx++)
                ffstream(target->streams[idx])->cur_dts = 0;

            ret = avformat_write_header(target, &opts);
            if (ret == 0)
                ctx->header_written = 1;

            /* Anything still in the dictionary was not recognized
             * by the underlying muxer */
            if (opts) {
                AVDictionaryEntry *leftover = NULL;

                for (;;) {
                    leftover = av_dict_get(opts, "", leftover, AV_DICT_IGNORE_SUFFIX);
                    if (!leftover)
                        break;
                    av_log(target, AV_LOG_ERROR, "Unknown option '%s'\n", leftover->key);
                }
                ret = AVERROR(EINVAL);
            }
        }
    }

    av_dict_free(&opts);
    return ret;
}
161 
/**
 * Handle a FIFO_FLUSH_OUTPUT message by passing a NULL packet to
 * av_write_frame() on the underlying muxer.
 *
 * @return return value of av_write_frame()
 */
static int fifo_thread_flush_output(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;

    return av_write_frame(fifo->avf, NULL);
}
170 
/**
 * Compute how far the dts of pkt advanced relative to the previous packet.
 *
 * @param avf      context owning the stream the packet belongs to
 * @param pkt      packet whose dts is read (rescaled to AV_TIME_BASE units)
 * @param last_dts in: previously seen dts or AV_NOPTS_VALUE;
 *                 out: rescaled dts of pkt
 * @return difference between the new and the previous dts,
 *         0 when there was no previous dts
 */
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
{
    const AVStream *stream = avf->streams[pkt->stream_index];
    const int64_t current = av_rescale_q(pkt->dts, stream->time_base, AV_TIME_BASE_Q);
    const int64_t previous = *last_dts;

    *last_dts = current;

    if (previous == AV_NOPTS_VALUE)
        return 0;
    return current - previous;
}
179 
/**
 * Write one queued packet to the underlying muxer.
 *
 * @param ctx consumer thread state
 * @param pkt packet taken from the queue; it is unreferenced when it was
 *            written or deliberately dropped, and left intact (with its
 *            original timestamps) when writing failed
 * @return 0 when the packet was dropped while waiting for a keyframe,
 *         otherwise the return value of av_write_frame()
 */
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
    AVFormatContext *fifo_avf = ctx->avf;
    FifoContext *fifo = fifo_avf->priv_data;
    AVFormatContext *target = fifo->avf;
    int64_t saved_pts, saved_dts, saved_duration;
    int stream_idx, ret;

    /* Timeshift accounting: this packet no longer counts as queued */
    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) {
        int64_t consumed = next_duration(fifo_avf, pkt, &ctx->last_received_dts);
        atomic_fetch_sub_explicit(&fifo->queue_duration, consumed, memory_order_relaxed);
    }

    if (ctx->drop_until_keyframe) {
        if (!(pkt->flags & AV_PKT_FLAG_KEY)) {
            av_log(fifo_avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
            av_packet_unref(pkt);
            return 0;
        }
        ctx->drop_until_keyframe = 0;
        av_log(fifo_avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
    }

    /* Remember the unscaled timestamps so a failed write can be retried */
    saved_pts      = pkt->pts;
    saved_dts      = pkt->dts;
    saved_duration = pkt->duration;
    stream_idx     = pkt->stream_index;

    av_packet_rescale_ts(pkt, fifo_avf->streams[stream_idx]->time_base,
                         target->streams[stream_idx]->time_base);

    ret = av_write_frame(target, pkt);
    if (ret < 0) {
        /* avoid scaling twice */
        pkt->pts      = saved_pts;
        pkt->dts      = saved_dts;
        pkt->duration = saved_duration;
        return ret;
    }

    av_packet_unref(pkt);
    return ret;
}
222 
/**
 * Finalize the underlying muxer if its header has been written.
 *
 * @return 0 when no header was written, otherwise the return value of
 *         av_write_trailer(); the output I/O context is closed in that case
 */
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;
    AVFormatContext *target = fifo->avf;

    if (ctx->header_written) {
        int trailer_ret = av_write_trailer(target);

        ff_format_io_close(target, &target->pb);
        return trailer_ret;
    }

    return 0;
}
238 
/**
 * Execute the action requested by a queue message.
 *
 * FIFO_NOOP is ignored. For every other message the header of the underlying
 * muxer is written first if that has not happened yet.
 *
 * @return 0 for FIFO_NOOP, otherwise the result of the header write
 *         (on failure, or for FIFO_WRITE_HEADER) or of the packet write /
 *         flush handler
 */
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
{
    int header_ret = AVERROR(EINVAL);

    if (msg->type == FIFO_NOOP)
        return 0;

    if (!ctx->header_written) {
        header_ret = fifo_thread_write_header(ctx);
        if (header_ret < 0)
            return header_ret;
    }

    if (msg->type == FIFO_WRITE_HEADER) {
        /* Only valid when the header was written just above */
        av_assert0(header_ret >= 0);
        return header_ret;
    }
    if (msg->type == FIFO_WRITE_PACKET)
        return fifo_thread_write_packet(ctx, &msg->pkt);
    if (msg->type == FIFO_FLUSH_OUTPUT)
        return fifo_thread_flush_output(ctx);

    /* Unknown message type */
    av_assert0(0);
    return AVERROR(EINVAL);
}
265 
is_recoverable(const FifoContext * fifo,int err_no)266 static int is_recoverable(const FifoContext *fifo, int err_no) {
267     if (!fifo->attempt_recovery)
268         return 0;
269 
270     if (fifo->recover_any_error)
271         return err_no != AVERROR_EXIT;
272 
273     switch (err_no) {
274     case AVERROR(EINVAL):
275     case AVERROR(ENOSYS):
276     case AVERROR_EOF:
277     case AVERROR_EXIT:
278     case AVERROR_PATCHWELCOME:
279         return 0;
280     default:
281         return 1;
282     }
283 }
284 
free_message(void * msg)285 static void free_message(void *msg)
286 {
287     FifoMessage *fifo_msg = msg;
288 
289     if (fifo_msg->type == FIFO_WRITE_PACKET)
290         av_packet_unref(&fifo_msg->pkt);
291 }
292 
/**
 * Bookkeeping after a recovery attempt failed with a recoverable error.
 *
 * Records the time of the failure (packet pts when stream time is used,
 * otherwise av_gettime_relative()) and decides whether another attempt
 * may be made.
 *
 * @param ctx    consumer thread state; last_recovery_ts is updated
 * @param pkt    packet of the message being processed, its pts is read
 *               when recovery_wait_streamtime is set
 * @param err_no error returned by the failed attempt
 * @return AVERROR(EAGAIN) if another attempt is allowed, err_no once a
 *         nonzero max_recovery_attempts limit has been reached
 */
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
                                                int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
           av_err2str(err_no));

    if (fifo->recovery_wait_streamtime) {
        if (pkt->pts == AV_NOPTS_VALUE)
            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
                   " timestamp, recovery will be attempted immediately\n");
        ctx->last_recovery_ts = pkt->pts;
    } else {
        ctx->last_recovery_ts = av_gettime_relative();
    }

    /* max_recovery_attempts == 0 means the number of attempts is unlimited */
    if (fifo->max_recovery_attempts &&
        ctx->recovery_nr >= fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_ERROR,
               "Maximal number of %d recovery attempts reached.\n",
               fifo->max_recovery_attempts);
        ret = err_no;
    } else {
        ret = AVERROR(EAGAIN);
    }

    return ret;
}
324 
/**
 * Make one recovery attempt for a message whose processing failed.
 *
 * The underlying muxer is finalized if its header had been written, then,
 * once the configured wait time since the last failed attempt has elapsed,
 * the message is dispatched again (which re-opens the output first).
 *
 * @param ctx    consumer thread state
 * @param msg    message that is being (re)processed
 * @param err_no error that triggered the recovery
 * @return 0 when the message was processed successfully,
 *         AVERROR(EAGAIN) when the attempt has to be repeated later,
 *         another negative error code on failure; on the paths that
 *         end at the fail label the message is released with free_message()
 */
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVPacket *pkt = &msg->pkt;
    int64_t time_since_recovery;
    int ret;

    if (!is_recoverable(fifo, err_no)) {
        ret = err_no;
        goto fail;
    }

    if (ctx->header_written) {
        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
        ctx->header_written = 0;
    }

    if (!ctx->recovery_nr) {
        /* First attempt: no previous failure to wait for */
        ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
                                AV_NOPTS_VALUE : 0;
    } else {
        if (fifo->recovery_wait_streamtime) {
            /* The elapsed stream time can only be computed when both the
             * timestamp of the last failure and the current packet pts
             * are known. */
            if (ctx->last_recovery_ts != AV_NOPTS_VALUE &&
                pkt->pts != AV_NOPTS_VALUE) {
                AVRational tb = avf->streams[pkt->stream_index]->time_base;
                time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
                                                   tb, AV_TIME_BASE_Q);
            } else {
                /* Enforce recovery immediately */
                time_since_recovery = fifo->recovery_wait_time;
            }
        } else {
            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
        }

        if (time_since_recovery < fifo->recovery_wait_time)
            return AVERROR(EAGAIN);
    }

    ctx->recovery_nr++;

    if (fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
               ctx->recovery_nr, fifo->max_recovery_attempts);
    } else {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
               ctx->recovery_nr);
    }

    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
        ctx->drop_until_keyframe = 1;

    ret = fifo_thread_dispatch_message(ctx, msg);
    if (ret < 0) {
        if (is_recoverable(fifo, ret)) {
            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
        } else {
            goto fail;
        }
    } else {
        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
        ctx->recovery_nr = 0;
    }

    return 0;

fail:
    free_message(msg);
    return ret;
}
395 
/**
 * Run recovery attempts for a failed message until they succeed, fail
 * definitively, or (when packets may be dropped) have to be postponed.
 *
 * When real time is used for waiting, the thread sleeps in slices of at
 * most 10 ms between attempts.
 *
 * @return 0 on success or when the message was given up because
 *         drop_pkts_on_overflow is set and the attempt must wait,
 *         otherwise the negative error code of the attempt
 */
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    FifoContext *fifo = ctx->avf->priv_data;
    int ret;

    for (;;) {
        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
            int64_t elapsed = av_gettime_relative() - ctx->last_recovery_ts;
            int64_t remaining = FFMAX(0, fifo->recovery_wait_time - elapsed);

            if (remaining)
                av_usleep(FFMIN(10000, remaining));
        }

        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
        if (ret != AVERROR(EAGAIN))
            return ret;

        /* The attempt has to wait; only keep retrying with this message
         * when packets must not be dropped */
        if (fifo->drop_pkts_on_overflow)
            break;
    }

    if (msg->type == FIFO_WRITE_PACKET)
        av_packet_unref(&msg->pkt);

    return 0;
}
421 
/**
 * Entry point of the consumer (writer) thread.
 *
 * Processes messages from the queue one at a time, running the recovery
 * logic when processing fails, until receiving from the queue reports an
 * error or recovery fails. The result of the final trailer write is stored
 * in FifoContext.write_trailer_ret.
 *
 * @param data the AVFormatContext of the fifo muxer
 * @return always NULL
 */
static void *fifo_consumer_thread(void *data)
{
    AVFormatContext *avf = data;
    FifoContext *fifo = avf->priv_data;
    AVThreadMessageQueue *queue = fifo->queue;
    /* The first message is synthesized here instead of being received:
     * write the header right away unless timeshift is in use */
    FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
    int ret;

    FifoThreadContext fifo_thread_ctx;
    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
    fifo_thread_ctx.avf = avf;
    fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;

    while (1) {
        uint8_t just_flushed = 0;

        /* While a recovery is in progress the message is handled by
         * fifo_thread_recover() below; ret then still holds the value
         * assigned by the previous av_thread_message_queue_recv() call */
        if (!fifo_thread_ctx.recovery_nr)
            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);

        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
            if (rec_ret < 0) {
                /* Give up: set the error on the sending side of the queue */
                av_thread_message_queue_set_err_send(queue, rec_ret);
                break;
            }
        }

        /* If the queue is full at the moment when fifo_write_packet
         * attempts to insert new message (packet) to the queue,
         * it sets the fifo->overflow_flag to 1 and drops packet.
         * Here in consumer thread, the flag is checked and if it is
         * set, the queue is flushed and flag cleared. */
        pthread_mutex_lock(&fifo->overflow_flag_lock);
        if (fifo->overflow_flag) {
            av_thread_message_flush(queue);
            if (fifo->restart_with_keyframe)
                fifo_thread_ctx.drop_until_keyframe = 1;
            fifo->overflow_flag = 0;
            just_flushed = 1;
        }
        pthread_mutex_unlock(&fifo->overflow_flag_lock);

        /* Log outside of the critical section */
        if (just_flushed)
            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");

        /* Timeshift: poll every 10 ms until enough duration is queued */
        if (fifo->timeshift)
            while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
                av_usleep(10000);

        ret = av_thread_message_queue_recv(queue, &msg, 0);
        if (ret < 0) {
            av_thread_message_queue_set_err_send(queue, ret);
            break;
        }
    }

    /* Finalize the underlying muxer; fifo_write_trailer() returns this value */
    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);

    return NULL;
}
482 
/**
 * Allocate the context of the underlying muxer and mirror the relevant
 * settings and streams of the fifo muxer context into it.
 *
 * The new context is stored in FifoContext.avf as soon as it is allocated.
 *
 * @param avf      fifo muxer context
 * @param oformat  output format of the underlying muxer
 * @param filename output URL
 * @return 0 on success, a negative error code otherwise
 */
static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
                         const char *filename)
{
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *target;
    unsigned int idx;
    int ret;

    ret = avformat_alloc_output_context2(&target, oformat, NULL, filename);
    if (ret < 0)
        return ret;

    fifo->avf = target;

    target->interrupt_callback = avf->interrupt_callback;
    target->max_delay          = avf->max_delay;

    ret = av_dict_copy(&target->metadata, avf->metadata, 0);
    if (ret < 0)
        return ret;

    target->opaque    = avf->opaque;
    target->io_close  = avf->io_close;
    target->io_close2 = avf->io_close2;
    target->io_open   = avf->io_open;
    target->flags     = avf->flags;

    /* Create one output stream per input stream, with the same parameters */
    for (idx = 0; idx < avf->nb_streams; idx++) {
        AVStream *out_stream = avformat_new_stream(target, NULL);

        if (!out_stream)
            return AVERROR(ENOMEM);

        ret = ff_stream_encode_params_copy(out_stream, avf->streams[idx]);
        if (ret < 0)
            return ret;
    }

    return 0;
}
519 
/**
 * Muxer init callback: validate the options, set up the underlying muxer
 * context, the message queue and the overflow flag mutex.
 *
 * @param avf fifo muxer context
 * @return 0 on success, a negative AVERROR code otherwise
 */
static int fifo_init(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    const AVOutputFormat *oformat;
    int ret = 0;

    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
               " only when drop_pkts_on_overflow is also turned on\n");
        return AVERROR(EINVAL);
    }
    atomic_init(&fifo->queue_duration, 0);
    fifo->last_sent_dts = AV_NOPTS_VALUE;

    oformat = av_guess_format(fifo->format, avf->url, NULL);
    if (!oformat) {
        ret = AVERROR_MUXER_NOT_FOUND;
        return ret;
    }

    ret = fifo_mux_init(avf, oformat, avf->url);
    if (ret < 0)
        return ret;

    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
                                        sizeof(FifoMessage));
    if (ret < 0)
        return ret;

    av_thread_message_queue_set_free_func(fifo->queue, free_message);

    /* pthread_mutex_init() reports failure with a positive error number,
     * so any nonzero value is an error */
    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
    if (ret)
        return AVERROR(ret);
    /* Lets fifo_deinit() know the mutex has to be destroyed */
    fifo->overflow_flag_lock_initialized = 1;

    return 0;
}
558 
/**
 * Muxer write_header callback: start the consumer thread.
 *
 * @return 0 on success, AVERROR() of the pthread_create() error otherwise
 */
static int fifo_write_header(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    int err = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);

    if (!err)
        return 0;

    av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
           av_err2str(AVERROR(err)));
    return AVERROR(err);
}
573 
/**
 * Muxer write_packet callback: enqueue a packet (or, for a NULL packet,
 * a flush request) for the consumer thread.
 *
 * When drop_pkts_on_overflow is set the message is sent without blocking;
 * if the queue is full the packet is dropped, the overflow flag is raised
 * for the consumer thread and 0 is returned.
 *
 * @param avf fifo muxer context
 * @param pkt packet to queue (a new reference is taken), or NULL to flush
 * @return return value of the queue send on success, 0 when the packet was
 *         dropped because of overflow, a negative error code otherwise
 */
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
    FifoContext *fifo = avf->priv_data;
    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
    unsigned send_flags = fifo->drop_pkts_on_overflow ? AV_THREAD_MESSAGE_NONBLOCK : 0;
    int ret;

    if (pkt) {
        ret = av_packet_ref(&msg.pkt, pkt);
        if (ret < 0)
            return ret;
    }

    ret = av_thread_message_queue_send(fifo->queue, &msg, send_flags);
    if (ret >= 0) {
        /* Timeshift accounting: the packet now counts as queued */
        if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) {
            int64_t queued = next_duration(avf, pkt, &fifo->last_sent_dts);
            atomic_fetch_add_explicit(&fifo->queue_duration, queued, memory_order_relaxed);
        }
        return ret;
    }

    if (ret == AVERROR(EAGAIN)) {
        uint8_t raised_now = 0;

        /* Queue is full, set fifo->overflow_flag to 1
         * to let consumer thread know the queue should
         * be flushed. */
        pthread_mutex_lock(&fifo->overflow_flag_lock);
        if (!fifo->overflow_flag) {
            fifo->overflow_flag = 1;
            raised_now = 1;
        }
        pthread_mutex_unlock(&fifo->overflow_flag_lock);

        if (raised_now)
            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
        ret = 0;
    }

    /* The message was not queued, so drop the reference taken above */
    if (pkt)
        av_packet_unref(&msg.pkt);
    return ret;
}
617 
/**
 * Muxer write_trailer callback: signal end of stream to the consumer
 * thread, wait for it to finish and report the result of its trailer write.
 *
 * @param avf fifo muxer context
 * @return AVERROR() of the pthread_join() error if joining failed,
 *         otherwise the value stored in FifoContext.write_trailer_ret
 */
static int fifo_write_trailer(AVFormatContext *avf)
{
    FifoContext *fifo= avf->priv_data;
    int ret;

    /* Set AVERROR_EOF as the receive-side error of the queue; the consumer
     * loop exits once av_thread_message_queue_recv() reports an error */
    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
    if (fifo->timeshift) {
        int64_t now = av_gettime_relative();
        int64_t elapsed = 0;
        FifoMessage msg = {FIFO_NOOP};
        /* No more packets will arrive, so advance queue_duration by the
         * elapsed real time (for at most timeshift microseconds), sending
         * FIFO_NOOP messages to the queue in the meantime */
        do {
            int64_t delay = av_gettime_relative() - now;
            if (delay < 0) { // Discontinuity?
                delay = 10000;
                now = av_gettime_relative();
            } else {
                now += delay;
            }
            atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
            elapsed += delay;
            if (elapsed > fifo->timeshift)
                break;
            av_usleep(10000);
            ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
        } while (ret >= 0 || ret == AVERROR(EAGAIN));
        /* From now on the consumer's timeshift wait loop never blocks */
        atomic_store(&fifo->queue_duration, INT64_MAX);
    }

    /* pthread_join() reports failure with a positive error number,
     * so any nonzero value is an error */
    ret = pthread_join(fifo->writer_thread, NULL);
    if (ret) {
        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
               av_err2str(AVERROR(ret)));
        return AVERROR(ret);
    }

    ret = fifo->write_trailer_ret;
    return ret;
}
656 
/**
 * Muxer deinit callback: free the underlying muxer context and the message
 * queue, and destroy the overflow flag mutex if it was initialized.
 */
static void fifo_deinit(AVFormatContext *avf)
{
    FifoContext *priv = avf->priv_data;

    avformat_free_context(priv->avf);
    av_thread_message_queue_free(&priv->queue);

    if (!priv->overflow_flag_lock_initialized)
        return;

    pthread_mutex_destroy(&priv->overflow_flag_lock);
}
666 
/* Byte offset of a FifoContext member; binds each option to its field */
#define OFFSET(x) offsetof(FifoContext, x)
/* Options of the fifo muxer. Each entry gives the option name, help text,
 * target FifoContext field, type, default value, allowed range and flags. */
static const AVOption options[] = {
        {"fifo_format", "Target muxer", OFFSET(format),
         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},

        {"queue_size", "Size of fifo queue", OFFSET(queue_size),
         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
         AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},

        {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
        AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        /* 0 (the default) means the number of attempts is not limited */
        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        /* fifo_init() rejects this unless drop_pkts_on_overflow is also set */
        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"timeshift", "Delay fifo output", OFFSET(timeshift),
         AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        /* Terminating entry */
        {NULL},
};
704 
/* AVClass of the fifo muxer's private context; exposes the options table above */
static const AVClass fifo_muxer_class = {
    .class_name = "Fifo muxer",
    .item_name  = av_default_item_name,
    .option     = options,
    .version    = LIBAVUTIL_VERSION_INT,
};
711 
/* Muxer definition: registers the "fifo" pseudo-muxer and wires the
 * callbacks defined in this file to it. */
const AVOutputFormat ff_fifo_muxer = {
    .name           = "fifo",
    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
    .priv_data_size = sizeof(FifoContext),
    .init           = fifo_init,
    .write_header   = fifo_write_header,
    .write_packet   = fifo_write_packet,
    .write_trailer  = fifo_write_trailer,
    .deinit         = fifo_deinit,
    .priv_class     = &fifo_muxer_class,
    .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
};
724