1 /*
2 * FIFO pseudo-muxer
3 * Copyright (c) 2016 Jan Sebechlebsky
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
29 #include "avformat.h"
30 #include "internal.h"
31
/* Default of the "queue_size" option: capacity of the message queue,
 * counted in FifoMessage elements. */
#define FIFO_DEFAULT_QUEUE_SIZE              60
/* Default of the "max_recovery_attempts" option; with 0 the number of
 * recovery attempts is not limited. */
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
/* Default of the "recovery_wait_time" option, in microseconds. */
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
35
/* Private data of the fifo muxer. It is accessed both by the muxer
 * callbacks (fifo_init, fifo_write_packet, ...) and by the consumer
 * thread (fifo_consumer_thread). */
typedef struct FifoContext {
    /* Class of the muxer (fifo_muxer_class), describing the options below */
    const AVClass *class;
    /* Context of the underlying (target) muxer, created in fifo_mux_init() */
    AVFormatContext *avf;

    /* Name of the target muxer ("fifo_format" option) */
    char *format;
    /* Options to be passed to the underlying muxer ("format_opts" option) */
    AVDictionary *format_options;

    /* Capacity of the message queue ("queue_size" option) */
    int queue_size;
    /* Queue carrying FifoMessage elements to the consumer thread */
    AVThreadMessageQueue *queue;

    /* Consumer thread, started in fifo_write_header() and joined in
     * fifo_write_trailer() */
    pthread_t writer_thread;

    /* Return value of last write_trailer_call */
    int write_trailer_ret;

    /* Time to wait before next recovery attempt
     * This can refer to the time in processed stream,
     * or real time. */
    int64_t recovery_wait_time;

    /* Maximal number of unsuccessful successive recovery attempts */
    int max_recovery_attempts;

    /* Whether to attempt recovery from failure */
    int attempt_recovery;

    /* If >0 stream time will be used when waiting
     * for the recovery attempt instead of real time */
    int recovery_wait_streamtime;

    /* If >0 recovery will be attempted regardless of error code
     * (except AVERROR_EXIT, so exit request is never ignored) */
    int recover_any_error;

    /* Whether to drop packets in case the queue is full. */
    int drop_pkts_on_overflow;

    /* Whether to wait for keyframe when recovering
     * from failure or queue overflow */
    int restart_with_keyframe;

    /* Protects overflow_flag */
    pthread_mutex_t overflow_flag_lock;
    /* Nonzero once overflow_flag_lock has been initialized, so that
     * fifo_deinit() knows whether it has to be destroyed */
    int overflow_flag_lock_initialized;
    /* Value > 0 signals queue overflow */
    volatile uint8_t overflow_flag;

    /* Sum of the dts differences (in AV_TIME_BASE units) of the packets
     * sent to the queue minus those already taken by the consumer thread.
     * Only updated when timeshift is set. */
    atomic_int_least64_t queue_duration;
    /* dts (in AV_TIME_BASE units) of the last packet accounted for in
     * queue_duration by fifo_write_packet(); AV_NOPTS_VALUE initially */
    int64_t last_sent_dts;
    /* Delay of the fifo output ("timeshift" option); 0 disables it */
    int64_t timeshift;
} FifoContext;
86
/* State owned by the consumer thread (a local variable of
 * fifo_consumer_thread). */
typedef struct FifoThreadContext {
    /* Context of the fifo muxer itself; its priv_data is the FifoContext */
    AVFormatContext *avf;

    /* Timestamp of last failure.
     * This is either pts in case stream time is used,
     * or microseconds as returned by av_gettime_relative() */
    int64_t last_recovery_ts;

    /* Number of current recovery process
     * Value > 0 means we are in recovery process */
    int recovery_nr;

    /* If > 0 all frames will be dropped until keyframe is received */
    uint8_t drop_until_keyframe;

    /* Value > 0 means that the previous write_header call was successful
     * so finalization by calling write_trailer and ff_io_close must be done
     * before exiting / reinitialization of underlying muxer */
    uint8_t header_written;

    /* dts (in AV_TIME_BASE units) of the last packet subtracted from
     * FifoContext.queue_duration; AV_NOPTS_VALUE initially */
    int64_t last_received_dts;
} FifoThreadContext;
109
/* Kind of request carried by a FifoMessage. */
typedef enum FifoMessageType {
    /* Nothing to do; the dispatcher returns 0 without touching the muxer */
    FIFO_NOOP,
    /* Write the header of the underlying muxer */
    FIFO_WRITE_HEADER,
    /* Write the packet stored in the message */
    FIFO_WRITE_PACKET,
    /* Flush the underlying muxer (sent when fifo_write_packet gets no packet) */
    FIFO_FLUSH_OUTPUT
} FifoMessageType;
116
/* Element of the message queue between the muxer callbacks and the
 * consumer thread. */
typedef struct FifoMessage {
    FifoMessageType type;
    /* Holds a packet reference only when type is FIFO_WRITE_PACKET
     * (see free_message) */
    AVPacket pkt;
} FifoMessage;
121
/**
 * Open the output of the underlying muxer and write its header.
 *
 * The configured format options are copied first, because the copy is handed
 * by reference to ff_format_output_open() and avformat_write_header().
 * Every entry that is still present in the copy afterwards is logged as an
 * unknown option and the call fails with AVERROR(EINVAL).
 *
 * ctx->header_written is set when avformat_write_header() returns 0, even if
 * unknown options turn the overall result into an error afterwards.
 *
 * @param ctx consumer thread context
 * @return the non-negative result of avformat_write_header() on success,
 *         a negative AVERROR code otherwise
 */
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *avf2 = fifo->avf;
    AVDictionary *format_options = NULL;
    int ret, i;

    ret = av_dict_copy(&format_options, fifo->format_options, 0);
    /* A failed copy may leave a partially filled dictionary behind,
     * so it has to go through the common cleanup as well. */
    if (ret < 0)
        goto end;

    ret = ff_format_output_open(avf2, avf->url, &format_options);
    if (ret < 0) {
        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
               av_err2str(ret));
        goto end;
    }

    /* Reset the dts bookkeeping of every stream before (re)writing the header */
    for (i = 0; i < avf2->nb_streams; i++)
        avf2->streams[i]->cur_dts = 0;

    ret = avformat_write_header(avf2, &format_options);
    if (!ret)
        ctx->header_written = 1;

    // Check for options unrecognized by underlying muxer
    if (format_options) {
        AVDictionaryEntry *entry = NULL;
        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
        ret = AVERROR(EINVAL);
    }

end:
    av_dict_free(&format_options);
    return ret;
}
160
/**
 * Handle a FIFO_FLUSH_OUTPUT message.
 *
 * @param ctx consumer thread context
 * @return the result of av_write_frame() called without a packet on the
 *         underlying muxer
 */
static int fifo_thread_flush_output(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;

    /* No packet is passed: this is the flush request for the target muxer */
    return av_write_frame(fifo->avf, NULL);
}
169
/**
 * Compute how much the dts advanced since the previously seen packet.
 *
 * The packet dts is rescaled from the time base of its stream in avf to
 * AV_TIME_BASE_Q and remembered in *last_dts.
 *
 * @param avf      context whose streams provide the time base of pkt
 * @param pkt      packet to examine; its dts is used unconditionally
 * @param last_dts in: rescaled dts of the previous packet or AV_NOPTS_VALUE,
 *                 out: rescaled dts of pkt
 * @return difference to the previous dts in AV_TIME_BASE units,
 *         0 if there was no previous dts
 */
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
{
    AVStream *stream = avf->streams[pkt->stream_index];
    int64_t cur_dts = av_rescale_q(pkt->dts, stream->time_base, AV_TIME_BASE_Q);
    int64_t delta = 0;

    if (*last_dts != AV_NOPTS_VALUE)
        delta = cur_dts - *last_dts;
    *last_dts = cur_dts;

    return delta;
}
178
/**
 * Handle a FIFO_WRITE_PACKET message: write one packet to the underlying
 * muxer.
 *
 * When timeshift is enabled, the duration represented by the packet is
 * removed from FifoContext.queue_duration first. While drop_until_keyframe
 * is set, packets without AV_PKT_FLAG_KEY are unreferenced and discarded.
 *
 * @param ctx consumer thread context
 * @param pkt packet to write; unreferenced when it was written or dropped,
 *            left untouched when av_write_frame() fails
 * @return 0 if the packet was dropped, otherwise the result of
 *         av_write_frame()
 */
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *muxer = fifo->avf;
    int stream, err;

    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) {
        int64_t consumed = next_duration(avf, pkt, &ctx->last_received_dts);
        atomic_fetch_sub_explicit(&fifo->queue_duration, consumed,
                                  memory_order_relaxed);
    }

    if (ctx->drop_until_keyframe) {
        if (!(pkt->flags & AV_PKT_FLAG_KEY)) {
            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
            av_packet_unref(pkt);
            return 0;
        }
        ctx->drop_until_keyframe = 0;
        av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
    }

    /* Timestamps are converted from the time base of the fifo muxer's
     * stream to the one of the matching stream of the target muxer. */
    stream = pkt->stream_index;
    av_packet_rescale_ts(pkt, avf->streams[stream]->time_base,
                         muxer->streams[stream]->time_base);

    err = av_write_frame(muxer, pkt);
    if (err < 0)
        return err;

    av_packet_unref(pkt);
    return err;
}
211
/**
 * Finalize the output of the underlying muxer.
 *
 * Nothing is done unless a header has been written before. Otherwise the
 * trailer is written and the I/O context of the underlying muxer is closed,
 * whatever av_write_trailer() returned.
 *
 * @param ctx consumer thread context
 * @return 0 if no header was written, otherwise the result of
 *         av_write_trailer()
 */
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
{
    FifoContext *fifo = ctx->avf->priv_data;
    AVFormatContext *muxer = fifo->avf;
    int trailer_ret = 0;

    if (ctx->header_written) {
        trailer_ret = av_write_trailer(muxer);
        ff_format_io_close(muxer, &muxer->pb);
    }

    return trailer_ret;
}
227
/**
 * Execute the request carried by a message.
 *
 * FIFO_NOOP messages are ignored. For every other type the header of the
 * underlying muxer is written first if that has not happened yet, so a
 * FIFO_WRITE_HEADER message only reports the outcome of that step.
 *
 * @param ctx consumer thread context
 * @param msg message to process
 * @return non-negative value on success, negative AVERROR code on failure
 */
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
{
    int header_ret = AVERROR(EINVAL);

    if (msg->type == FIFO_NOOP)
        return 0;

    if (!ctx->header_written) {
        header_ret = fifo_thread_write_header(ctx);
        if (header_ret < 0)
            return header_ret;
    }

    if (msg->type == FIFO_WRITE_HEADER) {
        /* Only valid if the header has just been written above */
        av_assert0(header_ret >= 0);
        return header_ret;
    }
    if (msg->type == FIFO_WRITE_PACKET)
        return fifo_thread_write_packet(ctx, &msg->pkt);
    if (msg->type == FIFO_FLUSH_OUTPUT)
        return fifo_thread_flush_output(ctx);

    /* Unknown message type */
    av_assert0(0);
    return AVERROR(EINVAL);
}
254
is_recoverable(const FifoContext * fifo,int err_no)255 static int is_recoverable(const FifoContext *fifo, int err_no) {
256 if (!fifo->attempt_recovery)
257 return 0;
258
259 if (fifo->recover_any_error)
260 return err_no != AVERROR_EXIT;
261
262 switch (err_no) {
263 case AVERROR(EINVAL):
264 case AVERROR(ENOSYS):
265 case AVERROR_EOF:
266 case AVERROR_EXIT:
267 case AVERROR_PATCHWELCOME:
268 return 0;
269 default:
270 return 1;
271 }
272 }
273
free_message(void * msg)274 static void free_message(void *msg)
275 {
276 FifoMessage *fifo_msg = msg;
277
278 if (fifo_msg->type == FIFO_WRITE_PACKET)
279 av_packet_unref(&fifo_msg->pkt);
280 }
281
/**
 * Book-keeping after a recovery attempt failed with a recoverable error.
 *
 * The time of the failure is stored in ctx->last_recovery_ts: the pts of
 * the packet when stream time is used (AV_NOPTS_VALUE then means that the
 * next attempt is not delayed), the current relative time otherwise.
 *
 * @param ctx    consumer thread context
 * @param pkt    packet of the message that was being processed
 * @param err_no error the recovery attempt failed with
 * @return err_no if the configured maximal number of attempts is reached,
 *         AVERROR(EAGAIN) if another attempt may be made later
 */
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
                                                int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
           av_err2str(err_no));

    if (fifo->recovery_wait_streamtime) {
        /* The message was missing its line terminator, which glued the
         * following log line onto this one. */
        if (pkt->pts == AV_NOPTS_VALUE)
            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
                   " timestamp, recovery will be attempted immediately\n");
        ctx->last_recovery_ts = pkt->pts;
    } else {
        ctx->last_recovery_ts = av_gettime_relative();
    }

    /* max_recovery_attempts == 0 means the number of attempts is unlimited */
    if (fifo->max_recovery_attempts &&
        ctx->recovery_nr >= fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_ERROR,
               "Maximal number of %d recovery attempts reached.\n",
               fifo->max_recovery_attempts);
        ret = err_no;
    } else {
        ret = AVERROR(EAGAIN);
    }

    return ret;
}
313
/**
 * Make one attempt to recover from a failure of the underlying muxer.
 *
 * If a header was written, the output is finalized first, so that
 * dispatching the message again reopens the output and rewrites the header.
 * The first attempt after a failure is made at once; further attempts are
 * only made after recovery_wait_time has elapsed since the last failed one,
 * measured in stream time (packet pts) or in real time.
 *
 * @param ctx    consumer thread context
 * @param msg    message whose processing failed; it is dispatched again
 * @param err_no error that triggered the recovery
 * @return 0 if the message was processed successfully,
 *         AVERROR(EAGAIN) if the caller has to try again later (waiting time
 *         not yet elapsed, or the attempt failed with a recoverable error),
 *         another negative AVERROR code if recovery is impossible; in that
 *         last case the message has been freed.
 */
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    AVPacket *pkt = &msg->pkt;
    int64_t time_since_recovery;
    int ret;

    if (!is_recoverable(fifo, err_no)) {
        ret = err_no;
        goto fail;
    }

    if (ctx->header_written) {
        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
        ctx->header_written = 0;
    }

    if (!ctx->recovery_nr) {
        ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
                                AV_NOPTS_VALUE : 0;
    } else {
        if (fifo->recovery_wait_streamtime) {
            /* The elapsed stream time can only be computed when both the
             * timestamp of the last failure and the one of the current
             * packet are known. The check used to be inverted, which
             * subtracted AV_NOPTS_VALUE (signed overflow) and never
             * honoured recovery_wait_time. */
            if (ctx->last_recovery_ts != AV_NOPTS_VALUE &&
                pkt->pts != AV_NOPTS_VALUE) {
                AVRational tb = avf->streams[pkt->stream_index]->time_base;
                time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
                                                   tb, AV_TIME_BASE_Q);
            } else {
                /* Enforce recovery immediately */
                time_since_recovery = fifo->recovery_wait_time;
            }
        } else {
            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
        }

        if (time_since_recovery < fifo->recovery_wait_time)
            return AVERROR(EAGAIN);
    }

    ctx->recovery_nr++;

    if (fifo->max_recovery_attempts) {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
               ctx->recovery_nr, fifo->max_recovery_attempts);
    } else {
        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
               ctx->recovery_nr);
    }

    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
        ctx->drop_until_keyframe = 1;

    ret = fifo_thread_dispatch_message(ctx, msg);
    if (ret < 0) {
        if (is_recoverable(fifo, ret)) {
            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
        } else {
            goto fail;
        }
    } else {
        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
        ctx->recovery_nr = 0;
    }

    return 0;

fail:
    free_message(msg);
    return ret;
}
384
/**
 * Drive the recovery after processing a message has failed.
 *
 * Without drop_pkts_on_overflow the function keeps attempting recovery
 * until the result is no longer AVERROR(EAGAIN); when real time is used it
 * sleeps for at most 10 ms per iteration while the waiting time has not
 * elapsed. With drop_pkts_on_overflow a single attempt is made and a result
 * of AVERROR(EAGAIN) drops the message instead.
 *
 * @param ctx    consumer thread context
 * @param msg    message whose processing failed
 * @param err_no error that triggered the recovery
 * @return 0 if the message was processed or dropped, otherwise the negative
 *         AVERROR code of fifo_thread_attempt_recovery()
 */
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
    AVFormatContext *avf = ctx->avf;
    FifoContext *fifo = avf->priv_data;
    int ret;

    for (;;) {
        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
            int64_t elapsed   = av_gettime_relative() - ctx->last_recovery_ts;
            int64_t remaining = FFMAX(0, fifo->recovery_wait_time - elapsed);
            if (remaining)
                av_usleep(FFMIN(10000, remaining));
        }

        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
        if (ret != AVERROR(EAGAIN))
            return ret;

        /* Retry later unless packets may be dropped */
        if (fifo->drop_pkts_on_overflow)
            break;
    }

    /* Recovery is not possible right now: give up on this message */
    if (msg->type == FIFO_WRITE_PACKET)
        av_packet_unref(&msg->pkt);

    return 0;
}
410
fifo_consumer_thread(void * data)411 static void *fifo_consumer_thread(void *data)
412 {
413 AVFormatContext *avf = data;
414 FifoContext *fifo = avf->priv_data;
415 AVThreadMessageQueue *queue = fifo->queue;
416 FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
417 int ret;
418
419 FifoThreadContext fifo_thread_ctx;
420 memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
421 fifo_thread_ctx.avf = avf;
422 fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
423
424 while (1) {
425 uint8_t just_flushed = 0;
426
427 if (!fifo_thread_ctx.recovery_nr)
428 ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
429
430 if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
431 int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
432 if (rec_ret < 0) {
433 av_thread_message_queue_set_err_send(queue, rec_ret);
434 break;
435 }
436 }
437
438 /* If the queue is full at the moment when fifo_write_packet
439 * attempts to insert new message (packet) to the queue,
440 * it sets the fifo->overflow_flag to 1 and drops packet.
441 * Here in consumer thread, the flag is checked and if it is
442 * set, the queue is flushed and flag cleared. */
443 pthread_mutex_lock(&fifo->overflow_flag_lock);
444 if (fifo->overflow_flag) {
445 av_thread_message_flush(queue);
446 if (fifo->restart_with_keyframe)
447 fifo_thread_ctx.drop_until_keyframe = 1;
448 fifo->overflow_flag = 0;
449 just_flushed = 1;
450 }
451 pthread_mutex_unlock(&fifo->overflow_flag_lock);
452
453 if (just_flushed)
454 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
455
456 if (fifo->timeshift)
457 while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
458 av_usleep(10000);
459
460 ret = av_thread_message_queue_recv(queue, &msg, 0);
461 if (ret < 0) {
462 av_thread_message_queue_set_err_send(queue, ret);
463 break;
464 }
465 }
466
467 fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
468
469 return NULL;
470 }
471
/**
 * Create the context of the underlying muxer and mirror the setup of the
 * fifo muxer into it.
 *
 * The new context is stored in FifoContext.avf right after its allocation,
 * so it stays reachable through the fifo context when a later step fails.
 *
 * @param avf      context of the fifo muxer
 * @param oformat  output format of the underlying muxer
 * @param filename output name handed to avformat_alloc_output_context2()
 * @return 0 on success, negative AVERROR code on failure
 */
static int fifo_mux_init(AVFormatContext *avf, ff_const59 AVOutputFormat *oformat,
                         const char *filename)
{
    FifoContext *fifo = avf->priv_data;
    AVFormatContext *muxer;
    int err, idx;

    err = avformat_alloc_output_context2(&muxer, oformat, NULL, filename);
    if (err < 0)
        return err;
    fifo->avf = muxer;

    muxer->interrupt_callback = avf->interrupt_callback;
    muxer->max_delay          = avf->max_delay;

    err = av_dict_copy(&muxer->metadata, avf->metadata, 0);
    if (err < 0)
        return err;

    muxer->opaque   = avf->opaque;
    muxer->io_close = avf->io_close;
    muxer->io_open  = avf->io_open;
    muxer->flags    = avf->flags;

    /* One stream in the underlying muxer per stream of the fifo muxer,
     * carrying a copy of its encoding parameters */
    for (idx = 0; idx < avf->nb_streams; idx++) {
        AVStream *out_st = avformat_new_stream(muxer, NULL);
        if (!out_st)
            return AVERROR(ENOMEM);

        err = ff_stream_encode_params_copy(out_st, avf->streams[idx]);
        if (err < 0)
            return err;
    }

    return 0;
}
507
/**
 * Muxer init callback: validate the options and set up everything the
 * consumer thread needs (underlying muxer context, message queue, mutex).
 *
 * Nothing is released here on failure.
 * NOTE(review): presumably fifo_deinit() is invoked by the caller in that
 * case -- confirm against the generic muxing code.
 *
 * @param avf context of the fifo muxer
 * @return 0 on success, negative AVERROR code on failure
 */
static int fifo_init(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    ff_const59 AVOutputFormat *oformat;
    int ret = 0;

    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
               " only when drop_pkts_on_overflow is also turned on\n");
        return AVERROR(EINVAL);
    }
    atomic_init(&fifo->queue_duration, 0);
    fifo->last_sent_dts = AV_NOPTS_VALUE;

    oformat = av_guess_format(fifo->format, avf->url, NULL);
    if (!oformat) {
        ret = AVERROR_MUXER_NOT_FOUND;
        return ret;
    }

    ret = fifo_mux_init(avf, oformat, avf->url);
    if (ret < 0)
        return ret;

    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
                                        sizeof(FifoMessage));
    if (ret < 0)
        return ret;

    /* Messages discarded by the queue must drop their packet reference */
    av_thread_message_queue_set_free_func(fifo->queue, free_message);

    /* pthread_mutex_init() reports failure with a positive error number,
     * so any nonzero value is an error. */
    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
    if (ret)
        return AVERROR(ret);
    fifo->overflow_flag_lock_initialized = 1;

    return 0;
}
546
/**
 * Muxer write_header callback: start the consumer thread. The header of
 * the underlying muxer is written by that thread.
 *
 * @param avf context of the fifo muxer
 * @return 0 on success, the error number reported by pthread_create()
 *         converted with AVERROR() otherwise
 */
static int fifo_write_header(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    int err = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);

    if (!err)
        return 0;

    av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
           av_err2str(AVERROR(err)));
    return AVERROR(err);
}
561
/**
 * Muxer write_packet callback: hand a packet (or, without a packet, a flush
 * request) over to the consumer thread.
 *
 * With drop_pkts_on_overflow the message is sent without blocking; if the
 * queue is full, the overflow flag is raised for the consumer thread, the
 * packet is dropped and success is reported.
 *
 * @param avf context of the fifo muxer
 * @param pkt packet to queue (a new reference is created), or NULL to
 *            queue a FIFO_FLUSH_OUTPUT message
 * @return non-negative value on success (including a dropped packet),
 *         negative AVERROR code otherwise
 */
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
    FifoContext *fifo = avf->priv_data;
    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
    unsigned send_flags = fifo->drop_pkts_on_overflow ?
                          AV_THREAD_MESSAGE_NONBLOCK : 0;
    int ret;

    if (pkt) {
        ret = av_packet_ref(&msg.pkt, pkt);
        if (ret < 0)
            return ret;
    }

    ret = av_thread_message_queue_send(fifo->queue, &msg, send_flags);
    if (ret >= 0) {
        /* Queued: account for the duration the packet adds to the queue */
        if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) {
            int64_t added = next_duration(avf, pkt, &fifo->last_sent_dts);
            atomic_fetch_add_explicit(&fifo->queue_duration, added,
                                      memory_order_relaxed);
        }
        return ret;
    }

    if (ret == AVERROR(EAGAIN)) {
        uint8_t raised_now = 0;

        /* The queue is full. Raise fifo->overflow_flag so that the
         * consumer thread learns that the queue has to be flushed. */
        pthread_mutex_lock(&fifo->overflow_flag_lock);
        if (!fifo->overflow_flag)
            fifo->overflow_flag = raised_now = 1;
        pthread_mutex_unlock(&fifo->overflow_flag_lock);

        if (raised_now)
            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
        ret = 0;
    }

    /* The message was not queued, so its packet reference is ours to drop */
    if (pkt)
        av_packet_unref(&msg.pkt);
    return ret;
}
605
/**
 * Muxer write_trailer callback: let the consumer thread finish and collect
 * its result.
 *
 * AVERROR_EOF is set as the receive error of the queue. With timeshift, the
 * elapsed real time is then added to queue_duration for up to the timeshift
 * duration (the consumer thread only proceeds while queue_duration is at
 * least timeshift), FIFO_NOOP messages are sent meanwhile, and finally
 * queue_duration is set to INT64_MAX so that the consumer thread no longer
 * waits. Afterwards the thread is joined.
 *
 * @param avf context of the fifo muxer
 * @return the result of the last trailer written by the consumer thread,
 *         or a negative AVERROR code if joining the thread failed
 */
static int fifo_write_trailer(AVFormatContext *avf)
{
    FifoContext *fifo = avf->priv_data;
    int ret;

    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
    if (fifo->timeshift) {
        int64_t now = av_gettime_relative();
        int64_t elapsed = 0;
        FifoMessage msg = {FIFO_NOOP};
        do {
            int64_t delay = av_gettime_relative() - now;
            if (delay < 0) { // Discontinuity?
                delay = 10000;
                now = av_gettime_relative();
            } else {
                now += delay;
            }
            atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
            elapsed += delay;
            if (elapsed > fifo->timeshift)
                break;
            av_usleep(10000);
            ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
        } while (ret >= 0 || ret == AVERROR(EAGAIN));
        atomic_store(&fifo->queue_duration, INT64_MAX);
    }

    /* pthread_join() reports failure with a positive error number,
     * so any nonzero value is an error. */
    ret = pthread_join(fifo->writer_thread, NULL);
    if (ret) {
        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
               av_err2str(AVERROR(ret)));
        return AVERROR(ret);
    }

    ret = fifo->write_trailer_ret;
    return ret;
}
644
/**
 * Muxer deinit callback: free the context of the underlying muxer and the
 * message queue, and destroy the overflow mutex if it was initialized.
 *
 * @param avf context of the fifo muxer
 */
static void fifo_deinit(AVFormatContext *avf)
{
    FifoContext *priv = avf->priv_data;

    avformat_free_context(priv->avf);
    av_thread_message_queue_free(&priv->queue);

    /* The mutex only exists if fifo_init() got as far as creating it */
    if (!priv->overflow_flag_lock_initialized)
        return;
    pthread_mutex_destroy(&priv->overflow_flag_lock);
}
654
/* Offset of a field within FifoContext, for the option table below */
#define OFFSET(x) offsetof(FifoContext, x)
/* User-settable options of the fifo muxer; each entry stores its value in
 * the FifoContext field given by OFFSET(). */
static const AVOption options[] = {
        {"fifo_format", "Target muxer", OFFSET(format),
         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},

        /* At least one element is required */
        {"queue_size", "Size of fifo queue", OFFSET(queue_size),
         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
         AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},

        {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        /* 0 (the default) does not limit the number of attempts */
        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        /* Rejected by fifo_init() unless drop_pkts_on_overflow is set too */
        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},

        /* 0 (the default) disables the delay */
        {"timeshift", "Delay fifo output", OFFSET(timeshift),
         AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},

        {NULL},
};
692
/* AVClass of the fifo muxer's private context; ties the option table
 * above to FifoContext. */
static const AVClass fifo_muxer_class = {
    .class_name = "Fifo muxer",
    .item_name  = av_default_item_name,
    .option     = options,
    .version    = LIBAVUTIL_VERSION_INT,
};
699
/* Muxer definition: registers the callbacks implemented in this file under
 * the name "fifo". */
AVOutputFormat ff_fifo_muxer = {
    .name           = "fifo",
    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
    .priv_data_size = sizeof(FifoContext),
    .init           = fifo_init,
    .write_header   = fifo_write_header,
    .write_packet   = fifo_write_packet,
    .write_trailer  = fifo_write_trailer,
    .deinit         = fifo_deinit,
    .priv_class     = &fifo_muxer_class,
    /* NOTE(review): AVFMT_NOFILE presumably reflects that the output is
     * opened by the consumer thread (ff_format_output_open) rather than by
     * the caller; AVFMT_ALLOW_FLUSH matches fifo_write_packet() accepting
     * a NULL packet -- confirm against the AVOutputFormat documentation. */
    .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
};
712