1 /*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <stdatomic.h>
20 #include "cpu.h"
21 #include "internal.h"
22 #include "slicethread.h"
23 #include "mem.h"
24 #include "thread.h"
25 #include "avassert.h"
26
27 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
28
29 typedef struct WorkerContext {
30 AVSliceThread *ctx;
31 pthread_mutex_t mutex;
32 pthread_cond_t cond;
33 pthread_t thread;
34 int done;
35 } WorkerContext;
36
37 struct AVSliceThread {
38 WorkerContext *workers;
39 int nb_threads;
40 int nb_active_threads;
41 int nb_jobs;
42
43 atomic_uint first_job;
44 atomic_uint current_job;
45 pthread_mutex_t done_mutex;
46 pthread_cond_t done_cond;
47 int done;
48 int finished;
49
50 void *priv;
51 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
52 void (*main_func)(void *priv);
53 };
54
run_jobs(AVSliceThread * ctx)55 static int run_jobs(AVSliceThread *ctx)
56 {
57 unsigned nb_jobs = ctx->nb_jobs;
58 unsigned nb_active_threads = ctx->nb_active_threads;
59 unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
60 unsigned current_job = first_job;
61
62 do {
63 ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
64 } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
65
66 return current_job == nb_jobs + nb_active_threads - 1;
67 }
68
thread_worker(void * v)69 static void *attribute_align_arg thread_worker(void *v)
70 {
71 WorkerContext *w = v;
72 AVSliceThread *ctx = w->ctx;
73
74 pthread_mutex_lock(&w->mutex);
75 pthread_cond_signal(&w->cond);
76
77 while (1) {
78 w->done = 1;
79 while (w->done)
80 pthread_cond_wait(&w->cond, &w->mutex);
81
82 if (ctx->finished) {
83 pthread_mutex_unlock(&w->mutex);
84 return NULL;
85 }
86
87 if (run_jobs(ctx)) {
88 pthread_mutex_lock(&ctx->done_mutex);
89 ctx->done = 1;
90 pthread_cond_signal(&ctx->done_cond);
91 pthread_mutex_unlock(&ctx->done_mutex);
92 }
93 }
94 }
95
avpriv_slicethread_create(AVSliceThread ** pctx,void * priv,void (* worker_func)(void * priv,int jobnr,int threadnr,int nb_jobs,int nb_threads),void (* main_func)(void * priv),int nb_threads)96 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
97 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
98 void (*main_func)(void *priv),
99 int nb_threads)
100 {
101 AVSliceThread *ctx;
102 int nb_workers, i;
103
104 av_assert0(nb_threads >= 0);
105 if (!nb_threads) {
106 int nb_cpus = av_cpu_count();
107 if (nb_cpus > 1)
108 nb_threads = nb_cpus + 1;
109 else
110 nb_threads = 1;
111 }
112
113 nb_workers = nb_threads;
114 if (!main_func)
115 nb_workers--;
116
117 *pctx = ctx = av_mallocz(sizeof(*ctx));
118 if (!ctx)
119 return AVERROR(ENOMEM);
120
121 if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
122 av_freep(pctx);
123 return AVERROR(ENOMEM);
124 }
125
126 ctx->priv = priv;
127 ctx->worker_func = worker_func;
128 ctx->main_func = main_func;
129 ctx->nb_threads = nb_threads;
130 ctx->nb_active_threads = 0;
131 ctx->nb_jobs = 0;
132 ctx->finished = 0;
133
134 atomic_init(&ctx->first_job, 0);
135 atomic_init(&ctx->current_job, 0);
136 pthread_mutex_init(&ctx->done_mutex, NULL);
137 pthread_cond_init(&ctx->done_cond, NULL);
138 ctx->done = 0;
139
140 for (i = 0; i < nb_workers; i++) {
141 WorkerContext *w = &ctx->workers[i];
142 int ret;
143 w->ctx = ctx;
144 pthread_mutex_init(&w->mutex, NULL);
145 pthread_cond_init(&w->cond, NULL);
146 pthread_mutex_lock(&w->mutex);
147 w->done = 0;
148
149 if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
150 ctx->nb_threads = main_func ? i : i + 1;
151 pthread_mutex_unlock(&w->mutex);
152 pthread_cond_destroy(&w->cond);
153 pthread_mutex_destroy(&w->mutex);
154 avpriv_slicethread_free(pctx);
155 return AVERROR(ret);
156 }
157
158 while (!w->done)
159 pthread_cond_wait(&w->cond, &w->mutex);
160 pthread_mutex_unlock(&w->mutex);
161 }
162
163 return nb_threads;
164 }
165
avpriv_slicethread_execute(AVSliceThread * ctx,int nb_jobs,int execute_main)166 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
167 {
168 int nb_workers, i, is_last = 0;
169
170 av_assert0(nb_jobs > 0);
171 ctx->nb_jobs = nb_jobs;
172 ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
173 atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
174 atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
175 nb_workers = ctx->nb_active_threads;
176 if (!ctx->main_func || !execute_main)
177 nb_workers--;
178
179 for (i = 0; i < nb_workers; i++) {
180 WorkerContext *w = &ctx->workers[i];
181 pthread_mutex_lock(&w->mutex);
182 w->done = 0;
183 pthread_cond_signal(&w->cond);
184 pthread_mutex_unlock(&w->mutex);
185 }
186
187 if (ctx->main_func && execute_main)
188 ctx->main_func(ctx->priv);
189 else
190 is_last = run_jobs(ctx);
191
192 if (!is_last) {
193 pthread_mutex_lock(&ctx->done_mutex);
194 while (!ctx->done)
195 pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
196 ctx->done = 0;
197 pthread_mutex_unlock(&ctx->done_mutex);
198 }
199 }
200
avpriv_slicethread_free(AVSliceThread ** pctx)201 void avpriv_slicethread_free(AVSliceThread **pctx)
202 {
203 AVSliceThread *ctx;
204 int nb_workers, i;
205
206 if (!pctx || !*pctx)
207 return;
208
209 ctx = *pctx;
210 nb_workers = ctx->nb_threads;
211 if (!ctx->main_func)
212 nb_workers--;
213
214 ctx->finished = 1;
215 for (i = 0; i < nb_workers; i++) {
216 WorkerContext *w = &ctx->workers[i];
217 pthread_mutex_lock(&w->mutex);
218 w->done = 0;
219 pthread_cond_signal(&w->cond);
220 pthread_mutex_unlock(&w->mutex);
221 }
222
223 for (i = 0; i < nb_workers; i++) {
224 WorkerContext *w = &ctx->workers[i];
225 pthread_join(w->thread, NULL);
226 pthread_cond_destroy(&w->cond);
227 pthread_mutex_destroy(&w->mutex);
228 }
229
230 pthread_cond_destroy(&ctx->done_cond);
231 pthread_mutex_destroy(&ctx->done_mutex);
232 av_freep(&ctx->workers);
233 av_freep(pctx);
234 }
235
236 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
237
avpriv_slicethread_create(AVSliceThread ** pctx,void * priv,void (* worker_func)(void * priv,int jobnr,int threadnr,int nb_jobs,int nb_threads),void (* main_func)(void * priv),int nb_threads)238 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
239 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
240 void (*main_func)(void *priv),
241 int nb_threads)
242 {
243 *pctx = NULL;
244 return AVERROR(ENOSYS);
245 }
246
avpriv_slicethread_execute(AVSliceThread * ctx,int nb_jobs,int execute_main)247 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
248 {
249 av_assert0(0);
250 }
251
avpriv_slicethread_free(AVSliceThread ** pctx)252 void avpriv_slicethread_free(AVSliceThread **pctx)
253 {
254 av_assert0(!pctx || !*pctx);
255 }
256
257 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
258