• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef _GNU_SOURCE
17 #define _GNU_SOURCE
18 #endif
19 
20 #include "softbus_thread_pool.h"
21 
22 #include "softbus_adapter_mem.h"
23 #include "softbus_errcode.h"
24 #include "softbus_log.h"
25 
26 #ifndef MIN_STACK_SIZE
27 #define MIN_STACK_SIZE 0x8000
28 #endif
29 #define THREAD_POOL_NAME "SoftBusConnect"
30 
31 typedef void *(*Runnable)(void *argv);
32 typedef struct ThreadAttr ThreadAttr;
33 
34 struct ThreadAttr {
35     const char *name;
36     uint32_t stackSize;
37     SoftBusThreadPriority priority;
38 };
39 
40 static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId);
41 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum);
42 static void JobCheck(ThreadPool *pool, Job *job);
43 static void ThreadPoolWorker(void *arg);
44 
CreateThread(Runnable run,void * argv,const ThreadAttr * attr,uint32_t * threadId)45 static int32_t CreateThread(Runnable run, void *argv, const ThreadAttr *attr, uint32_t *threadId)
46 {
47     SoftBusThreadAttr threadAttrInfo;
48     SoftBusThreadAttrInit(&threadAttrInfo);
49 
50     threadAttrInfo.stackSize = (attr->stackSize | MIN_STACK_SIZE);
51     threadAttrInfo.prior = attr->priority;
52 #ifndef __aarch64__
53     threadAttrInfo.policy = SOFTBUS_SCHED_RR;
54 #endif
55     int32_t errCode = SoftBusThreadCreate((SoftBusThread *)threadId, &threadAttrInfo, run, argv);
56 
57     return errCode;
58 }
59 
CreateThreadPool(int32_t threadNum,int32_t queueMaxNum)60 static ThreadPool* CreateThreadPool(int32_t threadNum, int32_t queueMaxNum)
61 {
62     if (threadNum <= 0 || queueMaxNum <= 0) {
63         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
64         return NULL;
65     }
66     ThreadPool *pool = (ThreadPool *)SoftBusCalloc(sizeof(ThreadPool));
67     if (pool == NULL) {
68         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc ThreadPool");
69         return NULL;
70     }
71     pool->threadNum = threadNum;
72     pool->queueMaxNum = queueMaxNum;
73     pool->queueCurNum = 0;
74     pool->head = NULL;
75     pool->tail = NULL;
76     if (SoftBusMutexInit(&(pool->mutex), NULL) != SOFTBUS_OK) {
77         SoftBusFree(pool);
78         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init mutex");
79         return NULL;
80     }
81     if (SoftBusCondInit(&(pool->queueEmpty)) != SOFTBUS_OK) {
82         SoftBusFree(pool);
83         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueEmpty");
84         return NULL;
85     }
86     if (SoftBusCondInit(&(pool->queueNotEmpty)) != SOFTBUS_OK) {
87         SoftBusFree(pool);
88         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotEmpty");
89         return NULL;
90     }
91     if (SoftBusCondInit(&(pool->queueNotFull)) != SOFTBUS_OK) {
92         SoftBusFree(pool);
93         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to init cond queueNotFull");
94         return NULL;
95     }
96     return pool;
97 }
98 
ThreadPoolInit(int32_t threadNum,int32_t queueMaxNum)99 ThreadPool *ThreadPoolInit(int32_t threadNum, int32_t queueMaxNum)
100 {
101     if (threadNum <= 0 || queueMaxNum <= 0) {
102         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Invalid para.");
103         return NULL;
104     }
105     ThreadPool *pool = CreateThreadPool(threadNum, queueMaxNum);
106     if (pool == NULL) {
107         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create thread pool");
108         return NULL;
109     }
110     int32_t countSuccess = 0;
111     pool->pthreads = (SoftBusThread *)SoftBusCalloc((int32_t)(sizeof(SoftBusThread) * threadNum));
112     if (pool->pthreads == NULL) {
113         SoftBusFree(pool);
114         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to malloc pthreads");
115         return NULL;
116     }
117     pool->queueClose = 0;
118     pool->poolClose = 0;
119     if (SoftBusMutexLock(&(pool->mutex)) != 0) {
120         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
121         goto EXIT;
122     }
123 
124     for (int32_t i = 0; i < pool->threadNum; ++i) {
125         ThreadAttr attr = {"ThreadPoolWorker", 0, SOFTBUS_PRIORITY_LOWEST};
126         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "create pthread now.");
127         if (CreateThread((Runnable)ThreadPoolWorker, (void *)pool, &attr, (uint32_t *)&(pool->pthreads[i])) != 0) {
128             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "create pthreads no. [%d] failed\n", i);
129             pool->pthreads[i] = (SoftBusThread)0;
130         } else {
131             ++countSuccess;
132         }
133     }
134 
135     if (countSuccess < pool->threadNum) {
136         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "Failed to create %d threads", pool->threadNum - countSuccess);
137     }
138     if (countSuccess == 0) {
139         SoftBusMutexUnlock(&pool->mutex);
140         goto EXIT;
141     }
142     SoftBusMutexUnlock(&(pool->mutex));
143     return pool;
144 
145 EXIT:
146     SoftBusMutexDestroy(&pool->mutex);
147     SoftBusCondDestroy(&pool->queueEmpty);
148     SoftBusCondDestroy(&pool->queueNotEmpty);
149     SoftBusCondDestroy(&pool->queueNotFull);
150     SoftBusFree(pool->pthreads);
151     SoftBusFree(pool);
152     return NULL;
153 }
154 
JobCheck(ThreadPool * pool,Job * job)155 static void JobCheck(ThreadPool *pool, Job *job)
156 {
157     if (pool->queueClose || pool->poolClose) {
158         job->runnable = false;
159         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "Threadpool starts to close...");
160     }
161     if (job->jobMode == PERSISTENT && job->runnable == true) {
162         pool->queueCurNum++;
163         pool->tail->next = job;
164         pool->tail = job;
165     }
166     if (pool->queueCurNum == 0) {
167         pool->head = pool->tail = NULL;
168     } else {
169         pool->head = job->next;
170     }
171     if (pool->tail != NULL) {
172         pool->tail->next = NULL;
173     }
174 }
175 
ThreadPoolWorker(void * arg)176 static void ThreadPoolWorker(void *arg)
177 {
178     SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Start");
179     if (arg == NULL) {
180         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolWorker arg is NULL");
181         return;
182     }
183     ThreadPool *pool = (ThreadPool *)arg;
184     Job *job = NULL;
185     SoftBusThreadSetName(SoftBusThreadGetSelf(), THREAD_POOL_NAME);
186     while (1) {
187         if (SoftBusMutexLock(&(pool->mutex)) != 0) {
188             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
189             return;
190         }
191         while ((pool->queueCurNum == 0) && !pool->poolClose) {
192             SoftBusCondWait(&(pool->queueNotEmpty), &(pool->mutex), NULL);
193         }
194         if (pool->poolClose || pool->queueCurNum <= 0) {
195             SoftBusMutexUnlock(&(pool->mutex));
196             break;
197         }
198         pool->queueCurNum--;
199         job = pool->head;
200         if (SoftBusMutexLock(&(job->mutex)) != 0) {
201             pool->queueCurNum++;
202             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
203             SoftBusMutexUnlock(&(pool->mutex));
204             continue;
205         }
206         JobCheck(pool, job);
207         if (pool->queueCurNum == 0) {
208             SoftBusCondSignal(&(pool->queueEmpty));
209         }
210         if (pool->queueCurNum == pool->queueMaxNum - 1) {
211             SoftBusCondBroadcast(&(pool->queueNotFull));
212         }
213         SoftBusMutexUnlock(&(pool->mutex));
214 
215         // copy job task relative variables to run it after leave job mutex
216         bool runnable = job->runnable;
217         JobTask task = job->callbackFunction;
218         void *arguement = job->arg;
219         if (job->jobMode == ONCE || job->runnable == false) {
220             SoftBusMutexUnlock(&(job->mutex));
221             SoftBusMutexDestroy(&(job->mutex));
222             SoftBusFree(job);
223             job = NULL;
224         } else {
225             SoftBusMutexUnlock(&(job->mutex));
226         }
227         if (runnable) {
228             (void)(task(arguement));
229         }
230     }
231     SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_INFO, "ThreadPoolWorker Exit");
232 }
233 
CheckThreadPoolAddReady(ThreadPool * pool,int32_t (* callbackFunction)(void * arg))234 static int32_t CheckThreadPoolAddReady(ThreadPool *pool, int32_t (*callbackFunction)(void *arg))
235 {
236     if (pool == NULL || callbackFunction == NULL) {
237         return SOFTBUS_INVALID_PARAM;
238     }
239     if (SoftBusMutexLock(&(pool->mutex)) != 0) {
240         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
241         return SOFTBUS_LOCK_ERR;
242     }
243     if (pool->queueCurNum == pool->queueMaxNum) {
244         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "queueCurNum equals queueMaxNum, just quit");
245         SoftBusMutexUnlock(&(pool->mutex));
246         return SOFTBUS_ERR;
247     }
248     while ((pool->queueCurNum == pool->queueMaxNum) && !(pool->queueClose || pool->poolClose)) {
249         SoftBusCondWait(&(pool->queueNotFull), &(pool->mutex), NULL);
250     }
251     if (pool->queueClose || pool->poolClose) {
252         SoftBusMutexUnlock(&(pool->mutex));
253         return SOFTBUS_ERR;
254     }
255     // will call SoftBusMutexUnlock in ThreadPoolAddJob
256     return SOFTBUS_OK;
257 }
258 
ThreadPoolAddJob(ThreadPool * pool,int32_t (* callbackFunction)(void * arg),void * arg,JobMode jobMode,uintptr_t handle)259 int32_t ThreadPoolAddJob(ThreadPool *pool, int32_t (*callbackFunction)(void *arg), void *arg,
260     JobMode jobMode, uintptr_t handle)
261 {
262     int32_t ret = CheckThreadPoolAddReady(pool, callbackFunction);
263     if (ret != SOFTBUS_OK) {
264         return ret;
265     }
266     Job* job = pool->head;
267     while (job != NULL) {
268         if (job->handle == handle && job->runnable == true) {
269             SoftBusMutexUnlock(&(pool->mutex));
270             return SOFTBUS_ALREADY_EXISTED;
271         }
272         job = job->next;
273     }
274     job = (Job *)SoftBusCalloc(sizeof(Job));
275     if (job == NULL) {
276         SoftBusMutexUnlock(&(pool->mutex));
277         return SOFTBUS_MALLOC_ERR;
278     }
279     job->callbackFunction = callbackFunction;
280     job->arg = arg;
281     job->jobMode = jobMode;
282     job->handle = handle;
283     job->runnable = true;
284     job->next = NULL;
285     if (SoftBusMutexInit(&(job->mutex), NULL)) {
286         SoftBusFree(job);
287         SoftBusMutexUnlock(&(pool->mutex));
288         return SOFTBUS_ERR;
289     }
290     if (pool->head == NULL) {
291         pool->head = pool->tail = job;
292         SoftBusCondBroadcast(&(pool->queueNotEmpty));
293     } else {
294         pool->tail->next = job;
295         pool->tail = job;
296     }
297     pool->queueCurNum++;
298     SoftBusMutexUnlock(&(pool->mutex));
299     return SOFTBUS_OK;
300 }
301 
ThreadPoolRemoveJob(ThreadPool * pool,uintptr_t handle)302 int32_t ThreadPoolRemoveJob(ThreadPool *pool, uintptr_t handle)
303 {
304     if (pool == NULL) {
305         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "ThreadPoolRemoveJob failed, pool == NULL");
306         return SOFTBUS_INVALID_PARAM;
307     }
308     if (SoftBusMutexLock(&(pool->mutex)) != 0) {
309         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
310         return SOFTBUS_LOCK_ERR;
311     }
312     Job* job = pool->head;
313     while (job != NULL) {
314         if (job->handle == handle && job->jobMode == PERSISTENT && job->runnable == true) {
315             break;
316         }
317         job = job->next;
318     }
319     if (job != NULL && job->runnable == true && job->jobMode == PERSISTENT) {
320         if (SoftBusMutexLock(&(job->mutex)) != 0) {
321             SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
322             return SOFTBUS_LOCK_ERR;
323         }
324         job->runnable = false;
325         SoftBusMutexUnlock(&(job->mutex));
326     }
327     SoftBusMutexUnlock(&(pool->mutex));
328     return SOFTBUS_OK;
329 }
330 
ThreadPoolDestroy(ThreadPool * pool)331 int32_t ThreadPoolDestroy(ThreadPool *pool)
332 {
333     if (pool == NULL) {
334         return SOFTBUS_INVALID_PARAM;
335     }
336     if (SoftBusMutexLock(&(pool->mutex)) != 0) {
337         SoftBusLog(SOFTBUS_LOG_CONN, SOFTBUS_LOG_ERROR, "%s:lock failed", __func__);
338         return SOFTBUS_LOCK_ERR;
339     }
340     if (pool->queueClose || pool->poolClose) {
341         SoftBusMutexUnlock(&(pool->mutex));
342         return SOFTBUS_OK;
343     }
344     pool->queueClose = 1;
345     while (pool->queueCurNum != 0) {
346         SoftBusCondWait(&(pool->queueEmpty), &(pool->mutex), NULL);
347     }
348     pool->poolClose = 1;
349     SoftBusMutexUnlock(&(pool->mutex));
350     SoftBusCondBroadcast(&(pool->queueNotEmpty));
351     SoftBusCondBroadcast(&(pool->queueNotFull));
352     for (int32_t i = 0; i < pool->threadNum; ++i) {
353         if (pool->pthreads != NULL) {
354             SoftBusThreadJoin(pool->pthreads[i], NULL);
355         }
356     }
357     SoftBusMutexDestroy(&(pool->mutex));
358     SoftBusCondDestroy(&(pool->queueEmpty));
359     SoftBusCondDestroy(&(pool->queueNotEmpty));
360     SoftBusCondDestroy(&(pool->queueNotFull));
361     SoftBusFree(pool->pthreads);
362     Job* job = NULL;
363     while (pool->head != NULL) {
364         job = pool->head;
365         pool->head = job->next;
366         SoftBusFree(job);
367     }
368     SoftBusFree(pool);
369     return SOFTBUS_OK;
370 }
371