/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "message_handler.h"
17 
18 #include <securec.h>
19 #include <atomic>
20 #include "ffrt.h"
21 #include "c/ffrt_ipc.h"
22 
23 #include "common_list.h"
24 #include "comm_log.h"
25 #include "softbus_adapter_mem.h"
26 #include "softbus_def.h"
27 #include "softbus_error_code.h"
28 
// Size of a looper name buffer; names must be strictly shorter so the terminating NUL fits.
#define LOOP_NAME_LEN 16
// Factor between adjacent time units (seconds -> milliseconds -> microseconds).
#define TIME_THOUSANDS_MULTIPLIER 1000LL
// CreateNewLooper refuses to create a looper once this many are alive.
#define MAX_LOOPER_CNT 30U
// Threshold after which DumpLooperLocked stops printing queued messages.
#define MAX_LOOPER_PRINT_CNT 64
// Scaled by TIME_THOUSANDS_MULTIPLIER to form the ffrt queue task timeout
// (presumably milliseconds -> microseconds; confirm against the ffrt queue_attr docs).
#define TASK_HANDLE_TIMEOUT 5000LL

// Number of loopers created by CreateNewLooper and not yet destroyed.
static std::atomic<uint32_t> g_looperCnt(0);
36 
37 typedef struct {
38     int32_t what;
39     uint64_t arg1;
40     uint64_t arg2;
41     int64_t time;
42     char *handler;
43     char looper[LOOP_NAME_LEN];
44 } MsgEventInfo;
45 
46 struct FfrtMsgQueue {
47     ffrt::queue *msgQueue;
48 };
49 
50 typedef struct {
51     SoftBusMessage *msg;
52     ListNode node;
53     ffrt::task_handle *msgHandle;
54 } SoftBusMessageNode;
55 
56 struct SoftBusLooperContext {
57     ListNode msgHead;
58     char name[LOOP_NAME_LEN];
59     unsigned int msgSize;
60     ffrt::mutex *mtx;
61     volatile bool stop; // destroys looper, stop = true
62     MsgEventInfo handlingMsg;
63 };
64 
UptimeMicros(void)65 static int64_t UptimeMicros(void)
66 {
67     SoftBusSysTime t = {
68         .sec = 0,
69         .usec = 0,
70     };
71     SoftBusGetTime(&t);
72     int64_t when = t.sec * TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER + t.usec;
73     return when;
74 }
75 
/*
 * Release a message. A message that carries its own FreeMessage callback is
 * released through it; otherwise the message memory is freed directly.
 * msg must not be null.
 */
NO_SANITIZE("cfi") static void FreeSoftBusMsg(SoftBusMessage *msg)
{
    if (msg->FreeMessage != nullptr) {
        msg->FreeMessage(msg);
        return;
    }
    SoftBusFree(msg);
}
85 
MallocMessage(void)86 SoftBusMessage *MallocMessage(void)
87 {
88     SoftBusMessage *msg = static_cast<SoftBusMessage *>(SoftBusCalloc(sizeof(SoftBusMessage)));
89     if (msg == nullptr) {
90         COMM_LOGE(COMM_UTILS, "malloc SoftBusMessage failed");
91         return nullptr;
92     }
93     return msg;
94 }
95 
FreeMessage(SoftBusMessage * msg)96 void FreeMessage(SoftBusMessage *msg)
97 {
98     if (msg != nullptr) {
99         FreeSoftBusMsg(msg);
100         msg = nullptr;
101     }
102 }
103 
DumpLooperLocked(const SoftBusLooperContext * context)104 static void DumpLooperLocked(const SoftBusLooperContext *context)
105 {
106     int32_t i = 0;
107     ListNode *item = nullptr;
108     LIST_FOR_EACH(item, &context->msgHead) {
109         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
110         SoftBusMessage *msg = itemNode->msg;
111         if (i > MAX_LOOPER_PRINT_CNT) {
112             COMM_LOGW(COMM_UTILS, "many messages left unprocessed, msgSize=%{public}u",
113                 context->msgSize);
114             break;
115         }
116         COMM_LOGD(COMM_UTILS,
117             "DumpLooper. i=%{public}d, handler=%{public}s, what=%{public}" PRId32 ", arg1=%{public}" PRIu64 ", "
118             "arg2=%{public}" PRIu64 ", time=%{public}" PRId64 "",
119             i, msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time);
120         i++;
121     }
122 }
123 
DumpLooper(const SoftBusLooper * looper)124 void DumpLooper(const SoftBusLooper *looper)
125 {
126     if (looper == nullptr) {
127         return;
128     }
129     SoftBusLooperContext *context = looper->context;
130     context->mtx->lock();
131     if (looper->dumpable) {
132         DumpLooperLocked(context);
133     }
134     context->mtx->unlock();
135 }
136 
137 struct LoopConfigItem {
138     int type;
139     SoftBusLooper *looper;
140 };
141 
142 struct LoopConfigItem g_loopConfig[] = {
143     {LOOP_TYPE_DEFAULT, nullptr},
144     {LOOP_TYPE_CONN, nullptr},
145     {LOOP_TYPE_LNN, nullptr},
146     {LOOP_TYPE_DISC, nullptr},
147     {LOOP_TYPE_LANE, nullptr},
148 };
149 
ReleaseLooper(const SoftBusLooper * looper)150 static void ReleaseLooper(const SoftBusLooper *looper)
151 {
152     const uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
153     for (uint32_t i = 0; i < len; i++) {
154         if (g_loopConfig[i].looper == looper) {
155             g_loopConfig[i].looper = nullptr;
156             return;
157         }
158     }
159 }
160 
DumpMsgInfo(const SoftBusMessage * msg)161 static void DumpMsgInfo(const SoftBusMessage *msg)
162 {
163     if (msg->handler == nullptr || msg->handler->looper == nullptr ||
164         msg->handler->looper->context == nullptr) {
165         COMM_LOGE(COMM_UTILS, "invalid para when dump msg info");
166         return;
167     }
168     COMM_LOGD(COMM_UTILS, "handling msg, %{public}s, %{public}s, %{public}" PRId32 ", %{public}" PRIu64 ", "
169         "%{public}" PRIu64 ", %{public}" PRId64 "",
170         msg->handler->looper->context->name, msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time);
171 }
172 
UpdateHandlingMsg(const SoftBusMessage * currentMsg,const SoftBusLooper * looper)173 static void UpdateHandlingMsg(const SoftBusMessage *currentMsg, const SoftBusLooper *looper)
174 {
175     if (looper->queue == nullptr || currentMsg->handler == nullptr || currentMsg->handler->name == nullptr) {
176         COMM_LOGE(COMM_UTILS, "invalid para when update handling msg");
177         return;
178     }
179     MsgEventInfo *tmpMsg = &(looper->context->handlingMsg);
180     if (tmpMsg->handler != nullptr) {
181         SoftBusFree(tmpMsg->handler);
182         tmpMsg->handler = nullptr;
183     }
184     tmpMsg->handler = static_cast<char *>(SoftBusCalloc(strlen(currentMsg->handler->name) + 1));
185     if (tmpMsg->handler == nullptr) {
186         COMM_LOGE(COMM_UTILS, "calloc fail when update handling msg");
187         return;
188     }
189     if (memcpy_s(tmpMsg->looper, LOOP_NAME_LEN, looper->context->name, LOOP_NAME_LEN) != EOK) {
190         COMM_LOGE(COMM_UTILS, "copy looper fail when update handling msg");
191     }
192     if (strcpy_s(tmpMsg->handler, strlen(currentMsg->handler->name) + 1, currentMsg->handler->name) != EOK) {
193         COMM_LOGE(COMM_UTILS, "copy handler fail when update handling msg");
194         SoftBusFree(tmpMsg->handler);
195         tmpMsg->handler = nullptr;
196     }
197     tmpMsg->what = currentMsg->what;
198     tmpMsg->arg1 = currentMsg->arg1;
199     tmpMsg->arg2 = currentMsg->arg2;
200     tmpMsg->time = currentMsg->time;
201 }
202 
GetMsgNodeFromContext(SoftBusMessageNode ** msgNode,const SoftBusMessage * tmpMsg,const SoftBusLooper * looper)203 static int32_t GetMsgNodeFromContext(SoftBusMessageNode **msgNode,
204     const SoftBusMessage *tmpMsg, const SoftBusLooper *looper)
205 {
206     looper->context->mtx->lock();
207     if (looper->context->stop) {
208         COMM_LOGE(COMM_UTILS, "cancel handle with looper is stop, name=%{public}s", looper->context->name);
209         looper->context->mtx->unlock();
210         return SOFTBUS_LOOPER_ERR;
211     }
212     ListNode *item = nullptr;
213     ListNode *nextItem = nullptr;
214     LIST_FOR_EACH_SAFE(item, nextItem, &(looper->context->msgHead)) {
215         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
216         SoftBusMessage *msg = itemNode->msg;
217         if (tmpMsg->what == msg->what && tmpMsg->arg1 == msg->arg1 && tmpMsg->arg2 == msg->arg2 &&
218             tmpMsg->time == msg->time && tmpMsg->handler == msg->handler) {
219             ListDelete(&itemNode->node);
220             *msgNode = itemNode;
221             looper->context->msgSize--;
222             UpdateHandlingMsg(msg, looper);
223             looper->context->mtx->unlock();
224             return SOFTBUS_OK;
225         }
226     }
227     COMM_LOGE(COMM_UTILS, "no get correct msg from context, time=%{public}" PRId64"", tmpMsg->time);
228     looper->context->mtx->unlock();
229     return SOFTBUS_LOOPER_ERR;
230 }
231 
SubmitMsgToFfrt(SoftBusMessageNode * msgNode,const SoftBusLooper * looper,uint64_t delayMicros)232 static int32_t SubmitMsgToFfrt(SoftBusMessageNode *msgNode, const SoftBusLooper *looper, uint64_t delayMicros)
233 {
234     msgNode->msgHandle = new (std::nothrow)ffrt::task_handle();
235     if (msgNode->msgHandle == nullptr) {
236         COMM_LOGE(COMM_UTILS, "ffrt msgHandle SoftBusCalloc fail");
237         return SOFTBUS_MALLOC_ERR;
238     }
239     SoftBusMessage tmpMsg = {
240         .what = msgNode->msg->what,
241         .arg1 = msgNode->msg->arg1,
242         .arg2 = msgNode->msg->arg2,
243         .time = msgNode->msg->time,
244         .handler = msgNode->msg->handler,
245     };
246     *(msgNode->msgHandle) = looper->queue->msgQueue->submit_h([tmpMsg, looper] {
247         ffrt_this_task_set_legacy_mode(true);
248         if (looper == nullptr || looper->context == nullptr) {
249             COMM_LOGE(COMM_UTILS, "invalid looper para when handle");
250             ffrt_this_task_set_legacy_mode(false);
251             return;
252         }
253         SoftBusMessageNode *currentMsgNode = nullptr;
254         if (GetMsgNodeFromContext(&currentMsgNode, &tmpMsg, looper) != SOFTBUS_OK) {
255             COMM_LOGE(COMM_UTILS, "get currentMsgNode from context fail");
256             ffrt_this_task_set_legacy_mode(false);
257             return;
258         }
259         SoftBusMessage *currentMsg = currentMsgNode->msg;
260         if (currentMsg->handler != nullptr && currentMsg->handler->HandleMessage != nullptr) {
261             DumpMsgInfo(currentMsg);
262             currentMsg->handler->HandleMessage(currentMsg);
263         } else {
264             COMM_LOGE(COMM_UTILS, "handler is null when handle msg, name=%{public}s", looper->context->name);
265         }
266         FreeSoftBusMsg(currentMsg);
267         delete (currentMsgNode->msgHandle);
268         SoftBusFree(currentMsgNode);
269         ffrt_this_task_set_legacy_mode(false);
270     }, ffrt::task_attr().delay(delayMicros));
271     return SOFTBUS_OK;
272 }
273 
InsertMsgWithTime(SoftBusLooperContext * context,SoftBusMessageNode * msgNode)274 static void InsertMsgWithTime(SoftBusLooperContext *context, SoftBusMessageNode *msgNode)
275 {
276     ListNode *item = nullptr;
277     ListNode *nextItem = nullptr;
278     bool insert = false;
279     LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
280         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
281         if (itemNode->msg->time > msgNode->msg->time) {
282             ListTailInsert(item, &(msgNode->node));
283             insert = true;
284             break;
285         }
286     }
287     if (!insert) {
288         ListTailInsert(&(context->msgHead), &(msgNode->node));
289     }
290 }
291 
PostMessageWithFfrt(const SoftBusLooper * looper,SoftBusMessage * msg,uint64_t delayMicros)292 static void PostMessageWithFfrt(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMicros)
293 {
294     SoftBusMessageNode *msgNode = static_cast<SoftBusMessageNode *>(SoftBusCalloc(sizeof(SoftBusMessageNode)));
295     if (msgNode == nullptr) {
296         COMM_LOGE(COMM_UTILS, "message node malloc failed");
297         FreeSoftBusMsg(msg);
298         return;
299     }
300     ListInit(&msgNode->node);
301     msgNode->msg = msg;
302     SoftBusLooperContext *context = looper->context;
303     context->mtx->lock();
304     if (context->stop) {
305         FreeSoftBusMsg(msg);
306         SoftBusFree(msgNode);
307         COMM_LOGE(COMM_UTILS, "cancel post with looper is stop, name=%{public}s", context->name);
308         context->mtx->unlock();
309         return;
310     }
311     if (SubmitMsgToFfrt(msgNode, looper, delayMicros) != SOFTBUS_OK) {
312         FreeSoftBusMsg(msg);
313         SoftBusFree(msgNode);
314         COMM_LOGE(COMM_UTILS, "submit msg to ffrt fail, name=%{public}s", context->name);
315         context->mtx->unlock();
316         return;
317     }
318     InsertMsgWithTime(context, msgNode);
319     context->msgSize++;
320     if (looper->dumpable) {
321         DumpLooperLocked(context);
322     }
323     context->mtx->unlock();
324 }
325 
RemoveMessageWithFfrt(const SoftBusLooper * looper,const SoftBusHandler * handler,int (* customFunc)(const SoftBusMessage *,void *),void * args)326 static void RemoveMessageWithFfrt(const SoftBusLooper *looper, const SoftBusHandler *handler,
327     int (*customFunc)(const SoftBusMessage*, void*), void *args)
328 {
329     SoftBusLooperContext *context = looper->context;
330     context->mtx->lock();
331     if (context->stop) {
332         COMM_LOGE(COMM_UTILS, "cancel remove with looper is stop, name=%{public}s", context->name);
333         context->mtx->unlock();
334         return;
335     }
336     ListNode *item = nullptr;
337     ListNode *nextItem = nullptr;
338     LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
339         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
340         SoftBusMessage *msg = itemNode->msg;
341         if (msg->handler == handler && customFunc(msg, args) == 0) {
342             looper->queue->msgQueue->cancel(*(itemNode->msgHandle)); // cancel fail when task is handling
343             COMM_LOGD(COMM_UTILS, "remove msg with ffrt succ, time=%{public}" PRId64"", msg->time);
344             FreeSoftBusMsg(msg);
345             ListDelete(&itemNode->node);
346             delete (itemNode->msgHandle);
347             SoftBusFree(itemNode);
348             context->msgSize--;
349         }
350     }
351     context->mtx->unlock();
352 }
353 
DestroyLooperWithFfrt(SoftBusLooper * looper)354 static void DestroyLooperWithFfrt(SoftBusLooper *looper)
355 {
356     SoftBusLooperContext *context = looper->context;
357     if (context != nullptr) {
358         context->mtx->lock();
359         context->stop = true;
360         COMM_LOGI(COMM_UTILS, "looper is stop, name=%{public}s", looper->context->name);
361         context->mtx->unlock();
362         delete (looper->queue->msgQueue); //if task is handling when delete, it will return after handle;
363         looper->queue->msgQueue = nullptr;
364         ListNode *item = nullptr;
365         ListNode *nextItem = nullptr;
366         LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
367             SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
368             SoftBusMessage *msg = itemNode->msg;
369             FreeSoftBusMsg(msg);
370             ListDelete(&itemNode->node);
371             delete (itemNode->msgHandle);
372             SoftBusFree(itemNode);
373             context->msgSize--;
374         }
375         if (context->handlingMsg.handler != nullptr) {
376             SoftBusFree(context->handlingMsg.handler);
377             context->handlingMsg.handler = nullptr;
378         }
379         delete (context->mtx);
380         context->mtx = nullptr;
381         SoftBusFree(context);
382         context = nullptr;
383     } else {
384         delete (looper->queue->msgQueue);
385         looper->queue->msgQueue = nullptr;
386     }
387     SoftBusFree(looper->queue);
388     looper->queue = nullptr;
389     ReleaseLooper(looper);
390     SoftBusFree(looper);
391     looper = nullptr;
392     if (g_looperCnt.load(std::memory_order_acquire) != 0) {
393         g_looperCnt--;
394     }
395 }
396 
LooperPostMessage(const SoftBusLooper * looper,SoftBusMessage * msg)397 static void LooperPostMessage(const SoftBusLooper *looper, SoftBusMessage *msg)
398 {
399     if (msg == nullptr || msg->handler == nullptr) {
400         COMM_LOGE(COMM_UTILS, "LooperPostMessage with nullmsg");
401         return;
402     }
403     if (looper == nullptr || looper->context == nullptr) {
404         COMM_LOGE(COMM_UTILS, "LooperPostMessage with nulllooper");
405         return;
406     }
407     if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) {
408         COMM_LOGE(COMM_UTILS, "LooperPostMessage with nullqueue");
409         return;
410     }
411     msg->time = UptimeMicros();
412     PostMessageWithFfrt(looper, msg, 0);
413 }
414 
LooperPostMessageDelay(const SoftBusLooper * looper,SoftBusMessage * msg,uint64_t delayMillis)415 static void LooperPostMessageDelay(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMillis)
416 {
417     if (msg == nullptr || msg->handler == nullptr) {
418         COMM_LOGE(COMM_UTILS, "LooperPostMessageDelay with nullmsg");
419         return;
420     }
421     if (looper == nullptr || looper->context == nullptr) {
422         COMM_LOGE(COMM_UTILS, "LooperPostMessageDelay with nulllooper");
423         return;
424     }
425     if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) {
426         COMM_LOGE(COMM_UTILS, "LooperPostMessageDelay with nullqueue");
427         return;
428     }
429     msg->time = UptimeMicros() + (int64_t)delayMillis * TIME_THOUSANDS_MULTIPLIER;
430     PostMessageWithFfrt(looper, msg, delayMillis * TIME_THOUSANDS_MULTIPLIER);
431 }
432 
WhatRemoveFunc(const SoftBusMessage * msg,void * args)433 static int WhatRemoveFunc(const SoftBusMessage *msg, void *args)
434 {
435     int32_t what = static_cast<int32_t>(reinterpret_cast<intptr_t>(args));
436     if (msg->what == what) {
437         return 0;
438     }
439     return 1;
440 }
441 
LoopRemoveMessageCustom(const SoftBusLooper * looper,const SoftBusHandler * handler,int (* customFunc)(const SoftBusMessage *,void *),void * args)442 static void LoopRemoveMessageCustom(const SoftBusLooper *looper, const SoftBusHandler *handler,
443     int (*customFunc)(const SoftBusMessage*, void*), void *args)
444 {
445     if (looper == nullptr || looper->context == nullptr) {
446         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nulllopper");
447         return;
448     }
449     if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) {
450         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullqueue");
451         return;
452     }
453     if (handler == nullptr) {
454         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullhandler");
455         return;
456     }
457     if (customFunc == nullptr) {
458         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullcustomFunc");
459         return;
460     }
461     RemoveMessageWithFfrt(looper, handler, customFunc, args);
462 }
463 
LooperRemoveMessage(const SoftBusLooper * looper,const SoftBusHandler * handler,int32_t what)464 static void LooperRemoveMessage(const SoftBusLooper *looper, const SoftBusHandler *handler, int32_t what)
465 {
466     if (looper == nullptr || looper->context == nullptr) {
467         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nulllopper");
468         return;
469     }
470     if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) {
471         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullqueue");
472         return;
473     }
474     if (handler == nullptr) {
475         COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullhandler");
476         return;
477     }
478     void *arg = reinterpret_cast<void *>(static_cast<intptr_t>(what));
479     LoopRemoveMessageCustom(looper, handler, WhatRemoveFunc, arg);
480 }
481 
SetLooperDumpable(SoftBusLooper * looper,bool dumpable)482 void SetLooperDumpable(SoftBusLooper *looper, bool dumpable)
483 {
484     if (looper == nullptr || looper->context == nullptr) {
485         COMM_LOGE(COMM_UTILS, "looper param is invalid");
486         return;
487     }
488     looper->context->mtx->lock();
489     looper->dumpable = dumpable;
490     looper->context->mtx->unlock();
491 }
492 /* create new ffrt queue depend on create new context success */
CreateNewFfrtQueue(FfrtMsgQueue ** ffrtQueue,const char * name,const SoftBusLooperContext * context)493 static int32_t CreateNewFfrtQueue(FfrtMsgQueue **ffrtQueue, const char *name, const SoftBusLooperContext *context)
494 {
495     FfrtMsgQueue *tmpQueue = static_cast<FfrtMsgQueue *>(SoftBusCalloc(sizeof(FfrtMsgQueue)));
496     if (tmpQueue == nullptr) {
497         COMM_LOGE(COMM_UTILS, "softbus msgQueue SoftBusCalloc fail");
498         return SOFTBUS_MALLOC_ERR;
499     }
500     std::function<void()> callbackFunc = [context]() {
501         if (context == nullptr || context->mtx == nullptr) {
502             COMM_LOGE(COMM_UTILS, "abnormal handle long time task with invalid task context");
503             return;
504         }
505         context->mtx->lock();
506         if (context->handlingMsg.handler != nullptr) {
507             COMM_LOGE(COMM_UTILS, "abnormal handle long time task, what=%{public}" PRId32 ", "
508                 "arg1=%{public}" PRIu64 ", arg2=%{public}" PRIu64 ", time=%{public}" PRId64 ", "
509                 "handler=%{public}s, looper=%{public}s", context->handlingMsg.what, context->handlingMsg.arg1,
510                 context->handlingMsg.arg2, context->handlingMsg.time, context->handlingMsg.handler,
511                 context->handlingMsg.looper);
512         } else {
513             COMM_LOGE(COMM_UTILS, "abnormal handle long time task with invalid msg info, looper=%{public}s",
514                 context->handlingMsg.looper);
515         }
516         context->mtx->unlock();
517     };
518     tmpQueue->msgQueue = new (std::nothrow)ffrt::queue(name,
519         ffrt::queue_attr().timeout(TASK_HANDLE_TIMEOUT * TIME_THOUSANDS_MULTIPLIER).callback(callbackFunc));
520     if (tmpQueue->msgQueue == nullptr) {
521         COMM_LOGE(COMM_UTILS, "ffrt msgQueue SoftBusCalloc fail");
522         SoftBusFree(tmpQueue);
523         return SOFTBUS_MALLOC_ERR;
524     }
525     *ffrtQueue = tmpQueue;
526     return SOFTBUS_OK;
527 }
528 
CreateNewContext(SoftBusLooperContext ** context,const char * name)529 static int32_t CreateNewContext(SoftBusLooperContext **context, const char *name)
530 {
531     SoftBusLooperContext *tmpContext = static_cast<SoftBusLooperContext *>(SoftBusCalloc(sizeof(SoftBusLooperContext)));
532     if (tmpContext == nullptr) {
533         COMM_LOGE(COMM_UTILS, "context SoftBusCalloc fail");
534         return SOFTBUS_MALLOC_ERR;
535     }
536     if (strncpy_s(tmpContext->name, LOOP_NAME_LEN, name, strlen(name)) != EOK) {
537         COMM_LOGE(COMM_UTILS, "memcpy context name fail");
538         SoftBusFree(tmpContext);
539         return SOFTBUS_STRCPY_ERR;
540     }
541     ListInit(&tmpContext->msgHead);
542     // init mtx
543     tmpContext->mtx = new (std::nothrow)ffrt::mutex();
544     if (tmpContext->mtx == nullptr) {
545         COMM_LOGE(COMM_UTILS, "context ffrt lock init fail");
546         SoftBusFree(tmpContext);
547         return SOFTBUS_MALLOC_ERR;
548     }
549     tmpContext->stop = false;
550     tmpContext->handlingMsg.what = 0;
551     tmpContext->handlingMsg.arg1 = 0;
552     tmpContext->handlingMsg.arg2 = 0;
553     tmpContext->handlingMsg.time = 0;
554     tmpContext->handlingMsg.handler = nullptr;
555     (void)memset_s(tmpContext->handlingMsg.looper, LOOP_NAME_LEN, 0, LOOP_NAME_LEN);
556     *context = tmpContext;
557     return SOFTBUS_OK;
558 }
559 
CreateNewLooper(const char * name)560 SoftBusLooper *CreateNewLooper(const char *name)
561 {
562     if (name == nullptr || strlen(name) >= LOOP_NAME_LEN) {
563         COMM_LOGE(COMM_UTILS, "invalid looper name=%{public}s", name);
564         return nullptr;
565     }
566     if (g_looperCnt.load(std::memory_order_acquire) >= MAX_LOOPER_CNT) {
567         COMM_LOGE(COMM_UTILS, "Looper exceeds the maximum, count=%{public}u,",
568             g_looperCnt.load(std::memory_order_acquire));
569         return nullptr;
570     }
571     SoftBusLooper *looper = static_cast<SoftBusLooper *>(SoftBusCalloc(sizeof(SoftBusLooper)));
572     if (looper == nullptr) {
573         COMM_LOGE(COMM_UTILS, "Looper SoftBusCalloc fail");
574         return nullptr;
575     }
576     SoftBusLooperContext *context = nullptr;
577     if (CreateNewContext(&context, name) != SOFTBUS_OK) {
578         COMM_LOGE(COMM_UTILS, "create new context fail");
579         SoftBusFree(looper);
580         return nullptr;
581     }
582     FfrtMsgQueue *ffrtQueue = nullptr;
583     if (CreateNewFfrtQueue(&ffrtQueue, name, context) != SOFTBUS_OK) {
584         COMM_LOGE(COMM_UTILS, "create new ffrtQueue fail");
585         delete (context->mtx);
586         SoftBusFree(context);
587         SoftBusFree(looper);
588         return nullptr;
589     }
590     // init looper
591     looper->context = context;
592     looper->dumpable = true;
593     looper->PostMessage = LooperPostMessage;
594     looper->PostMessageDelay = LooperPostMessageDelay;
595     looper->RemoveMessage = LooperRemoveMessage;
596     looper->RemoveMessageCustom = LoopRemoveMessageCustom;
597     looper->queue = ffrtQueue;
598     COMM_LOGI(COMM_UTILS, "start looper with ffrt ok, name=%{public}s", context->name);
599     g_looperCnt++;
600     return looper;
601 }
602 
GetLooper(int type)603 SoftBusLooper *GetLooper(int type)
604 {
605     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
606     for (uint32_t i = 0; i < len; i++) {
607         if (g_loopConfig[i].type == type) {
608             return g_loopConfig[i].looper;
609         }
610     }
611     return nullptr;
612 }
613 
SetLooper(int type,SoftBusLooper * looper)614 void SetLooper(int type, SoftBusLooper *looper)
615 {
616     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
617     for (uint32_t i = 0; i < len; i++) {
618         if (g_loopConfig[i].type == type) {
619             g_loopConfig[i].looper = looper;
620         }
621     }
622 }
623 
DestroyLooper(SoftBusLooper * looper)624 void DestroyLooper(SoftBusLooper *looper)
625 {
626     if (looper == nullptr) {
627         COMM_LOGE(COMM_UTILS, "DestroyLooper with nulllooper");
628         return;
629     }
630     if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) {
631         COMM_LOGE(COMM_UTILS, "DestroyLooper with nullqueue");
632         return;
633     }
634     DestroyLooperWithFfrt(looper);
635 }
636 
LooperInit(void)637 int LooperInit(void)
638 {
639     SoftBusLooper *looper = CreateNewLooper("BusCenter_Lp");
640     if (!looper) {
641         COMM_LOGE(COMM_UTILS, "init BusCenter looper fail.");
642         return SOFTBUS_LOOPER_ERR;
643     }
644     SetLooper(LOOP_TYPE_DEFAULT, looper);
645 
646     SoftBusLooper *connLooper = CreateNewLooper("ReactorLink_Lp");
647     if (!connLooper) {
648         COMM_LOGE(COMM_UTILS, "init connection looper fail.");
649         return SOFTBUS_LOOPER_ERR;
650     }
651     SetLooper(LOOP_TYPE_CONN, connLooper);
652 
653     COMM_LOGD(COMM_UTILS, "init looper success.");
654     return SOFTBUS_OK;
655 }
656 
LooperDeinit(void)657 void LooperDeinit(void)
658 {
659     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
660     for (uint32_t i = 0; i < len; i++) {
661         if (g_loopConfig[i].looper == nullptr) {
662             continue;
663         }
664         DestroyLooper(g_loopConfig[i].looper);
665     }
666 }