• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef _GNU_SOURCE
17 #define _GNU_SOURCE
18 #endif
19 
20 #include "softbus_thread_pool.h"
21 
22 #include "softbus_adapter_mem.h"
23 #include "softbus_errcode.h"
24 #include "softbus_log.h"
25 
26 #ifndef MIN_STACK_SIZE
27 #define MIN_STACK_SIZE 0x8000
28 #endif
29 #define THREAD_POOL_NAME "THREAD_POOL"
30 
31 typedef void *(*Runnable)(void *argv);
32 typedef struct ThreadAttr ThreadAttr;
33 
34 struct ThreadAttr {
35     const char *name;
36     uint32_t stackSize;
37     SoftBusThreadPriority priority;
38 };
39 
40 static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId);
41 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum);
42 static void JobCheck(ThreadPool *pool, Job *job);
43 static void ThreadPoolWorker(void *arg);
44 
/*
 * Start a new thread that executes run(argv).
 *
 * run       thread entry point
 * argv      opaque argument forwarded to run
 * attr      requested stack size and priority; must not be NULL
 * threadId  receives the handle written by SoftBusThreadCreate
 *
 * Returns the result of SoftBusThreadCreate unchanged.
 */
static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId)
{
    SoftBusThreadAttr threadAttrInfo;
    SoftBusThreadAttrInit(&threadAttrInfo);

    /*
     * MIN_STACK_SIZE is a lower bound, so take the larger of the two values.
     * A bitwise OR would silently inflate any request that does not already
     * have the MIN_STACK_SIZE bit set (e.g. 0x10000 would become 0x18000).
     */
    threadAttrInfo.stackSize = (attr->stackSize > MIN_STACK_SIZE) ? attr->stackSize : MIN_STACK_SIZE;
    threadAttrInfo.prior = attr->priority;
    threadAttrInfo.policy = SOFTBUS_SCHED_RR;

    /*
     * NOTE(review): threadId is reinterpreted as SoftBusThread *. The caller in
     * this file passes the address of a SoftBusThread slot, so the storage has
     * the right size there; confirm before adding callers with real uint32_t.
     */
    return SoftBusThreadCreate((SoftBusThread *)threadId, &threadAttrInfo, run, argv);
}
57 
CreateThreadPool(int32_t threadNum,int32_t queueMaxNum)58 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum)
59 {
60     if (threadNum <= 0 || queueMaxNum <= 0) {
61         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
62         return NULL;
63     }
64     ThreadPool *pool = (ThreadPool *)SoftBusCalloc(sizeof(ThreadPool));
65     if (pool == NULL) {
66         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc ThreadPool");
67         return NULL;
68     }
69     pool->threadNum = threadNum;
70     pool->queueMaxNum = queueMaxNum;
71     pool->queueCurNum = 0;
72     pool->head = NULL;
73     pool->tail = NULL;
74     if (SoftBusMutexInit(&(pool->mutex), NULL) != SOFTBUS_OK) {
75         SoftBusFree(pool);
76         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init mutex");
77         return NULL;
78     }
79     if (SoftBusCondInit(&(pool->queueEmpty)) != SOFTBUS_OK) {
80         SoftBusFree(pool);
81         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueEmpty");
82         return NULL;
83     }
84     if (SoftBusCondInit(&(pool->queueNotEmpty)) != SOFTBUS_OK) {
85         SoftBusFree(pool);
86         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotEmpty");
87         return NULL;
88     }
89     if (SoftBusCondInit(&(pool->queueNotFull)) != SOFTBUS_OK) {
90         SoftBusFree(pool);
91         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotFull");
92         return NULL;
93     }
94     return pool;
95 }
96 
ThreadPoolInit(int32_t threadNum,int32_t queueMaxNum)97 ThreadPool *ThreadPoolInit(int32_t threadNum, int32_t queueMaxNum)
98 {
99     if (threadNum <= 0 || queueMaxNum <= 0) {
100         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
101         return NULL;
102     }
103     ThreadPool *pool = CreateThreadPool(threadNum, queueMaxNum);
104     if (pool == NULL) {
105         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create thread pool");
106         return NULL;
107     }
108 
109     pool->pthreads = (SoftBusThread *)SoftBusCalloc((int32_t)(sizeof(SoftBusThread) * threadNum));
110     if (pool->pthreads == NULL) {
111         SoftBusFree(pool);
112         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc pthreads");
113         return NULL;
114     }
115     pool->queueClose = 0;
116     pool->poolClose = 0;
117     if (SoftBusMutexLock(&(pool->mutex)) != 0) {
118         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
119         goto EXIT;
120     }
121 
122     int32_t countSuccess = 0;
123     for (int32_t i = 0; i < pool->threadNum; ++i) {
124         ThreadAttr attr = {"ThreadPoolWorker", 0, SOFTBUS_PRIORITY_LOWEST};
125         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "create pthread now.");
126         if (CreateThread((Runnable)ThreadPoolWorker, (void *)pool, &attr, (uint32_t *)&(pool->pthreads[i])) != 0) {
127             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "create pthreads no. [%d] failed\n", i);
128             pool->pthreads[i] = (SoftBusThread)0;
129         } else {
130             ++countSuccess;
131         }
132     }
133 
134     if (countSuccess < pool->threadNum) {
135         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create %d threads", pool->threadNum - countSuccess);
136     }
137     if (countSuccess == 0) {
138         SoftBusMutexUnlock(&pool->mutex);
139         goto EXIT;
140     }
141     SoftBusMutexUnlock(&(pool->mutex));
142     return pool;
143 
144 EXIT:
145     SoftBusMutexDestroy(&pool->mutex);
146     SoftBusCondDestroy(&pool->queueEmpty);
147     SoftBusCondDestroy(&pool->queueNotEmpty);
148     SoftBusCondDestroy(&pool->queueNotFull);
149     SoftBusFree(pool->pthreads);
150     SoftBusFree(pool);
151     return NULL;
152 }
153 
/*
 * Update the job queue after a worker has taken `job` from the head.
 *
 * The caller (ThreadPoolWorker) has already decremented pool->queueCurNum and
 * holds both pool->mutex and job->mutex. A job is marked non-runnable when the
 * queue or the pool is closing. A PERSISTENT job that is still runnable is
 * appended to the tail again; any other job is simply unlinked from the head.
 */
static void JobCheck(ThreadPool *pool, Job *job)
{
    if (pool->queueClose || pool->poolClose) {
        job->runnable = false;
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "Threadpool starts to close...");
    }

    const bool requeue = (job->jobMode == PERSISTENT) && (job->runnable == true);
    if (requeue) {
        /*
         * When job is the only element, tail == job and this makes job->next
         * point at job itself, which the head update below relies on.
         */
        pool->tail->next = job;
        pool->tail = job;
        pool->queueCurNum++;
    }

    if (pool->queueCurNum != 0) {
        pool->head = job->next;
    } else {
        pool->head = NULL;
        pool->tail = NULL;
    }

    /* Terminate the list; this also breaks the self-link created above. */
    if (pool->tail != NULL) {
        pool->tail->next = NULL;
    }
}
174 
ThreadPoolWorker(void * arg)175 static void ThreadPoolWorker(void *arg)
176 {
177     SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Start");
178     if (arg == NULL) {
179         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolWorker arg is NULL");
180         return;
181     }
182     ThreadPool *pool = (ThreadPool *)arg;
183     Job *job = NULL;
184     SoftBusThreadSetName(SoftBusThreadGetSelf(), THREAD_POOL_NAME);
185     while (1) {
186         if (SoftBusMutexLock(&(pool->mutex)) != 0) {
187             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
188             return;
189         }
190         while ((pool->queueCurNum == 0) && !pool->poolClose) {
191             SoftBusCondWait(&(pool->queueNotEmpty), &(pool->mutex), NULL);
192         }
193         if (pool->poolClose || pool->queueCurNum <= 0) {
194             SoftBusMutexUnlock(&(pool->mutex));
195             break;
196         }
197         pool->queueCurNum--;
198         job = pool->head;
199         if (SoftBusMutexLock(&(job->mutex)) != 0) {
200             pool->queueCurNum++;
201             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
202             SoftBusMutexUnlock(&(pool->mutex));
203             continue;
204         }
205         JobCheck(pool, job);
206         if (pool->queueCurNum == 0) {
207             SoftBusCondSignal(&(pool->queueEmpty));
208         }
209         if (pool->queueCurNum == pool->queueMaxNum - 1) {
210             SoftBusCondBroadcast(&(pool->queueNotFull));
211         }
212         SoftBusMutexUnlock(&(pool->mutex));
213         if (job->runnable) {
214             (void)(*(job->callbackFunction))(job->arg);
215         }
216         if (job->jobMode == ONCE || job->runnable == false) {
217             SoftBusMutexUnlock(&(job->mutex));
218             SoftBusMutexDestroy(&(job->mutex));
219             SoftBusFree(job);
220             job = NULL;
221         }
222         if (job != NULL) {
223             SoftBusMutexUnlock(&(job->mutex));
224         }
225     }
226     SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Exit");
227 }
228 
/*
 * Validate the arguments of ThreadPoolAddJob and check that the pool can
 * accept one more job.
 *
 * pool              pool to add to; must not be NULL
 * callbackFunction  job callback; must not be NULL
 *
 * Returns SOFTBUS_OK with pool->mutex STILL HELD; the caller must release it.
 * Every other return value leaves the mutex released:
 *   SOFTBUS_INVALID_PARAM  a required argument is NULL
 *   SOFTBUS_LOCK_ERR       the pool mutex could not be taken
 *   SOFTBUS_ERR            the queue is full, or the queue/pool is closing
 */
static int32_t CheckThreadPoolAddReady(ThreadPool *pool, int32_t (*callbackFunction)(void *arg))
{
    if (pool == NULL || callbackFunction == NULL) {
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
        return SOFTBUS_LOCK_ERR;
    }
    /*
     * A full queue is rejected immediately rather than waited on. The former
     * wait loop on queueNotFull tested the same condition right after this
     * early return, under the same lock, so it could never execute.
     */
    if (pool->queueCurNum >= pool->queueMaxNum) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "queueCurNum equals queueMaxNum, just quit");
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }
    if (pool->queueClose || pool->poolClose) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }
    // will call SoftBusMutexUnlock in ThreadPoolAddJob
    return SOFTBUS_OK;
}
253 
/*
 * Append a job to the pool's queue.
 *
 * pool              target pool
 * callbackFunction  function a worker calls with `arg`
 * arg               opaque argument for the callback
 * jobMode           stored in the job (ONCE or PERSISTENT)
 * handle            caller-chosen identifier; a runnable job with the same
 *                   handle must not already be queued
 *
 * Returns SOFTBUS_OK on success, the error from CheckThreadPoolAddReady when
 * the pool cannot accept the job, SOFTBUS_ALREADY_EXISTED for a duplicate
 * handle, SOFTBUS_MALLOC_ERR on allocation failure, or SOFTBUS_ERR when the
 * job mutex cannot be initialized.
 */
int32_t ThreadPoolAddJob(ThreadPool *pool, int32_t (*callbackFunction)(void *arg), void *arg,
    JobMode jobMode, uintptr_t handle)
{
    /* On SOFTBUS_OK the pool mutex is held and must be released on every path below. */
    int32_t ready = CheckThreadPoolAddReady(pool, callbackFunction);
    if (ready != SOFTBUS_OK) {
        return ready;
    }

    for (Job *queued = pool->head; queued != NULL; queued = queued->next) {
        if (queued->runnable == true && queued->handle == handle) {
            SoftBusMutexUnlock(&(pool->mutex));
            return SOFTBUS_ALREADY_EXISTED;
        }
    }

    Job *newJob = (Job *)SoftBusCalloc(sizeof(Job));
    if (newJob == NULL) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_MALLOC_ERR;
    }
    newJob->callbackFunction = callbackFunction;
    newJob->arg = arg;
    newJob->jobMode = jobMode;
    newJob->handle = handle;
    newJob->runnable = true;
    newJob->next = NULL;
    if (SoftBusMutexInit(&(newJob->mutex), NULL) != 0) {
        SoftBusFree(newJob);
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_ERR;
    }

    if (pool->head != NULL) {
        pool->tail->next = newJob;
        pool->tail = newJob;
    } else {
        /* First job in an empty queue: wake the workers waiting for work. */
        pool->head = newJob;
        pool->tail = newJob;
        SoftBusCondBroadcast(&(pool->queueNotEmpty));
    }
    pool->queueCurNum++;
    SoftBusMutexUnlock(&(pool->mutex));
    return SOFTBUS_OK;
}
296 
/*
 * Cancel the queued PERSISTENT job identified by `handle`.
 *
 * The job is only marked non-runnable here; it stays in the queue until a
 * worker takes it and releases it.
 *
 * pool    pool that owns the job; must not be NULL
 * handle  identifier given to ThreadPoolAddJob
 *
 * Returns SOFTBUS_OK when the job was cancelled or no matching runnable
 * PERSISTENT job is queued, SOFTBUS_INVALID_PARAM when pool is NULL, and
 * SOFTBUS_LOCK_ERR when the pool mutex or the job mutex cannot be taken.
 */
int32_t ThreadPoolRemoveJob(ThreadPool *pool, uintptr_t handle)
{
    if (pool == NULL) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolRemoveJob failed, pool == NULL");
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
        return SOFTBUS_LOCK_ERR;
    }

    Job *job = pool->head;
    while (job != NULL) {
        if (job->handle == handle && job->jobMode == PERSISTENT && job->runnable == true) {
            break;
        }
        job = job->next;
    }

    /* A non-NULL job here already satisfies all three match conditions. */
    if (job != NULL) {
        if (SoftBusMutexLock(&(job->mutex)) != 0) {
            SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
            /*
             * The job mutex was NOT acquired, so it must not be unlocked. The
             * pool mutex IS held and has to be released, otherwise every later
             * pool operation would block forever.
             */
            SoftBusMutexUnlock(&(pool->mutex));
            return SOFTBUS_LOCK_ERR;
        }
        job->runnable = false;
        SoftBusMutexUnlock(&(job->mutex));
    }
    SoftBusMutexUnlock(&(pool->mutex));
    return SOFTBUS_OK;
}
326 
/*
 * Shut the pool down and release all of its resources.
 *
 * The queue is closed first, then the call waits on queueEmpty until
 * queueCurNum reaches 0. After that the pool is marked closed, all workers are
 * woken and joined, and the synchronization objects, the thread array, any
 * remaining jobs and the pool itself are released.
 *
 * pool  pool to destroy; must not be NULL and must not be used afterwards
 *
 * Returns SOFTBUS_OK on success or when a shutdown is already in progress,
 * SOFTBUS_INVALID_PARAM when pool is NULL, and SOFTBUS_LOCK_ERR when the pool
 * mutex cannot be taken.
 */
int32_t ThreadPoolDestroy(ThreadPool *pool)
{
    if (pool == NULL) {
        return SOFTBUS_INVALID_PARAM;
    }
    if (SoftBusMutexLock(&(pool->mutex)) != 0) {
        SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "lock failed");
        return SOFTBUS_LOCK_ERR;
    }
    if (pool->queueClose || pool->poolClose) {
        SoftBusMutexUnlock(&(pool->mutex));
        return SOFTBUS_OK;
    }
    pool->queueClose = 1;
    while (pool->queueCurNum != 0) {
        SoftBusCondWait(&(pool->queueEmpty), &(pool->mutex), NULL);
    }
    pool->poolClose = 1;
    SoftBusMutexUnlock(&(pool->mutex));
    SoftBusCondBroadcast(&(pool->queueNotEmpty));
    SoftBusCondBroadcast(&(pool->queueNotFull));

    if (pool->pthreads != NULL) {
        for (int32_t i = 0; i < pool->threadNum; ++i) {
            /*
             * ThreadPoolInit stores 0 in the slot of every worker it failed to
             * start; such a slot is not a thread handle and must not be joined.
             */
            if (pool->pthreads[i] == (SoftBusThread)0) {
                continue;
            }
            SoftBusThreadJoin(pool->pthreads[i], NULL);
        }
    }

    SoftBusMutexDestroy(&(pool->mutex));
    SoftBusCondDestroy(&(pool->queueEmpty));
    SoftBusCondDestroy(&(pool->queueNotEmpty));
    SoftBusCondDestroy(&(pool->queueNotFull));
    SoftBusFree(pool->pthreads);

    /* Release jobs that are still linked, including the mutex each one owns. */
    while (pool->head != NULL) {
        Job *job = pool->head;
        pool->head = job->next;
        SoftBusMutexDestroy(&(job->mutex));
        SoftBusFree(job);
    }
    SoftBusFree(pool);
    return SOFTBUS_OK;
}
367