1 // SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
2 #include <pthread.h>
3 #include <stdlib.h>
4 #include "erofs/workqueue.h"
5
worker_thread(void * arg)6 static void *worker_thread(void *arg)
7 {
8 struct erofs_workqueue *wq = arg;
9 struct erofs_work *work;
10 void *tlsp = NULL;
11
12 if (wq->on_start)
13 tlsp = (wq->on_start)(wq, NULL);
14
15 while (true) {
16 pthread_mutex_lock(&wq->lock);
17
18 while (!wq->job_count && !wq->shutdown)
19 pthread_cond_wait(&wq->cond_empty, &wq->lock);
20 if (!wq->job_count && wq->shutdown) {
21 pthread_mutex_unlock(&wq->lock);
22 break;
23 }
24
25 work = wq->head;
26 wq->head = work->next;
27 if (!wq->head)
28 wq->tail = NULL;
29 wq->job_count--;
30
31 if (wq->job_count == wq->max_jobs - 1)
32 pthread_cond_broadcast(&wq->cond_full);
33
34 pthread_mutex_unlock(&wq->lock);
35 work->fn(work, tlsp);
36 }
37
38 if (wq->on_exit)
39 (void)(wq->on_exit)(wq, tlsp);
40 return NULL;
41 }
42
erofs_destroy_workqueue(struct erofs_workqueue * wq)43 int erofs_destroy_workqueue(struct erofs_workqueue *wq)
44 {
45 if (!wq)
46 return -EINVAL;
47
48 pthread_mutex_lock(&wq->lock);
49 wq->shutdown = true;
50 pthread_cond_broadcast(&wq->cond_empty);
51 pthread_mutex_unlock(&wq->lock);
52
53 while (wq->nworker) {
54 int ret = -pthread_join(wq->workers[wq->nworker - 1], NULL);
55
56 if (ret)
57 return ret;
58 --wq->nworker;
59 }
60 free(wq->workers);
61 pthread_mutex_destroy(&wq->lock);
62 pthread_cond_destroy(&wq->cond_empty);
63 pthread_cond_destroy(&wq->cond_full);
64 return 0;
65 }
66
erofs_alloc_workqueue(struct erofs_workqueue * wq,unsigned int nworker,unsigned int max_jobs,erofs_wq_func_t on_start,erofs_wq_func_t on_exit)67 int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
68 unsigned int max_jobs, erofs_wq_func_t on_start,
69 erofs_wq_func_t on_exit)
70 {
71 unsigned int i;
72 int ret;
73
74 if (!wq || nworker <= 0 || max_jobs <= 0)
75 return -EINVAL;
76
77 wq->head = wq->tail = NULL;
78 wq->max_jobs = max_jobs;
79 wq->job_count = 0;
80 wq->shutdown = false;
81 wq->on_start = on_start;
82 wq->on_exit = on_exit;
83 pthread_mutex_init(&wq->lock, NULL);
84 pthread_cond_init(&wq->cond_empty, NULL);
85 pthread_cond_init(&wq->cond_full, NULL);
86
87 wq->workers = malloc(nworker * sizeof(pthread_t));
88 if (!wq->workers)
89 return -ENOMEM;
90
91 for (i = 0; i < nworker; i++) {
92 ret = -pthread_create(&wq->workers[i], NULL, worker_thread, wq);
93 if (ret)
94 break;
95 }
96 wq->nworker = i;
97 if (ret)
98 erofs_destroy_workqueue(wq);
99 return ret;
100 }
101
erofs_queue_work(struct erofs_workqueue * wq,struct erofs_work * work)102 int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
103 {
104 if (!wq || !work)
105 return -EINVAL;
106
107 pthread_mutex_lock(&wq->lock);
108
109 while (wq->job_count == wq->max_jobs)
110 pthread_cond_wait(&wq->cond_full, &wq->lock);
111
112 work->next = NULL;
113 if (!wq->head)
114 wq->head = work;
115 else
116 wq->tail->next = work;
117 wq->tail = work;
118 wq->job_count++;
119
120 pthread_cond_signal(&wq->cond_empty);
121 pthread_mutex_unlock(&wq->lock);
122 return 0;
123 }
124