1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef _GNU_SOURCE
17 #define _GNU_SOURCE
18 #endif
19
20 #include "softbus_thread_pool.h"
21
22 #include "softbus_adapter_mem.h"
23 #include "softbus_errcode.h"
24 #include "softbus_log.h"
25
/*
 * Stack-size bits that CreateThread ORs into every requested stack size.
 * The #ifndef guard lets an earlier definition take precedence.
 */
#ifndef MIN_STACK_SIZE
#define MIN_STACK_SIZE 0x8000
#endif
/* Name each worker assigns to itself through SoftBusThreadSetName. */
#define THREAD_POOL_NAME "THREAD_POOL"
30
31 typedef void *(*Runnable)(void *argv);
32 typedef struct ThreadAttr ThreadAttr;
33
34 struct ThreadAttr {
35 const char *name;
36 uint32_t stackSize;
37 SoftBusThreadPriority priority;
38 };
39
/* Forward declarations of the file-local helpers defined below. */

/* Starts one thread running run(argv) with the given attributes. */
static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId);
/* Allocates a pool object and initialises its mutex and condition variables. */
static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum);
/* Updates the queue links after a worker has taken `job` from the head. */
static void JobCheck(ThreadPool *pool, Job *job);
/*
 * Worker loop executed by every pool thread.
 * NOTE(review): declared as returning void but passed to CreateThread through
 * a cast to Runnable (which returns void *) - confirm this is intended.
 */
static void ThreadPoolWorker(void *arg);
44
/*
 * Starts a thread that executes run(argv).
 *
 * attr      supplies the stack size and priority; attr->name is not used here.
 * threadId  receives the new thread handle (passed on as a SoftBusThread *).
 *
 * Returns whatever SoftBusThreadCreate returns.
 */
static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId)
{
    SoftBusThreadAttr softbusAttr;

    SoftBusThreadAttrInit(&softbusAttr);
    softbusAttr.policy = SOFTBUS_SCHED_RR;
    softbusAttr.prior = attr->priority;
    /* OR-ing in MIN_STACK_SIZE guarantees the result is never below it. */
    softbusAttr.stackSize = (attr->stackSize | MIN_STACK_SIZE);

    return SoftBusThreadCreate((SoftBusThread *)threadId, &softbusAttr, run, argv);
}
57
CreateThreadPool(int32_t threadNum,int32_t queueMaxNum)58 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum)
59 {
60 if (threadNum <= 0 || queueMaxNum <= 0) {
61 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
62 return NULL;
63 }
64 ThreadPool *pool = (ThreadPool *)SoftBusCalloc(sizeof(ThreadPool));
65 if (pool == NULL) {
66 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc ThreadPool");
67 return NULL;
68 }
69 pool->threadNum = threadNum;
70 pool->queueMaxNum = queueMaxNum;
71 pool->queueCurNum = 0;
72 pool->head = NULL;
73 pool->tail = NULL;
74 if (SoftBusMutexInit(&(pool->mutex), NULL) != SOFTBUS_OK) {
75 SoftBusFree(pool);
76 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init mutex");
77 return NULL;
78 }
79 if (SoftBusCondInit(&(pool->queueEmpty)) != SOFTBUS_OK) {
80 SoftBusFree(pool);
81 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueEmpty");
82 return NULL;
83 }
84 if (SoftBusCondInit(&(pool->queueNotEmpty)) != SOFTBUS_OK) {
85 SoftBusFree(pool);
86 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotEmpty");
87 return NULL;
88 }
89 if (SoftBusCondInit(&(pool->queueNotFull)) != SOFTBUS_OK) {
90 SoftBusFree(pool);
91 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotFull");
92 return NULL;
93 }
94 return pool;
95 }
96
ThreadPoolInit(int32_t threadNum,int32_t queueMaxNum)97 ThreadPool *ThreadPoolInit(int32_t threadNum, int32_t queueMaxNum)
98 {
99 if (threadNum <= 0 || queueMaxNum <= 0) {
100 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
101 return NULL;
102 }
103 ThreadPool *pool = CreateThreadPool(threadNum, queueMaxNum);
104 if (pool == NULL) {
105 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create thread pool");
106 return NULL;
107 }
108
109 pool->pthreads = (SoftBusThread *)SoftBusCalloc((int32_t)(sizeof(SoftBusThread) * threadNum));
110 if (pool->pthreads == NULL) {
111 SoftBusFree(pool);
112 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc pthreads");
113 return NULL;
114 }
115 pool->queueClose = 0;
116 pool->poolClose = 0;
117 if (SoftBusMutexLock(&(pool->mutex)) != 0) {
118 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
119 goto EXIT;
120 }
121
122 int32_t countSuccess = 0;
123 for (int32_t i = 0; i < pool->threadNum; ++i) {
124 ThreadAttr attr = {"ThreadPoolWorker", 0, SOFTBUS_PRIORITY_LOWEST};
125 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "create pthread now.");
126 if (CreateThread((Runnable)ThreadPoolWorker, (void *)pool, &attr, (uint32_t *)&(pool->pthreads[i])) != 0) {
127 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "create pthreads no. [%d] failed\n", i);
128 pool->pthreads[i] = (SoftBusThread)0;
129 } else {
130 ++countSuccess;
131 }
132 }
133
134 if (countSuccess < pool->threadNum) {
135 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create %d threads", pool->threadNum - countSuccess);
136 }
137 if (countSuccess == 0) {
138 SoftBusMutexUnlock(&pool->mutex);
139 goto EXIT;
140 }
141 SoftBusMutexUnlock(&(pool->mutex));
142 return pool;
143
144 EXIT:
145 SoftBusMutexDestroy(&pool->mutex);
146 SoftBusCondDestroy(&pool->queueEmpty);
147 SoftBusCondDestroy(&pool->queueNotEmpty);
148 SoftBusCondDestroy(&pool->queueNotFull);
149 SoftBusFree(pool->pthreads);
150 SoftBusFree(pool);
151 return NULL;
152 }
153
/*
 * Fixes up the queue after ThreadPoolWorker has taken `job` from the head.
 *
 * The worker calls this with pool->mutex and job->mutex held, after it has
 * already decremented pool->queueCurNum, and with job equal to pool->head.
 *
 *  - When the queue or the pool is closing, the job is marked not runnable.
 *  - A PERSISTENT job that is still runnable is appended to the tail again and
 *    counted back into queueCurNum.
 *  - The head then advances to job->next, or the list is emptied when
 *    queueCurNum is zero.
 *  - Finally the tail's next pointer is cleared so the list stays terminated.
 */
static void JobCheck(ThreadPool *pool, Job *job)
{
    if (pool->queueClose || pool->poolClose) {
        job->runnable = false;
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "Threadpool starts to close...");
    }

    if (job->jobMode == PERSISTENT && job->runnable == true) {
        /* Re-queue behind the current tail. */
        pool->queueCurNum++;
        pool->tail->next = job;
        pool->tail = job;
    }

    if (pool->queueCurNum != 0) {
        pool->head = job->next;
    } else {
        pool->tail = NULL;
        pool->head = NULL;
    }

    if (pool->tail == NULL) {
        return;
    }
    pool->tail->next = NULL;
}
174
ThreadPoolWorker(void * arg)175 static void ThreadPoolWorker(void *arg)
176 {
177 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Start");
178 if (arg == NULL) {
179 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolWorker arg is NULL");
180 return;
181 }
182 ThreadPool *pool = (ThreadPool *)arg;
183 Job *job = NULL;
184 SoftBusThreadSetName(SoftBusThreadGetSelf(), THREAD_POOL_NAME);
185 while (1) {
186 if (SoftBusMutexLock(&(pool->mutex)) != 0) {
187 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
188 return;
189 }
190 while ((pool->queueCurNum == 0) && !pool->poolClose) {
191 SoftBusCondWait(&(pool->queueNotEmpty), &(pool->mutex), NULL);
192 }
193 if (pool->poolClose || pool->queueCurNum <= 0) {
194 SoftBusMutexUnlock(&(pool->mutex));
195 break;
196 }
197 pool->queueCurNum--;
198 job = pool->head;
199 if (SoftBusMutexLock(&(job->mutex)) != 0) {
200 pool->queueCurNum++;
201 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
202 SoftBusMutexUnlock(&(pool->mutex));
203 continue;
204 }
205 JobCheck(pool, job);
206 if (pool->queueCurNum == 0) {
207 SoftBusCondSignal(&(pool->queueEmpty));
208 }
209 if (pool->queueCurNum == pool->queueMaxNum - 1) {
210 SoftBusCondBroadcast(&(pool->queueNotFull));
211 }
212 SoftBusMutexUnlock(&(pool->mutex));
213 if (job->runnable) {
214 (void)(*(job->callbackFunction))(job->arg);
215 }
216 if (job->jobMode == ONCE || job->runnable == false) {
217 SoftBusMutexUnlock(&(job->mutex));
218 SoftBusMutexDestroy(&(job->mutex));
219 SoftBusFree(job);
220 job = NULL;
221 }
222 if (job != NULL) {
223 SoftBusMutexUnlock(&(job->mutex));
224 }
225 }
226 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Exit");
227 }
228
/*
 * Validates the arguments of ThreadPoolAddJob and checks that the pool can
 * accept another job.
 *
 * Returns
 *   SOFTBUS_INVALID_PARAM  pool or callbackFunction is NULL;
 *   SOFTBUS_LOCK_ERR       the pool mutex could not be locked;
 *   SOFTBUS_ERR            the queue is full, or the queue/pool is closing;
 *   SOFTBUS_OK             the job may be added.
 *
 * IMPORTANT: on SOFTBUS_OK the pool mutex is left LOCKED; the caller
 * (ThreadPoolAddJob) is responsible for unlocking it. On every other return
 * the mutex is not held.
 *
 * A full queue is rejected immediately rather than waited on, so this function
 * never blocks on queueNotFull.
 */
static int32_t CheckThreadPoolAddReady(ThreadPool *pool, int32_t (*callbackFunction)(void *arg))
{
    if (pool == NULL || callbackFunction == NULL) {
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
        return SOFTBUS_LOCK_ERR;
    }
    /* ">=" rather than "==" so an unexpected over-count is rejected as well. */
    if (pool->queueCurNum >= pool->queueMaxNum) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "queueCurNum equals queueMaxNum, just quit");
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }
    if (pool->queueClose || pool->poolClose) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }
    // will call SoftBusMutexUnlock in ThreadPoolAddJob
    return SOFTBUS_OK;
}
253
/*
 * Appends a job to the pool's queue.
 *
 * pool              target pool.
 * callbackFunction  function the worker invokes with `arg`.
 * arg               opaque argument forwarded to the callback.
 * jobMode           stored on the job (ONCE or PERSISTENT).
 * handle            caller-chosen identifier; a runnable job with the same
 *                   handle already in the queue causes the call to be refused.
 *
 * Returns SOFTBUS_OK on success, the error from CheckThreadPoolAddReady if the
 * pool is not ready, SOFTBUS_ALREADY_EXISTED for a duplicate handle,
 * SOFTBUS_MALLOC_ERR when the job cannot be allocated, or SOFTBUS_ERR when the
 * job mutex cannot be initialised.
 */
int32_t ThreadPoolAddJob(ThreadPool *pool, int32_t (*callbackFunction)(void *arg), void *arg,
    JobMode jobMode, uintptr_t handle)
{
    int32_t ready = CheckThreadPoolAddReady(pool, callbackFunction);
    if (ready != SOFTBUS_OK) {
        return ready;
    }

    /* From here on pool->mutex is held; every exit path must release it. */
    for (Job *queued = pool->head; queued != NULL; queued = queued->next) {
        if (queued->handle == handle && queued->runnable == true) {
            SoftBusMutexUnlock(&(pool->mutex));
            return SOFTBUS_ALREADY_EXISTED;
        }
    }

    Job *newJob = (Job *)SoftBusCalloc(sizeof(Job));
    if (newJob == NULL) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_MALLOC_ERR;
    }
    newJob->callbackFunction = callbackFunction;
    newJob->arg = arg;
    newJob->jobMode = jobMode;
    newJob->handle = handle;
    newJob->runnable = true;
    newJob->next = NULL;
    if (SoftBusMutexInit(&(newJob->mutex), NULL)) {
        SoftBusFree(newJob);
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }

    if (pool->head != NULL) {
        pool->tail->next = newJob;
        pool->tail = newJob;
    } else {
        /* First job in an empty queue: wake the workers sleeping on it. */
        pool->tail = newJob;
        pool->head = newJob;
        SoftBusCondBroadcast(&(pool->queueNotEmpty));
    }
    pool->queueCurNum++;

    SoftBusMutexUnlock(&(pool->mutex));
    return SOFTBUS_OK;
}
296
/*
 * Marks the queued PERSISTENT job identified by `handle` as not runnable.
 *
 * The job is not unlinked here; this function only clears its `runnable` flag
 * while holding both the pool mutex and the job mutex. If no runnable
 * PERSISTENT job with that handle is queued, nothing is changed.
 *
 * Returns SOFTBUS_INVALID_PARAM when pool is NULL, SOFTBUS_LOCK_ERR when the
 * pool mutex or the job mutex cannot be locked, and SOFTBUS_OK otherwise
 * (including when no matching job was found). The pool mutex is released on
 * every return path.
 */
int32_t ThreadPoolRemoveJob(ThreadPool *pool, uintptr_t handle)
{
    if (pool == NULL) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolRemoveJob failed, pool == NULL");
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
        return SOFTBUS_LOCK_ERR;
    }

    Job *job = pool->head;
    while (job != NULL) {
        if (job->handle == handle && job->jobMode == PERSISTENT && job->runnable == true) {
            break;
        }
        job = job->next;
    }

    /* The loop only stops on a non-NULL job when it matched all three criteria. */
    if (job != NULL) {
        if (SoftBusMutexLock(&(job->mutex)) != 0) {
            SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
            /* The job mutex was never acquired; release the pool mutex we do hold. */
            SoftBusMutexUnlock(&(pool->mutex));
            return SOFTBUS_LOCK_ERR;
        }
        job->runnable = false;
        SoftBusMutexUnlock(&(job->mutex));
    }

    SoftBusMutexUnlock(&(pool->mutex));
    return SOFTBUS_OK;
}
326
/*
 * Shuts the pool down and releases everything it owns.
 *
 * Sequence:
 *   1. set queueClose and wait on queueEmpty until queueCurNum reaches zero;
 *   2. set poolClose and wake every thread waiting on queueNotEmpty or
 *      queueNotFull;
 *   3. join each worker that was actually started;
 *   4. destroy the mutex and condition variables, free the thread array, free
 *      any jobs still linked in the queue (destroying their mutexes), and free
 *      the pool itself.
 *
 * Returns SOFTBUS_INVALID_PARAM when pool is NULL, SOFTBUS_LOCK_ERR when the
 * pool mutex cannot be locked, and SOFTBUS_OK otherwise. If a shutdown is
 * already in progress (queueClose or poolClose set) the call returns
 * SOFTBUS_OK without doing anything.
 */
int32_t ThreadPoolDestroy(ThreadPool *pool)
{
    if (pool == NULL) {
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
        return SOFTBUS_LOCK_ERR;
    }
    if (pool->queueClose || pool->poolClose) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_OK;
    }
    pool->queueClose = 1;
    while (pool->queueCurNum != 0) {
        SoftBusCondWait(&(pool->queueEmpty), &(pool->mutex), NULL);
    }
    pool->poolClose = 1;
    SoftBusMutexUnlock(&(pool->mutex));
    SoftBusCondBroadcast(&(pool->queueNotEmpty));
    SoftBusCondBroadcast(&(pool->queueNotFull));

    if (pool->pthreads != NULL) {
        for (int32_t i = 0; i < pool->threadNum; ++i) {
            /* ThreadPoolInit stores 0 in slots whose thread failed to start. */
            if (pool->pthreads[i] == (SoftBusThread)0) {
                continue;
            }
            SoftBusThreadJoin(pool->pthreads[i], NULL);
        }
    }

    SoftBusMutexDestroy(&(pool->mutex));
    SoftBusCondDestroy(&(pool->queueEmpty));
    SoftBusCondDestroy(&(pool->queueNotEmpty));
    SoftBusCondDestroy(&(pool->queueNotFull));
    SoftBusFree(pool->pthreads);

    while (pool->head != NULL) {
        Job *job = pool->head;
        pool->head = job->next;
        /* Each job's mutex was initialised in ThreadPoolAddJob. */
        SoftBusMutexDestroy(&(job->mutex));
        SoftBusFree(job);
    }
    SoftBusFree(pool);
    return SOFTBUS_OK;
}
367