1 /*
2  * Copyright (C) 2016-2020 Yann Collet, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10 
11 
12 /* ======   Dependencies   ======= */
13 #include <stddef.h>    /* size_t */
14 #include <stdlib.h>    /* malloc, calloc, free */
15 #include <string.h>    /* memcpy */
16 #include <assert.h>
17 
18 #include "pool.h"
19 
20 
21 /* ======   Compiler specifics   ====== */
22 #if defined(_MSC_VER)
23 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
24 #endif
25 
26 
27 /* ===  Build Macro  === */
28 
#ifndef POOL_MT   /* can be defined on the command line */
30 #  define POOL_MT 1
31 #endif
32 
33 
34 /* ===  Implementation  === */
35 
36 #if POOL_MT
37 
38 #include "threading.h"   /* pthread adaptation */
39 
/* A job is a function and an opaque argument.
 * Jobs are stored by value in the circular queue of the pool context,
 * and copied out of it by the worker that executes them. */
typedef struct POOL_job_s {
    POOL_function function;   /* routine a worker invokes as function(opaque) */
    void *opaque;             /* argument handed unchanged to `function` */
} POOL_job;
45 
/* Context of a multi-threaded pool :
 * the worker thread handles, a circular job queue,
 * and the synchronization primitives protecting the queue. */
struct POOL_ctx_s {
    /* Keep track of the threads */
    ZSTD_pthread_t* threads;   /* heap-allocated array of worker handles */
    size_t threadCapacity;     /* nb of handles in `threads` that were started; all of them get joined at shutdown */
    size_t threadLimit;        /* max nb of workers allowed to run a job at the same time; adjusted by POOL_resize() */

    /* The queue is a circular buffer */
    POOL_job *queue;           /* heap-allocated array of `queueSize` slots */
    size_t queueHead;          /* index of the next job to pop */
    size_t queueTail;          /* index of the next slot to fill */
    size_t queueSize;          /* nb of slots : requested queue size + 1 */

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty */
    int queueEmpty;

    /* The mutex protects the queue */
    ZSTD_pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    ZSTD_pthread_cond_t queuePushCond;
    /* Condition variables for poppers to wait on when the queue is empty */
    ZSTD_pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
72 
/* POOL_thread() :
 * Work thread for the thread pool.
 * Waits for jobs and executes them, until the pool is flagged for shutdown.
 * @opaque : the POOL_ctx* this worker belongs to.
 * @returns : NULL on failure (NULL context), else non-null (the context pointer).
 */
static void* POOL_thread(void* opaque)
{
    POOL_ctx* const ctx = (POOL_ctx*)opaque;
    if (!ctx) { return NULL; }
    for (;;) {
        /* Lock the mutex and wait for a non-empty queue or until shutdown */
        ZSTD_pthread_mutex_lock(&ctx->queueMutex);

        /* A worker also waits while the nb of busy workers has reached threadLimit,
         * which is how a pool shrunk by a resize keeps its surplus threads idle.
         * Note that `shutdown` is only examined inside this loop :
         * as long as jobs remain and the limit allows it, the worker keeps popping. */
        while ( ctx->queueEmpty
            || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
            if (ctx->shutdown) {
                /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
                 * a few threads will be shutdown while !queueEmpty,
                 * but enough threads will remain active to finish the queue */
                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
                return opaque;
            }
            ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
        }
        /* Pop a job off the queue */
        {   POOL_job const job = ctx->queue[ctx->queueHead];   /* copied by value : the slot can be reused once the mutex is released */
            ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
            ctx->numThreadsBusy++;
            ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
            /* Unlock the mutex, signal a pusher, and run the job */
            ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);

            /* The job runs without holding the mutex */
            job.function(job.opaque);

            /* If the intended queue size was 0, signal after finishing job */
            ZSTD_pthread_mutex_lock(&ctx->queueMutex);
            ctx->numThreadsBusy--;
            /* queueSize == 1 : fullness then depends on numThreadsBusy (see isQueueFull()),
             * so a pusher may be able to proceed now that this worker is free again */
            if (ctx->queueSize == 1) {
                ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            }
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        }
    }  /* for (;;) */
    assert(0);  /* Unreachable */
}
119 
POOL_create(size_t numThreads,size_t queueSize)120 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize)
121 {
122     POOL_ctx* ctx;
123     /* Check parameters */
124     if (!numThreads) { return NULL; }
125     /* Allocate the context and zero initialize */
126     ctx = (POOL_ctx*)calloc(1, sizeof(POOL_ctx));
127     if (!ctx) { return NULL; }
128     /* Initialize the job queue.
129      * It needs one extra space since one space is wasted to differentiate
130      * empty and full queues.
131      */
132     ctx->queueSize = queueSize + 1;
133     ctx->queue = (POOL_job*)malloc(ctx->queueSize * sizeof(POOL_job));
134     ctx->queueHead = 0;
135     ctx->queueTail = 0;
136     ctx->numThreadsBusy = 0;
137     ctx->queueEmpty = 1;
138     (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
139     (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
140     (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
141     ctx->shutdown = 0;
142     /* Allocate space for the thread handles */
143     ctx->threads = (ZSTD_pthread_t*)malloc(numThreads * sizeof(ZSTD_pthread_t));
144     ctx->threadCapacity = 0;
145     /* Check for errors */
146     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
147     /* Initialize the threads */
148     {   size_t i;
149         for (i = 0; i < numThreads; ++i) {
150             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
151                 ctx->threadCapacity = i;
152                 POOL_free(ctx);
153                 return NULL;
154         }   }
155         ctx->threadCapacity = numThreads;
156         ctx->threadLimit = numThreads;
157     }
158     return ctx;
159 }
160 
161 /*! POOL_join() :
162     Shutdown the queue, wake any sleeping threads, and join all of the threads.
163 */
POOL_join(POOL_ctx * ctx)164 static void POOL_join(POOL_ctx* ctx) {
165     /* Shut down the queue */
166     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
167     ctx->shutdown = 1;
168     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
169 
170     /* Wake up sleeping threads */
171     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
172     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
173 
174     /* Join all of the threads */
175     {   size_t i;
176         for (i = 0; i < ctx->threadCapacity; ++i) {
177             ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
178     }   }
179 }
180 
/*! POOL_free() :
 *  Joins all workers, destroys the synchronization primitives,
 *  then releases the queue, the thread handles and the context itself.
 *  Accepts NULL, in which case nothing happens.
 */
void POOL_free(POOL_ctx *ctx) {
    if (ctx != NULL) {
        /* No thread may touch the context once the join completed */
        POOL_join(ctx);

        ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
        ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
        ZSTD_pthread_cond_destroy(&ctx->queuePopCond);

        free(ctx->queue);
        free(ctx->threads);
        free(ctx);
    }
}
191 
192 
193 
/*! POOL_sizeof() :
 * @return : nb of bytes used by the pool : the context structure,
 *           its job queue and its array of thread handles.
 *           A NULL pool has a size of 0.
 */
size_t POOL_sizeof(POOL_ctx *ctx) {
    size_t total = 0;
    if (ctx != NULL) {  /* supports sizeof NULL */
        size_t const queueBytes = ctx->queueSize * sizeof(POOL_job);
        size_t const threadBytes = ctx->threadCapacity * sizeof(ZSTD_pthread_t);
        total = sizeof(*ctx) + queueBytes + threadBytes;
    }
    return total;
}
200 
201 
202 /* @return : 0 on success, 1 on error */
POOL_resize_internal(POOL_ctx * ctx,size_t numThreads)203 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
204 {
205     if (numThreads <= ctx->threadCapacity) {
206         if (!numThreads) return 1;
207         ctx->threadLimit = numThreads;
208         return 0;
209     }
210     /* numThreads > threadCapacity */
211     {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)malloc(numThreads * sizeof(ZSTD_pthread_t));
212         if (!threadPool) return 1;
213         /* replace existing thread pool */
214         memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
215         free(ctx->threads);
216         ctx->threads = threadPool;
217         /* Initialize additional threads */
218         {   size_t threadId;
219             for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
220                 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
221                     ctx->threadCapacity = threadId;
222                     return 1;
223             }   }
224     }   }
225     /* successfully expanded */
226     ctx->threadCapacity = numThreads;
227     ctx->threadLimit = numThreads;
228     return 0;
229 }
230 
231 /* @return : 0 on success, 1 on error */
POOL_resize(POOL_ctx * ctx,size_t numThreads)232 int POOL_resize(POOL_ctx* ctx, size_t numThreads)
233 {
234     int result;
235     if (ctx==NULL) return 1;
236     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
237     result = POOL_resize_internal(ctx, numThreads);
238     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
239     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
240     return result;
241 }
242 
243 /**
244  * Returns 1 if the queue is full and 0 otherwise.
245  *
246  * When queueSize is 1 (pool was created with an intended queueSize of 0),
247  * then a queue is empty if there is a thread free _and_ no job is waiting.
248  */
isQueueFull(POOL_ctx const * ctx)249 static int isQueueFull(POOL_ctx const* ctx) {
250     if (ctx->queueSize > 1) {
251         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
252     } else {
253         return (ctx->numThreadsBusy == ctx->threadLimit) ||
254                !ctx->queueEmpty;
255     }
256 }
257 
258 
/* POOL_add_internal() :
 * Stores the job { function, opaque } at the tail of the queue and signals
 * one waiting worker. Does nothing when the pool is shutting down.
 * Performs no locking and no fullness check by itself : POOL_add() and
 * POOL_tryAdd() take care of both before calling it.
 */
static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
    assert(ctx != NULL);
    if (!ctx->shutdown) {
        POOL_job* const slot = &ctx->queue[ctx->queueTail];
        slot->function = function;
        slot->opaque = opaque;
        ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
        ctx->queueEmpty = 0;
        ZSTD_pthread_cond_signal(&ctx->queuePopCond);
    }
}
270 
/*! POOL_add() :
 *  Queues the job `function(opaque)` for execution by the pool.
 *  Blocks while the queue is full, unless the pool is shutting down,
 *  in which case the call returns without queueing the job.
 * @ctx : must not be NULL.
 */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* Sleep on the push condition until a slot frees up or shutdown starts */
    for (;;) {
        if (ctx->shutdown || !isQueueFull(ctx)) break;
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
282 
283 
/*! POOL_tryAdd() :
 *  Non-blocking variant of POOL_add() : the job `function(opaque)` is only
 *  handed to POOL_add_internal() when the queue is not full.
 * @ctx : must not be NULL.
 * @return : 1 if the queue had room for the job, 0 if the queue was full.
 */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    int accepted;
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    accepted = !isQueueFull(ctx);
    if (accepted) {
        POOL_add_internal(ctx, function, opaque);
    }
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return accepted;
}
296 
297 
#else  /* POOL_MT == 0 : single-threaded fallback */
299 
300 /* ========================== */
301 /* No multi-threading support */
302 /* ========================== */
303 
304 
/* We don't need any data, but if it is empty, malloc() might return NULL.
 * Hence the single placeholder member, which is never read nor written. */
struct POOL_ctx_s {
    int dummy;
};
/* The one and only context of the single-threaded build :
 * POOL_create() returns its address, so nothing is allocated nor freed. */
static POOL_ctx g_ctx;
310 
POOL_create(size_t numThreads,size_t queueSize)311 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
312     (void)numThreads;
313     (void)queueSize;
314     return &g_ctx;
315 }
316 
/*! POOL_free() :
 *  Single-threaded build : nothing to release.
 *  @ctx is expected to be NULL or the shared static context.
 */
void POOL_free(POOL_ctx* ctx) {
    (void)ctx;   /* otherwise only referenced by the assert */
    assert(ctx == NULL || ctx == &g_ctx);
}
321 
/*! POOL_resize() :
 *  Single-threaded build : there is no thread to add or remove.
 * @return : always 0 (success)
 */
int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
    (void)numThreads;
    (void)ctx;
    return 0;
}
326 
/*! POOL_add() :
 *  Single-threaded build : the job runs immediately, in the calling thread.
 *  @ctx is ignored.
 */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    (*function)(opaque);
}
331 
/*! POOL_tryAdd() :
 *  Single-threaded build : the job runs immediately, in the calling thread.
 *  @ctx is ignored.
 * @return : always 1 (job accepted)
 */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    (*function)(opaque);
    return 1;
}
337 
/*! POOL_sizeof() :
 *  Single-threaded build.
 * @return : size of the context structure, or 0 for a NULL @ctx.
 */
size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx != NULL) {
        assert(ctx == &g_ctx);
        return sizeof(*ctx);
    }
    return 0;  /* supports sizeof NULL */
}
343 
#endif  /* POOL_MT */
345