• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2012 Michael Niedermayer <michaelni@gmx.at>
 *
 * This file is part of FFmpeg.
 *
 * FFmpeg is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * FFmpeg is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with FFmpeg; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
20 
21 #include <stdatomic.h>
22 
23 #include "frame_thread_encoder.h"
24 
25 #include "libavutil/avassert.h"
26 #include "libavutil/cpu.h"
27 #include "libavutil/imgutils.h"
28 #include "libavutil/opt.h"
29 #include "libavutil/thread.h"
30 #include "avcodec.h"
31 #include "codec_internal.h"
32 #include "internal.h"
33 #include "pthread_internal.h"
34 #include "thread.h"
35 
/* Upper bound on the number of worker threads (size of ThreadContext.worker). */
#define MAX_THREADS 64
/* There can be as many as MAX_THREADS + 1 outstanding tasks.
 * An additional + 1 is needed so that one can distinguish
 * the case of zero and MAX_THREADS + 1 outstanding tasks modulo
 * the number of buffers. */
#define BUFFER_SIZE (MAX_THREADS + 2)
42 
43 typedef struct{
44     AVFrame  *indata;
45     AVPacket *outdata;
46     int       return_code;
47     int       finished;
48 } Task;
49 
50 typedef struct{
51     AVCodecContext *parent_avctx;
52     pthread_mutex_t buffer_mutex;
53 
54     pthread_mutex_t task_fifo_mutex; /* Used to guard (next_)task_index */
55     pthread_cond_t task_fifo_cond;
56 
57     unsigned pthread_init_cnt;
58     unsigned max_tasks;
59     Task tasks[BUFFER_SIZE];
60     pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */
61     pthread_cond_t finished_task_cond;
62 
63     unsigned next_task_index;
64     unsigned task_index;
65     unsigned finished_task_index;
66 
67     pthread_t worker[MAX_THREADS];
68     atomic_int exit;
69 } ThreadContext;
70 
71 #define OFF(member) offsetof(ThreadContext, member)
72 DEFINE_OFFSET_ARRAY(ThreadContext, thread_ctx, pthread_init_cnt,
73                     (OFF(buffer_mutex), OFF(task_fifo_mutex), OFF(finished_task_mutex)),
74                     (OFF(task_fifo_cond), OFF(finished_task_cond)));
75 #undef OFF
76 
worker(void * v)77 static void * attribute_align_arg worker(void *v){
78     AVCodecContext *avctx = v;
79     ThreadContext *c = avctx->internal->frame_thread_encoder;
80 
81     while (!atomic_load(&c->exit)) {
82         int got_packet = 0, ret;
83         AVPacket *pkt;
84         AVFrame *frame;
85         Task *task;
86         unsigned task_index;
87 
88         pthread_mutex_lock(&c->task_fifo_mutex);
89         while (c->next_task_index == c->task_index || atomic_load(&c->exit)) {
90             if (atomic_load(&c->exit)) {
91                 pthread_mutex_unlock(&c->task_fifo_mutex);
92                 goto end;
93             }
94             pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
95         }
96         task_index         = c->next_task_index;
97         c->next_task_index = (c->next_task_index + 1) % c->max_tasks;
98         pthread_mutex_unlock(&c->task_fifo_mutex);
99         /* The main thread ensures that any two outstanding tasks have
100          * different indices, ergo each worker thread owns its element
101          * of c->tasks with the exception of finished, which is shared
102          * with the main thread and guarded by finished_task_mutex. */
103         task  = &c->tasks[task_index];
104         frame = task->indata;
105         pkt   = task->outdata;
106 
107         ret = ffcodec(avctx->codec)->cb.encode(avctx, pkt, frame, &got_packet);
108         if(got_packet) {
109             int ret2 = av_packet_make_refcounted(pkt);
110             if (ret >= 0 && ret2 < 0)
111                 ret = ret2;
112             pkt->pts = pkt->dts = frame->pts;
113         } else {
114             pkt->data = NULL;
115             pkt->size = 0;
116         }
117         pthread_mutex_lock(&c->buffer_mutex);
118         av_frame_unref(frame);
119         pthread_mutex_unlock(&c->buffer_mutex);
120         pthread_mutex_lock(&c->finished_task_mutex);
121         task->return_code = ret;
122         task->finished    = 1;
123         pthread_cond_signal(&c->finished_task_cond);
124         pthread_mutex_unlock(&c->finished_task_mutex);
125     }
126 end:
127     pthread_mutex_lock(&c->buffer_mutex);
128     avcodec_close(avctx);
129     pthread_mutex_unlock(&c->buffer_mutex);
130     av_freep(&avctx);
131     return NULL;
132 }
133 
ff_frame_thread_encoder_init(AVCodecContext * avctx)134 av_cold int ff_frame_thread_encoder_init(AVCodecContext *avctx)
135 {
136     int i=0;
137     ThreadContext *c;
138     AVCodecContext *thread_avctx = NULL;
139     int ret;
140 
141     if(   !(avctx->thread_type & FF_THREAD_FRAME)
142        || !(avctx->codec->capabilities & AV_CODEC_CAP_FRAME_THREADS))
143         return 0;
144 
145     if(   !avctx->thread_count
146        && avctx->codec_id == AV_CODEC_ID_MJPEG
147        && !(avctx->flags & AV_CODEC_FLAG_QSCALE)) {
148         av_log(avctx, AV_LOG_DEBUG,
149                "Forcing thread count to 1 for MJPEG encoding, use -thread_type slice "
150                "or a constant quantizer if you want to use multiple cpu cores\n");
151         avctx->thread_count = 1;
152     }
153     if(   avctx->thread_count > 1
154        && avctx->codec_id == AV_CODEC_ID_MJPEG
155        && !(avctx->flags & AV_CODEC_FLAG_QSCALE))
156         av_log(avctx, AV_LOG_WARNING,
157                "MJPEG CBR encoding works badly with frame multi-threading, consider "
158                "using -threads 1, -thread_type slice or a constant quantizer.\n");
159 
160     if (avctx->codec_id == AV_CODEC_ID_HUFFYUV ||
161         avctx->codec_id == AV_CODEC_ID_FFVHUFF) {
162         int warn = 0;
163         int64_t tmp;
164 
165         if (avctx->flags & AV_CODEC_FLAG_PASS1)
166             warn = 1;
167         else if (av_opt_get_int(avctx->priv_data, "context", 0, &tmp) >= 0 &&
168                  tmp > 0) {
169             warn = av_opt_get_int(avctx->priv_data, "non_deterministic", 0, &tmp) < 0
170                    || !tmp;
171         }
172         // huffyuv does not support these with multiple frame threads currently
173         if (warn) {
174             av_log(avctx, AV_LOG_WARNING,
175                "Forcing thread count to 1 for huffyuv encoding with first pass or context 1\n");
176             avctx->thread_count = 1;
177         }
178     }
179 
180     if(!avctx->thread_count) {
181         avctx->thread_count = av_cpu_count();
182         avctx->thread_count = FFMIN(avctx->thread_count, MAX_THREADS);
183     }
184 
185     if(avctx->thread_count <= 1)
186         return 0;
187 
188     if(avctx->thread_count > MAX_THREADS)
189         return AVERROR(EINVAL);
190 
191     av_assert0(!avctx->internal->frame_thread_encoder);
192     c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(ThreadContext));
193     if(!c)
194         return AVERROR(ENOMEM);
195 
196     c->parent_avctx = avctx;
197 
198     ret = ff_pthread_init(c, thread_ctx_offsets);
199     if (ret < 0)
200         goto fail;
201     atomic_init(&c->exit, 0);
202 
203     c->max_tasks = avctx->thread_count + 2;
204     for (unsigned j = 0; j < c->max_tasks; j++) {
205         if (!(c->tasks[j].indata  = av_frame_alloc()) ||
206             !(c->tasks[j].outdata = av_packet_alloc())) {
207             ret = AVERROR(ENOMEM);
208             goto fail;
209         }
210     }
211 
212     for(i=0; i<avctx->thread_count ; i++){
213         void *tmpv;
214         thread_avctx = avcodec_alloc_context3(avctx->codec);
215         if (!thread_avctx) {
216             ret = AVERROR(ENOMEM);
217             goto fail;
218         }
219         tmpv = thread_avctx->priv_data;
220         *thread_avctx = *avctx;
221         thread_avctx->priv_data = tmpv;
222         thread_avctx->internal = NULL;
223         thread_avctx->hw_frames_ctx = NULL;
224         ret = av_opt_copy(thread_avctx, avctx);
225         if (ret < 0)
226             goto fail;
227         if (avctx->codec->priv_class) {
228             ret = av_opt_copy(thread_avctx->priv_data, avctx->priv_data);
229             if (ret < 0)
230                 goto fail;
231         }
232         thread_avctx->thread_count = 1;
233         thread_avctx->active_thread_type &= ~FF_THREAD_FRAME;
234 
235         if ((ret = avcodec_open2(thread_avctx, avctx->codec, NULL)) < 0)
236             goto fail;
237         av_assert0(!thread_avctx->internal->frame_thread_encoder);
238         thread_avctx->internal->frame_thread_encoder = c;
239         if ((ret = pthread_create(&c->worker[i], NULL, worker, thread_avctx))) {
240             ret = AVERROR(ret);
241             goto fail;
242         }
243     }
244 
245     avctx->active_thread_type = FF_THREAD_FRAME;
246 
247     return 0;
248 fail:
249     avcodec_close(thread_avctx);
250     av_freep(&thread_avctx);
251     avctx->thread_count = i;
252     av_log(avctx, AV_LOG_ERROR, "ff_frame_thread_encoder_init failed\n");
253     ff_frame_thread_encoder_free(avctx);
254     return ret;
255 }
256 
ff_frame_thread_encoder_free(AVCodecContext * avctx)257 av_cold void ff_frame_thread_encoder_free(AVCodecContext *avctx)
258 {
259     ThreadContext *c= avctx->internal->frame_thread_encoder;
260 
261     /* In case initializing the mutexes/condition variables failed,
262      * they must not be used. In this case the thread_count is zero
263      * as no thread has been initialized yet. */
264     if (avctx->thread_count > 0) {
265         pthread_mutex_lock(&c->task_fifo_mutex);
266         atomic_store(&c->exit, 1);
267         pthread_cond_broadcast(&c->task_fifo_cond);
268         pthread_mutex_unlock(&c->task_fifo_mutex);
269 
270         for (int i = 0; i < avctx->thread_count; i++)
271             pthread_join(c->worker[i], NULL);
272     }
273 
274     for (unsigned i = 0; i < c->max_tasks; i++) {
275         av_frame_free(&c->tasks[i].indata);
276         av_packet_free(&c->tasks[i].outdata);
277     }
278 
279     ff_pthread_free(c, thread_ctx_offsets);
280     av_freep(&avctx->internal->frame_thread_encoder);
281 }
282 
/**
 * Submit a frame to the worker threads and/or collect the oldest result.
 *
 * If frame is non-NULL it is moved into the next free task slot and a worker
 * is signalled. Afterwards the oldest outstanding task is looked at:
 *  - if there is no outstanding task, or if a frame was just submitted, the
 *    oldest task is not finished yet and at most thread_count tasks are
 *    outstanding, the function returns without a packet;
 *  - otherwise it waits until the oldest task has finished and returns
 *    its packet and return code.
 *
 * @param avctx          the user's codec context
 * @param pkt            receives the encoded packet (moved from the task slot)
 * @param frame          frame to encode, or NULL to only collect results;
 *                       its contents are moved out
 * @param got_packet_ptr set to 1 if pkt contains data; expected to be 0 on entry
 * @return 0 if no task was collected, otherwise the return code
 *         stored by the worker for the collected task
 */
int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt,
                                 AVFrame *frame, int *got_packet_ptr)
{
    ThreadContext *c = avctx->internal->frame_thread_encoder;
    Task *outtask;

    av_assert1(!*got_packet_ptr);

    if(frame){
        av_frame_move_ref(c->tasks[c->task_index].indata, frame);

        /* Publish the new task and wake one worker. */
        pthread_mutex_lock(&c->task_fifo_mutex);
        c->task_index = (c->task_index + 1) % c->max_tasks;
        pthread_cond_signal(&c->task_fifo_cond);
        pthread_mutex_unlock(&c->task_fifo_mutex);
    }

    outtask = &c->tasks[c->finished_task_index];
    pthread_mutex_lock(&c->finished_task_mutex);
    /* The access to task_index in the following code is ok,
     * because it is only ever changed by the main thread. */
    if (c->task_index == c->finished_task_index ||
        (frame && !outtask->finished &&
         (c->task_index - c->finished_task_index + c->max_tasks) % c->max_tasks <= avctx->thread_count)) {
        pthread_mutex_unlock(&c->finished_task_mutex);
        return 0;
    }
    while (!outtask->finished) {
        pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
    }
    pthread_mutex_unlock(&c->finished_task_mutex);
    /* We now own outtask completely: No worker thread touches it any more,
     * because there is no outstanding task with this index. */
    outtask->finished = 0;
    av_packet_move_ref(pkt, outtask->outdata);
    /* The worker clears pkt->data when the encoder produced no packet. */
    if(pkt->data)
        *got_packet_ptr = 1;
    c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks;

    return outtask->return_code;
}
324