• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
2 #include <pthread.h>
3 #include <stdlib.h>
4 #include "erofs/workqueue.h"
5 
worker_thread(void * arg)6 static void *worker_thread(void *arg)
7 {
8 	struct erofs_workqueue *wq = arg;
9 	struct erofs_work *work;
10 	void *tlsp = NULL;
11 
12 	if (wq->on_start)
13 		tlsp = (wq->on_start)(wq, NULL);
14 
15 	while (true) {
16 		pthread_mutex_lock(&wq->lock);
17 
18 		while (!wq->job_count && !wq->shutdown)
19 			pthread_cond_wait(&wq->cond_empty, &wq->lock);
20 		if (!wq->job_count && wq->shutdown) {
21 			pthread_mutex_unlock(&wq->lock);
22 			break;
23 		}
24 
25 		work = wq->head;
26 		wq->head = work->next;
27 		if (!wq->head)
28 			wq->tail = NULL;
29 		wq->job_count--;
30 
31 		if (wq->job_count == wq->max_jobs - 1)
32 			pthread_cond_broadcast(&wq->cond_full);
33 
34 		pthread_mutex_unlock(&wq->lock);
35 		work->fn(work, tlsp);
36 	}
37 
38 	if (wq->on_exit)
39 		(void)(wq->on_exit)(wq, tlsp);
40 	return NULL;
41 }
42 
erofs_destroy_workqueue(struct erofs_workqueue * wq)43 int erofs_destroy_workqueue(struct erofs_workqueue *wq)
44 {
45 	if (!wq)
46 		return -EINVAL;
47 
48 	pthread_mutex_lock(&wq->lock);
49 	wq->shutdown = true;
50 	pthread_cond_broadcast(&wq->cond_empty);
51 	pthread_mutex_unlock(&wq->lock);
52 
53 	while (wq->nworker) {
54 		int ret = -pthread_join(wq->workers[wq->nworker - 1], NULL);
55 
56 		if (ret)
57 			return ret;
58 		--wq->nworker;
59 	}
60 	free(wq->workers);
61 	pthread_mutex_destroy(&wq->lock);
62 	pthread_cond_destroy(&wq->cond_empty);
63 	pthread_cond_destroy(&wq->cond_full);
64 	return 0;
65 }
66 
/*
 * Initialise @wq and start its worker threads.
 *
 * @wq:       caller-provided workqueue to set up
 * @nworker:  number of worker threads to create (must be non-zero)
 * @max_jobs: maximum number of queued jobs before erofs_queue_work()
 *            blocks (must be non-zero)
 * @on_start: optional hook run by each worker before it processes jobs
 * @on_exit:  optional hook run by each worker right before it terminates
 *
 * Returns 0 on success, -EINVAL on bad arguments, -ENOMEM if the thread
 * array cannot be allocated, or the negated error code of the failing
 * pthread call.  On failure every resource acquired so far is released
 * again, so the caller must not call erofs_destroy_workqueue() on @wq.
 */
int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
			  unsigned int max_jobs, erofs_wq_func_t on_start,
			  erofs_wq_func_t on_exit)
{
	unsigned int i;
	int ret;

	if (!wq || nworker <= 0 || max_jobs <= 0)
		return -EINVAL;

	wq->head = wq->tail = NULL;
	wq->max_jobs = max_jobs;
	wq->job_count = 0;
	wq->shutdown = false;
	wq->on_start = on_start;
	wq->on_exit = on_exit;

	/* pthread functions return a positive errno value on failure. */
	ret = -pthread_mutex_init(&wq->lock, NULL);
	if (ret)
		return ret;
	ret = -pthread_cond_init(&wq->cond_empty, NULL);
	if (ret)
		goto err_lock;
	ret = -pthread_cond_init(&wq->cond_full, NULL);
	if (ret)
		goto err_cond_empty;

	/* calloc() rejects a count * size product that would overflow. */
	wq->workers = calloc(nworker, sizeof(pthread_t));
	if (!wq->workers) {
		ret = -ENOMEM;
		goto err_cond_full;
	}

	for (i = 0; i < nworker; i++) {
		ret = -pthread_create(&wq->workers[i], NULL, worker_thread, wq);
		if (ret)
			break;
	}
	/* Record how many threads really exist so that destroy joins those. */
	wq->nworker = i;
	if (ret)
		erofs_destroy_workqueue(wq);
	return ret;

err_cond_full:
	pthread_cond_destroy(&wq->cond_full);
err_cond_empty:
	pthread_cond_destroy(&wq->cond_empty);
err_lock:
	pthread_mutex_destroy(&wq->lock);
	return ret;
}
101 
erofs_queue_work(struct erofs_workqueue * wq,struct erofs_work * work)102 int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
103 {
104 	if (!wq || !work)
105 		return -EINVAL;
106 
107 	pthread_mutex_lock(&wq->lock);
108 
109 	while (wq->job_count == wq->max_jobs)
110 		pthread_cond_wait(&wq->cond_full, &wq->lock);
111 
112 	work->next = NULL;
113 	if (!wq->head)
114 		wq->head = work;
115 	else
116 		wq->tail->next = work;
117 	wq->tail = work;
118 	wq->job_count++;
119 
120 	pthread_cond_signal(&wq->cond_empty);
121 	pthread_mutex_unlock(&wq->lock);
122 	return 0;
123 }
124