1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef _GNU_SOURCE
17 #define _GNU_SOURCE
18 #endif
19
20 #include "softbus_thread_pool.h"
21
22 #include "softbus_adapter_mem.h"
23 #include "softbus_errcode.h"
24 #include "softbus_log.h"
25
/* Value OR-ed into every requested thread stack size in CreateThread(); may be predefined by the build. */
#if !defined(MIN_STACK_SIZE)
#define MIN_STACK_SIZE 0x8000
#endif
/* Name each worker gives itself through SoftBusThreadSetName(). */
#define THREAD_POOL_NAME "SoftBusConnect"
30
31 typedef void *(*Runnable)(void *argv);
32 typedef struct ThreadAttr ThreadAttr;
33
34 struct ThreadAttr {
35 const char *name;
36 uint32_t stackSize;
37 SoftBusThreadPriority priority;
38 };
39
40 static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId);
41 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum);
42 static void JobCheck(ThreadPool *pool, Job *job);
43 static void ThreadPoolWorker(void *arg);
44
/*
 * Start a thread that executes run(argv) using the attributes in `attr`.
 *
 * The stack size handed to the adapter is attr->stackSize bitwise-OR-ed with
 * MIN_STACK_SIZE, and the priority is taken from attr->priority. attr->name is
 * not consulted. On builds where __aarch64__ is not defined the scheduling
 * policy is set to SOFTBUS_SCHED_RR.
 *
 * `threadId` is reinterpreted as a SoftBusThread pointer and receives the new
 * thread handle. Neither `attr` nor `threadId` is checked for NULL.
 *
 * Returns the result of SoftBusThreadCreate() unchanged.
 */
static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId)
{
    SoftBusThreadAttr createAttr;
    SoftBusThreadAttrInit(&createAttr);

    createAttr.prior = attr->priority;
    createAttr.stackSize = (attr->stackSize | MIN_STACK_SIZE);
#ifndef __aarch64__
    createAttr.policy = SOFTBUS_SCHED_RR;
#endif

    return SoftBusThreadCreate((SoftBusThread *)threadId, &createAttr, run, argv);
}
59
CreateThreadPool(int32_t threadNum,int32_t queueMaxNum)60 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum)
61 {
62 if (threadNum <= 0 || queueMaxNum <= 0) {
63 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
64 return NULL;
65 }
66 ThreadPool *pool = (ThreadPool *)SoftBusCalloc(sizeof(ThreadPool));
67 if (pool == NULL) {
68 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc ThreadPool");
69 return NULL;
70 }
71 pool->threadNum = threadNum;
72 pool->queueMaxNum = queueMaxNum;
73 pool->queueCurNum = 0;
74 pool->head = NULL;
75 pool->tail = NULL;
76 if (SoftBusMutexInit(&(pool->mutex), NULL) != SOFTBUS_OK) {
77 SoftBusFree(pool);
78 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init mutex");
79 return NULL;
80 }
81 if (SoftBusCondInit(&(pool->queueEmpty)) != SOFTBUS_OK) {
82 SoftBusFree(pool);
83 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueEmpty");
84 return NULL;
85 }
86 if (SoftBusCondInit(&(pool->queueNotEmpty)) != SOFTBUS_OK) {
87 SoftBusFree(pool);
88 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotEmpty");
89 return NULL;
90 }
91 if (SoftBusCondInit(&(pool->queueNotFull)) != SOFTBUS_OK) {
92 SoftBusFree(pool);
93 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotFull");
94 return NULL;
95 }
96 return pool;
97 }
98
ThreadPoolInit(int32_t threadNum,int32_t queueMaxNum)99 ThreadPool *ThreadPoolInit(int32_t threadNum, int32_t queueMaxNum)
100 {
101 if (threadNum <= 0 || queueMaxNum <= 0) {
102 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
103 return NULL;
104 }
105 ThreadPool *pool = CreateThreadPool(threadNum, queueMaxNum);
106 if (pool == NULL) {
107 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create thread pool");
108 return NULL;
109 }
110 int32_t countSuccess = 0;
111 pool->pthreads = (SoftBusThread *)SoftBusCalloc((int32_t)(sizeof(SoftBusThread) * threadNum));
112 if (pool->pthreads == NULL) {
113 SoftBusFree(pool);
114 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc pthreads");
115 return NULL;
116 }
117 pool->queueClose = 0;
118 pool->poolClose = 0;
119 if (SoftBusMutexLock(&(pool->mutex)) != 0) {
120 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
121 goto EXIT;
122 }
123
124 for (int32_t i = 0; i < pool->threadNum; ++i) {
125 ThreadAttr attr = {"ThreadPoolWorker", 0, SOFTBUS_PRIORITY_LOWEST};
126 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "create pthread now.");
127 if (CreateThread((Runnable)ThreadPoolWorker, (void *)pool, &attr, (uint32_t *)&(pool->pthreads[i])) != 0) {
128 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "create pthreads no. [%d] failed\n", i);
129 pool->pthreads[i] = (SoftBusThread)0;
130 } else {
131 ++countSuccess;
132 }
133 }
134
135 if (countSuccess < pool->threadNum) {
136 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create %d threads", pool->threadNum - countSuccess);
137 }
138 if (countSuccess == 0) {
139 SoftBusMutexUnlock(&pool->mutex);
140 goto EXIT;
141 }
142 SoftBusMutexUnlock(&(pool->mutex));
143 return pool;
144
145 EXIT:
146 SoftBusMutexDestroy(&pool->mutex);
147 SoftBusCondDestroy(&pool->queueEmpty);
148 SoftBusCondDestroy(&pool->queueNotEmpty);
149 SoftBusCondDestroy(&pool->queueNotFull);
150 SoftBusFree(pool->pthreads);
151 SoftBusFree(pool);
152 return NULL;
153 }
154
/*
 * Update the job list after a worker has taken `job` from the head of the queue.
 *
 * The caller (ThreadPoolWorker) has already decremented pool->queueCurNum and
 * holds both pool->mutex and job->mutex.
 *
 *  - If the pool or its queue is closing, the job is marked not runnable.
 *  - A PERSISTENT job that is still runnable is linked behind the current tail
 *    and counted again, so it will be picked up again later.
 *  - The head then advances to job->next, or the list is emptied when no jobs
 *    are counted any more.
 *  - Finally the tail's next pointer is cleared so the list is terminated.
 *
 * The statement order matters: when `job` is the only element, appending it
 * to the tail briefly makes it point at itself before the last step resets
 * the link.
 */
static void JobCheck(ThreadPool *pool, Job *job)
{
    if (pool->queueClose || pool->poolClose) {
        job->runnable = false;
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "Threadpool starts to close...");
    }

    bool requeue = (job->jobMode == PERSISTENT) && (job->runnable == true);
    if (requeue) {
        pool->queueCurNum++;
        pool->tail->next = job;
        pool->tail = job;
    }

    if (pool->queueCurNum != 0) {
        pool->head = job->next;
    } else {
        pool->tail = NULL;
        pool->head = NULL;
    }

    if (pool->tail != NULL) {
        pool->tail->next = NULL;
    }
}
175
ThreadPoolWorker(void * arg)176 static void ThreadPoolWorker(void *arg)
177 {
178 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Start");
179 if (arg == NULL) {
180 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolWorker arg is NULL");
181 return;
182 }
183 ThreadPool *pool = (ThreadPool *)arg;
184 Job *job = NULL;
185 SoftBusThreadSetName(SoftBusThreadGetSelf(), THREAD_POOL_NAME);
186 while (1) {
187 if (SoftBusMutexLock(&(pool->mutex)) != 0) {
188 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
189 return;
190 }
191 while ((pool->queueCurNum == 0) && !pool->poolClose) {
192 SoftBusCondWait(&(pool->queueNotEmpty), &(pool->mutex), NULL);
193 }
194 if (pool->poolClose || pool->queueCurNum <= 0) {
195 SoftBusMutexUnlock(&(pool->mutex));
196 break;
197 }
198 pool->queueCurNum--;
199 job = pool->head;
200 if (SoftBusMutexLock(&(job->mutex)) != 0) {
201 pool->queueCurNum++;
202 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
203 SoftBusMutexUnlock(&(pool->mutex));
204 continue;
205 }
206 JobCheck(pool, job);
207 if (pool->queueCurNum == 0) {
208 SoftBusCondSignal(&(pool->queueEmpty));
209 }
210 if (pool->queueCurNum == pool->queueMaxNum - 1) {
211 SoftBusCondBroadcast(&(pool->queueNotFull));
212 }
213 SoftBusMutexUnlock(&(pool->mutex));
214
215 // copy job task relative variables to run it after leave job mutex
216 bool runnable = job->runnable;
217 JobTask task = job->callbackFunction;
218 void *arguement = job->arg;
219 if (job->jobMode == ONCE || job->runnable == false) {
220 SoftBusMutexUnlock(&(job->mutex));
221 SoftBusMutexDestroy(&(job->mutex));
222 SoftBusFree(job);
223 job = NULL;
224 } else {
225 SoftBusMutexUnlock(&(job->mutex));
226 }
227 if (runnable) {
228 (void)(task(arguement));
229 }
230 }
231 SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Exit");
232 }
233
/*
 * Validate the arguments of ThreadPoolAddJob() and check that the pool can
 * accept another job.
 *
 * Returns:
 *   SOFTBUS_INVALID_PARAM  pool or callbackFunction is NULL (nothing locked);
 *   SOFTBUS_LOCK_ERR       the pool mutex could not be locked;
 *   SOFTBUS_ERR            the queue is full, or the queue/pool is closing
 *                          (the mutex has been released again);
 *   SOFTBUS_OK             the job may be added. In this case pool->mutex is
 *                          STILL HELD and the caller must unlock it.
 *
 * A full queue is rejected immediately rather than waited on. The wait loop on
 * queueNotFull that used to follow the full-queue check could never run,
 * because that check had already returned, so it has been removed.
 */
static int32_t CheckThreadPoolAddReady(ThreadPool *pool, int32_t (*callbackFunction)(void *arg))
{
    if (pool == NULL || callbackFunction == NULL) {
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
        return SOFTBUS_LOCK_ERR;
    }
    /* '>=' instead of '==' so an inconsistent counter can never admit extra jobs. */
    if (pool->queueCurNum >= pool->queueMaxNum) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "queueCurNum equals queueMaxNum, just quit");
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }
    if (pool->queueClose || pool->poolClose) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }
    // will call SoftBusMutexUnlock in ThreadPoolAddJob
    return SOFTBUS_OK;
}
258
/*
 * Queue a new job on `pool`.
 *
 * pool             target pool
 * callbackFunction task to run; invoked by a worker as callbackFunction(arg)
 * arg              opaque argument passed to the task
 * jobMode          ONCE or PERSISTENT (a PERSISTENT job is queued again after each pick-up)
 * handle           caller-chosen identifier used for duplicate detection and removal
 *
 * Returns SOFTBUS_OK on success, the error reported by
 * CheckThreadPoolAddReady() when the pool cannot take the job,
 * SOFTBUS_ALREADY_EXISTED when a runnable job with the same handle is already
 * queued, SOFTBUS_MALLOC_ERR when the job cannot be allocated and SOFTBUS_ERR
 * when its mutex cannot be initialised.
 */
int32_t ThreadPoolAddJob(ThreadPool *pool, int32_t (*callbackFunction)(void *arg), void *arg,
    JobMode jobMode, uintptr_t handle)
{
    int32_t ret = CheckThreadPoolAddReady(pool, callbackFunction);
    if (ret != SOFTBUS_OK) {
        return ret;
    }

    /* pool->mutex is held from here on; every exit path below releases it. */
    for (Job *queued = pool->head; queued != NULL; queued = queued->next) {
        if (queued->handle == handle && queued->runnable == true) {
            SoftBusMutexUnlock(&(pool->mutex));
            return SOFTBUS_ALREADY_EXISTED;
        }
    }

    Job *newJob = (Job *)SoftBusCalloc(sizeof(Job));
    if (newJob == NULL) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_MALLOC_ERR;
    }
    newJob->callbackFunction = callbackFunction;
    newJob->arg = arg;
    newJob->jobMode = jobMode;
    newJob->handle = handle;
    newJob->runnable = true;
    newJob->next = NULL;
    if (SoftBusMutexInit(&(newJob->mutex), NULL)) {
        SoftBusFree(newJob);
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }

    if (pool->head != NULL) {
        pool->tail->next = newJob;
        pool->tail = newJob;
    } else {
        /* First job in an empty queue: wake the workers waiting for work. */
        pool->head = newJob;
        pool->tail = newJob;
        SoftBusCondBroadcast(&(pool->queueNotEmpty));
    }
    pool->queueCurNum++;

    SoftBusMutexUnlock(&(pool->mutex));
    return SOFTBUS_OK;
}
301
/*
 * Cancel the queued PERSISTENT job identified by `handle`.
 *
 * The first runnable PERSISTENT job with a matching handle is marked as not
 * runnable under its own mutex. The job is not unlinked here; a worker frees
 * it the next time it takes the job from the queue.
 *
 * Returns SOFTBUS_INVALID_PARAM when pool is NULL, SOFTBUS_LOCK_ERR when the
 * pool mutex or the job mutex cannot be locked, and SOFTBUS_OK otherwise,
 * including when no matching job was found. The pool mutex is released on
 * every return path.
 */
int32_t ThreadPoolRemoveJob(ThreadPool *pool, uintptr_t handle)
{
    if (pool == NULL) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolRemoveJob failed, pool == NULL");
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
        return SOFTBUS_LOCK_ERR;
    }

    Job *job = pool->head;
    while (job != NULL) {
        if (job->handle == handle && job->jobMode == PERSISTENT && job->runnable == true) {
            break;
        }
        job = job->next;
    }

    /* A non-NULL job here already satisfies the handle/mode/runnable test above. */
    if (job != NULL) {
        if (SoftBusMutexLock(&(job->mutex)) != 0) {
            SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
            /* Release the pool lock too, otherwise every later pool operation would block. */
            SoftBusMutexUnlock(&(pool->mutex));
            return SOFTBUS_LOCK_ERR;
        }
        job->runnable = false;
        SoftBusMutexUnlock(&(job->mutex));
    }

    SoftBusMutexUnlock(&(pool->mutex));
    return SOFTBUS_OK;
}
330
/*
 * Shut the pool down and release everything it owns.
 *
 * Sequence:
 *  1. set queueClose and wait on queueEmpty until the workers have drained the
 *     queue (jobs picked up while closing are marked not runnable by JobCheck());
 *  2. set poolClose and wake all threads blocked on queueNotEmpty / queueNotFull;
 *  3. join every worker that was actually started;
 *  4. destroy the synchronisation objects, free any job still linked in the
 *     list (including its mutex), the worker array and the pool itself.
 *
 * Returns SOFTBUS_INVALID_PARAM when pool is NULL, SOFTBUS_LOCK_ERR when the
 * pool mutex cannot be locked, and SOFTBUS_OK otherwise. If a shutdown is
 * already in progress (queueClose or poolClose set) the call returns
 * SOFTBUS_OK without doing anything. `pool` must not be used after a
 * successful shutdown.
 */
int32_t ThreadPoolDestroy(ThreadPool *pool)
{
    if (pool == NULL) {
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
        return SOFTBUS_LOCK_ERR;
    }
    if (pool->queueClose || pool->poolClose) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_OK;
    }
    pool->queueClose = 1;
    while (pool->queueCurNum != 0) {
        SoftBusCondWait(&(pool->queueEmpty), &(pool->mutex), NULL);
    }
    pool->poolClose = 1;
    SoftBusMutexUnlock(&(pool->mutex));
    SoftBusCondBroadcast(&(pool->queueNotEmpty));
    SoftBusCondBroadcast(&(pool->queueNotFull));

    if (pool->pthreads != NULL) {
        for (int32_t i = 0; i < pool->threadNum; ++i) {
            /* ThreadPoolInit() stores 0 in slots whose thread could not be created. */
            if (pool->pthreads[i] == (SoftBusThread)0) {
                continue;
            }
            SoftBusThreadJoin(pool->pthreads[i], NULL);
        }
    }

    SoftBusMutexDestroy(&(pool->mutex));
    SoftBusCondDestroy(&(pool->queueEmpty));
    SoftBusCondDestroy(&(pool->queueNotEmpty));
    SoftBusCondDestroy(&(pool->queueNotFull));
    SoftBusFree(pool->pthreads);

    /* Normally empty by now; release anything that is still linked. */
    while (pool->head != NULL) {
        Job *job = pool->head;
        pool->head = job->next;
        SoftBusMutexDestroy(&(job->mutex));
        SoftBusFree(job);
    }
    SoftBusFree(pool);
    return SOFTBUS_OK;
}
371