• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "cpu.h"
21 #include "internal.h"
22 #include "slicethread.h"
23 #include "mem.h"
24 #include "thread.h"
25 #include "avassert.h"
26 
27 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
28 
29 typedef struct WorkerContext {
30     AVSliceThread   *ctx;
31     pthread_mutex_t mutex;
32     pthread_cond_t  cond;
33     pthread_t       thread;
34     int             done;
35 } WorkerContext;
36 
37 struct AVSliceThread {
38     WorkerContext   *workers;
39     int             nb_threads;
40     int             nb_active_threads;
41     int             nb_jobs;
42 
43     atomic_uint     first_job;
44     atomic_uint     current_job;
45     pthread_mutex_t done_mutex;
46     pthread_cond_t  done_cond;
47     int             done;
48     int             finished;
49 
50     void            *priv;
51     void            (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
52     void            (*main_func)(void *priv);
53 };
54 
run_jobs(AVSliceThread * ctx)55 static int run_jobs(AVSliceThread *ctx)
56 {
57     unsigned nb_jobs    = ctx->nb_jobs;
58     unsigned nb_active_threads = ctx->nb_active_threads;
59     unsigned first_job    = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
60     unsigned current_job  = first_job;
61 
62     do {
63         ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
64     } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
65 
66     return current_job == nb_jobs + nb_active_threads - 1;
67 }
68 
thread_worker(void * v)69 static void *attribute_align_arg thread_worker(void *v)
70 {
71     WorkerContext *w = v;
72     AVSliceThread *ctx = w->ctx;
73 
74     pthread_mutex_lock(&w->mutex);
75     pthread_cond_signal(&w->cond);
76 
77     while (1) {
78         w->done = 1;
79         while (w->done)
80             pthread_cond_wait(&w->cond, &w->mutex);
81 
82         if (ctx->finished) {
83             pthread_mutex_unlock(&w->mutex);
84             return NULL;
85         }
86 
87         if (run_jobs(ctx)) {
88             pthread_mutex_lock(&ctx->done_mutex);
89             ctx->done = 1;
90             pthread_cond_signal(&ctx->done_cond);
91             pthread_mutex_unlock(&ctx->done_mutex);
92         }
93     }
94 }
95 
avpriv_slicethread_create(AVSliceThread ** pctx,void * priv,void (* worker_func)(void * priv,int jobnr,int threadnr,int nb_jobs,int nb_threads),void (* main_func)(void * priv),int nb_threads)96 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
97                               void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
98                               void (*main_func)(void *priv),
99                               int nb_threads)
100 {
101     AVSliceThread *ctx;
102     int nb_workers, i;
103 
104     av_assert0(nb_threads >= 0);
105     if (!nb_threads) {
106         int nb_cpus = av_cpu_count();
107         if (nb_cpus > 1)
108             nb_threads = nb_cpus + 1;
109         else
110             nb_threads = 1;
111     }
112 
113     nb_workers = nb_threads;
114     if (!main_func)
115         nb_workers--;
116 
117     *pctx = ctx = av_mallocz(sizeof(*ctx));
118     if (!ctx)
119         return AVERROR(ENOMEM);
120 
121     if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
122         av_freep(pctx);
123         return AVERROR(ENOMEM);
124     }
125 
126     ctx->priv        = priv;
127     ctx->worker_func = worker_func;
128     ctx->main_func   = main_func;
129     ctx->nb_threads  = nb_threads;
130     ctx->nb_active_threads = 0;
131     ctx->nb_jobs     = 0;
132     ctx->finished    = 0;
133 
134     atomic_init(&ctx->first_job, 0);
135     atomic_init(&ctx->current_job, 0);
136     pthread_mutex_init(&ctx->done_mutex, NULL);
137     pthread_cond_init(&ctx->done_cond, NULL);
138     ctx->done        = 0;
139 
140     for (i = 0; i < nb_workers; i++) {
141         WorkerContext *w = &ctx->workers[i];
142         int ret;
143         w->ctx = ctx;
144         pthread_mutex_init(&w->mutex, NULL);
145         pthread_cond_init(&w->cond, NULL);
146         pthread_mutex_lock(&w->mutex);
147         w->done = 0;
148 
149         if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
150             ctx->nb_threads = main_func ? i : i + 1;
151             pthread_mutex_unlock(&w->mutex);
152             pthread_cond_destroy(&w->cond);
153             pthread_mutex_destroy(&w->mutex);
154             avpriv_slicethread_free(pctx);
155             return AVERROR(ret);
156         }
157 
158         while (!w->done)
159             pthread_cond_wait(&w->cond, &w->mutex);
160         pthread_mutex_unlock(&w->mutex);
161     }
162 
163     return nb_threads;
164 }
165 
avpriv_slicethread_execute(AVSliceThread * ctx,int nb_jobs,int execute_main)166 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
167 {
168     int nb_workers, i, is_last = 0;
169 
170     av_assert0(nb_jobs > 0);
171     ctx->nb_jobs           = nb_jobs;
172     ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
173     atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
174     atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
175     nb_workers             = ctx->nb_active_threads;
176     if (!ctx->main_func || !execute_main)
177         nb_workers--;
178 
179     for (i = 0; i < nb_workers; i++) {
180         WorkerContext *w = &ctx->workers[i];
181         pthread_mutex_lock(&w->mutex);
182         w->done = 0;
183         pthread_cond_signal(&w->cond);
184         pthread_mutex_unlock(&w->mutex);
185     }
186 
187     if (ctx->main_func && execute_main)
188         ctx->main_func(ctx->priv);
189     else
190         is_last = run_jobs(ctx);
191 
192     if (!is_last) {
193         pthread_mutex_lock(&ctx->done_mutex);
194         while (!ctx->done)
195             pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
196         ctx->done = 0;
197         pthread_mutex_unlock(&ctx->done_mutex);
198     }
199 }
200 
avpriv_slicethread_free(AVSliceThread ** pctx)201 void avpriv_slicethread_free(AVSliceThread **pctx)
202 {
203     AVSliceThread *ctx;
204     int nb_workers, i;
205 
206     if (!pctx || !*pctx)
207         return;
208 
209     ctx = *pctx;
210     nb_workers = ctx->nb_threads;
211     if (!ctx->main_func)
212         nb_workers--;
213 
214     ctx->finished = 1;
215     for (i = 0; i < nb_workers; i++) {
216         WorkerContext *w = &ctx->workers[i];
217         pthread_mutex_lock(&w->mutex);
218         w->done = 0;
219         pthread_cond_signal(&w->cond);
220         pthread_mutex_unlock(&w->mutex);
221     }
222 
223     for (i = 0; i < nb_workers; i++) {
224         WorkerContext *w = &ctx->workers[i];
225         pthread_join(w->thread, NULL);
226         pthread_cond_destroy(&w->cond);
227         pthread_mutex_destroy(&w->mutex);
228     }
229 
230     pthread_cond_destroy(&ctx->done_cond);
231     pthread_mutex_destroy(&ctx->done_mutex);
232     av_freep(&ctx->workers);
233     av_freep(pctx);
234 }
235 
236 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
237 
avpriv_slicethread_create(AVSliceThread ** pctx,void * priv,void (* worker_func)(void * priv,int jobnr,int threadnr,int nb_jobs,int nb_threads),void (* main_func)(void * priv),int nb_threads)238 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
239                               void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
240                               void (*main_func)(void *priv),
241                               int nb_threads)
242 {
243     *pctx = NULL;
244     return AVERROR(ENOSYS);
245 }
246 
avpriv_slicethread_execute(AVSliceThread * ctx,int nb_jobs,int execute_main)247 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
248 {
249     av_assert0(0);
250 }
251 
avpriv_slicethread_free(AVSliceThread ** pctx)252 void avpriv_slicethread_free(AVSliceThread **pctx)
253 {
254     av_assert0(!pctx || !*pctx);
255 }
256 
257 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
258