• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "message_handler.h"
17 
18 #include <sys/time.h>
19 #include <sys/types.h>
20 
21 #include "common_list.h"
22 #include "securec.h"
23 #include "softbus_adapter_mem.h"
24 #include "softbus_adapter_thread.h"
25 #include "softbus_def.h"
26 #include "softbus_log.h"
27 #include "softbus_type_def.h"
28 #include "unistd.h"
29 
30 #define LOOP_NAME_LEN 16
31 #define TIME_THOUSANDS_MULTIPLIER 1000LL
32 #define MAX_LOOPER_CNT 30U
33 
34 static int8_t g_isNeedDestroy = 0;
35 static int8_t g_isThreadStarted = 0;
36 static uint32_t g_looperCnt = 0;
37 
38 typedef struct {
39     SoftBusMessage *msg;
40     ListNode node;
41 } SoftBusMessageNode;
42 
43 struct SoftBusLooperContext {
44     ListNode msgHead;
45     char name[LOOP_NAME_LEN];
46     volatile unsigned char stop; // destroys looper, stop =1, and running =0
47     volatile unsigned char running;
48     SoftBusMessage *currentMsg;
49     unsigned int msgSize;
50     SoftBusMutex lock;
51     SoftBusMutexAttr attr;
52     SoftBusCond cond;
53     SoftBusCond condRunning;
54 };
55 
UptimeMicros(void)56 static int64_t UptimeMicros(void)
57 {
58     SoftBusSysTime t;
59     t.sec = 0;
60     t.usec = 0;
61     SoftBusGetTime(&t);
62     int64_t when = t.sec * TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER + t.usec;
63     return when;
64 }
65 
FreeSoftBusMsg(SoftBusMessage * msg)66 static void FreeSoftBusMsg(SoftBusMessage *msg)
67 {
68     if (msg->FreeMessage == NULL) {
69         SoftBusFree(msg);
70     } else {
71         msg->FreeMessage(msg);
72     }
73 }
74 
MallocMessage(void)75 SoftBusMessage *MallocMessage(void)
76 {
77     SoftBusMessage *msg = (SoftBusMessage *)SoftBusMalloc(sizeof(SoftBusMessage));
78     if (msg == NULL) {
79         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "malloc SoftBusMessage failed");
80         return NULL;
81     }
82     (void)memset_s(msg, sizeof(SoftBusMessage), 0, sizeof(SoftBusMessage));
83     return msg;
84 }
85 
FreeMessage(SoftBusMessage * msg)86 void FreeMessage(SoftBusMessage *msg)
87 {
88     if (msg != NULL) {
89         FreeSoftBusMsg(msg);
90     }
91 }
92 
LoopTask(void * arg)93 static void *LoopTask(void *arg)
94 {
95     SoftBusLooper *looper = arg;
96     SoftBusLooperContext *context = looper->context;
97     if (context == NULL) {
98         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "loop context is NULL");
99         return NULL;
100     }
101 
102     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s] running", context->name);
103 
104     if (SoftBusMutexLock(&context->lock) != 0) {
105         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
106         return NULL;
107     }
108     context->running = 1;
109     g_isThreadStarted = 1;
110     (void)SoftBusMutexUnlock(&context->lock);
111 
112     for (;;) {
113         if (SoftBusMutexLock(&context->lock) != 0) {
114             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
115             return NULL;
116         }
117         // wait
118         if (context->stop == 1) {
119             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s], stop ==1", context->name);
120             (void)SoftBusMutexUnlock(&context->lock);
121             break;
122         }
123 
124         if (g_isNeedDestroy == 1) {
125             (void)SoftBusMutexUnlock(&context->lock);
126             break;
127         }
128 
129         if (IsListEmpty(&context->msgHead)) {
130             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s] wait msg list empty", context->name);
131             SoftBusCondWait(&context->cond, &context->lock, NULL);
132             (void)SoftBusMutexUnlock(&context->lock);
133             continue;
134         }
135 
136         int64_t now = UptimeMicros();
137         ListNode *item = context->msgHead.next;
138         SoftBusMessage *msg = NULL;
139         SoftBusMessageNode *itemNode = CONTAINER_OF(item, SoftBusMessageNode, node);
140         int64_t time = itemNode->msg->time;
141         if (now >= time) {
142             msg = itemNode->msg;
143             ListDelete(item);
144             SoftBusFree(itemNode);
145             context->msgSize--;
146             if (looper->dumpable) {
147                 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG,
148                     "LoopTask[%s], get message. handle=%s,what=%" PRId32 ",arg1=%" PRIu64 ",msgSize=%u", context->name,
149                     msg->handler->name, msg->what, msg->arg1, context->msgSize);
150             }
151         } else {
152             SoftBusSysTime tv;
153             tv.sec = time / TIME_THOUSANDS_MULTIPLIER / TIME_THOUSANDS_MULTIPLIER;
154             tv.usec = time % (TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER);
155             SoftBusCondWait(&context->cond, &context->lock, &tv);
156         }
157 
158         if (msg == NULL) {
159             (void)SoftBusMutexUnlock(&context->lock);
160             continue;
161         }
162         context->currentMsg = msg;
163         (void)SoftBusMutexUnlock(&context->lock);
164         if (looper->dumpable) {
165             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG,
166                 "LoopTask[%s], HandleMessage message. handle=%s,what=%" PRId32, context->name, msg->handler->name,
167                 msg->what);
168         }
169 
170         if (msg->handler->HandleMessage != NULL) {
171             msg->handler->HandleMessage(msg);
172         }
173         if (looper->dumpable) {
174             // Don`t print msg->handler, msg->handler->HandleMessage() may remove handler,
175             // so msg->handler maybe invalid pointer
176             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO,
177                 "LoopTask[%s], after HandleMessage message. what=%" PRId32 ",arg1=%" PRIu64,
178                 context->name, msg->what, msg->arg1);
179         }
180         (void)SoftBusMutexLock(&context->lock);
181         FreeSoftBusMsg(msg);
182         context->currentMsg = NULL;
183         (void)SoftBusMutexUnlock(&context->lock);
184     }
185     (void)SoftBusMutexLock(&context->lock);
186     context->running = 0;
187     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s], running =0", context->name);
188     SoftBusCondBroadcast(&context->cond);
189     SoftBusCondBroadcast(&context->condRunning);
190     (void)SoftBusMutexUnlock(&context->lock);
191     if (g_isNeedDestroy == 1) {
192         LooperDeinit();
193     }
194     return NULL;
195 }
196 
StartNewLooperThread(SoftBusLooper * looper)197 static int StartNewLooperThread(SoftBusLooper *looper)
198 {
199 #ifdef __aarch64__
200 #define MAINLOOP_STACK_SIZE (2 * 1024 * 1024)
201 #else
202 #ifdef ASAN_BUILD
203 #define MAINLOOP_STACK_SIZE 10240
204 #else
205 #define MAINLOOP_STACK_SIZE 8192
206 #endif
207 #endif
208     int ret;
209     SoftBusThreadAttr threadAttr;
210     SoftBusThread tid;
211     SoftBusThreadAttrInit(&threadAttr);
212 
213     threadAttr.stackSize = MAINLOOP_STACK_SIZE;
214     ret = SoftBusThreadCreate(&tid, &threadAttr, LoopTask, looper);
215     if (ret != 0) {
216         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "Init DeathProcTask ThreadAttr failed");
217         return -1;
218     }
219 
220     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "loop thread creating %s id %d", looper->context->name,
221         (int)(uintptr_t)tid);
222     return 0;
223 }
224 
DumpLooperLocked(const SoftBusLooperContext * context)225 static void DumpLooperLocked(const SoftBusLooperContext *context)
226 {
227     int i = 0;
228     ListNode *item = NULL;
229     LIST_FOR_EACH(item, &context->msgHead) {
230         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
231         SoftBusMessage *msg = itemNode->msg;
232         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG,
233             "DumpLooper. i=%d,handler=%s,what =%" PRId32 ",arg1=%" PRIu64 " arg2=%" PRIu64 ", time=%" PRId64,
234             i, msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time);
235         i++;
236     }
237 }
238 
DumpLooper(const SoftBusLooper * looper)239 void DumpLooper(const SoftBusLooper *looper)
240 {
241     if (looper == NULL) {
242         return;
243     }
244     SoftBusLooperContext *context = looper->context;
245     if (SoftBusMutexLock(&context->lock) != 0) {
246         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
247         return;
248     }
249     if (looper->dumpable) {
250         DumpLooperLocked(context);
251     }
252     (void)SoftBusMutexUnlock(&context->lock);
253 }
254 
PostMessageAtTime(const SoftBusLooper * looper,SoftBusMessage * msgPost)255 static void PostMessageAtTime(const SoftBusLooper *looper, SoftBusMessage *msgPost)
256 {
257     if (looper->dumpable) {
258         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "[%s]PostMessageAtTime what =%d time=% " PRId64 " us",
259             looper->context->name, msgPost->what, msgPost->time);
260     }
261     if (msgPost->handler == NULL) {
262         FreeSoftBusMsg(msgPost);
263         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "[%s]PostMessageAtTime. msg handler is null",
264             looper->context->name);
265         return;
266     }
267     SoftBusMessageNode *newNode = (SoftBusMessageNode *)SoftBusMalloc(sizeof(SoftBusMessageNode));
268     if (newNode == NULL) {
269         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "%s:oom", __func__);
270         FreeSoftBusMsg(msgPost);
271         return;
272     }
273     ListInit(&newNode->node);
274     newNode->msg = msgPost;
275     SoftBusLooperContext *context = looper->context;
276     if (SoftBusMutexLock(&context->lock) != 0) {
277         SoftBusFree(newNode);
278         FreeSoftBusMsg(msgPost);
279         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
280         return;
281     }
282     if (context->stop == 1) {
283         SoftBusFree(newNode);
284         FreeSoftBusMsg(msgPost);
285         (void)SoftBusMutexUnlock(&context->lock);
286         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "[%s]PostMessageAtTime. running=%d,stop=1.",
287             context->name, context->running);
288         return;
289     }
290     ListNode *item = NULL;
291     ListNode *nextItem = NULL;
292     bool insert = false;
293     LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
294         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
295         SoftBusMessage *msg = itemNode->msg;
296         if (msg->time > msgPost->time) {
297             ListTailInsert(item, &(newNode->node));
298             insert = true;
299             break;
300         }
301     }
302     if (!insert) {
303         ListTailInsert(&(context->msgHead), &(newNode->node));
304     }
305     context->msgSize++;
306     if (looper->dumpable) {
307         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "[%s]PostMessageAtTime. insert", context->name);
308         DumpLooperLocked(context);
309     }
310     SoftBusCondBroadcast(&context->cond);
311     (void)SoftBusMutexUnlock(&context->lock);
312 }
313 
LooperPostMessage(const SoftBusLooper * looper,SoftBusMessage * msg)314 static void LooperPostMessage(const SoftBusLooper *looper, SoftBusMessage *msg)
315 {
316     msg->time = UptimeMicros();
317     PostMessageAtTime(looper, msg);
318 }
319 
LooperPostMessageDelay(const SoftBusLooper * looper,SoftBusMessage * msg,uint64_t delayMillis)320 static void LooperPostMessageDelay(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMillis)
321 {
322     msg->time = UptimeMicros() + (int64_t)delayMillis * TIME_THOUSANDS_MULTIPLIER;
323     PostMessageAtTime(looper, msg);
324 }
325 
WhatRemoveFunc(const SoftBusMessage * msg,void * args)326 static int WhatRemoveFunc(const SoftBusMessage *msg, void *args)
327 {
328     int32_t what = (int32_t)(intptr_t)args;
329     if (msg->what == what) {
330         return 0;
331     }
332     return 1;
333 }
334 
LoopRemoveMessageCustom(const SoftBusLooper * looper,const SoftBusHandler * handler,int (* customFunc)(const SoftBusMessage *,void *),void * args)335 static void LoopRemoveMessageCustom(const SoftBusLooper *looper, const SoftBusHandler *handler,
336     int (*customFunc)(const SoftBusMessage*, void*), void *args)
337 {
338     SoftBusLooperContext *context = looper->context;
339     if (SoftBusMutexLock(&context->lock) != 0) {
340         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
341         return;
342     }
343     if (context->running == 0 || context->stop == 1) {
344         (void)SoftBusMutexUnlock(&context->lock);
345         return;
346     }
347     ListNode *item = NULL;
348     ListNode *nextItem = NULL;
349     LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
350         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
351         SoftBusMessage *msg = itemNode->msg;
352         if (msg->handler == handler && customFunc(msg, args) == 0) {
353             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]LooperRemoveMessage. handler=%s, what=%d,arg1=%" PRIu64
354                 ",time=%" PRId64, context->name, handler->name, msg->what, msg->arg1, msg->time);
355             FreeSoftBusMsg(msg);
356             ListDelete(&itemNode->node);
357             SoftBusFree(itemNode);
358             context->msgSize--;
359         }
360     }
361     (void)SoftBusMutexUnlock(&context->lock);
362 }
363 
LooperRemoveMessage(const SoftBusLooper * looper,const SoftBusHandler * handler,int what)364 static void LooperRemoveMessage(const SoftBusLooper *looper, const SoftBusHandler *handler, int what)
365 {
366     LoopRemoveMessageCustom(looper, handler, WhatRemoveFunc, (void*)(intptr_t)what);
367 }
368 
SetLooperDumpable(SoftBusLooper * loop,bool dumpable)369 void SetLooperDumpable(SoftBusLooper *loop, bool dumpable)
370 {
371     if (loop == NULL) {
372         return;
373     }
374     loop->dumpable = dumpable;
375 }
376 
CreateNewLooper(const char * name)377 SoftBusLooper *CreateNewLooper(const char *name)
378 {
379     if (g_looperCnt >= MAX_LOOPER_CNT) {
380         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "Looper count:%u, exceeds the maximum", g_looperCnt);
381         return NULL;
382     }
383     SoftBusLooper *looper = (SoftBusLooper *)SoftBusCalloc(sizeof(SoftBusLooper));
384     if (looper == NULL) {
385         return NULL;
386     }
387 
388     SoftBusLooperContext *context = SoftBusCalloc(sizeof(SoftBusLooperContext));
389     if (context == NULL) {
390         SoftBusFree(looper);
391         return NULL;
392     }
393 
394     if (memcpy_s(context->name, sizeof(context->name), name, strlen(name)) != EOK) {
395         SoftBusFree(looper);
396         SoftBusFree(context);
397         return NULL;
398     }
399     ListInit(&context->msgHead);
400     // init context
401     SoftBusMutexInit(&context->lock, NULL);
402     SoftBusCondInit(&context->cond);
403     SoftBusCondInit(&context->condRunning);
404     // init looper
405     looper->context = context;
406     looper->dumpable = true;
407     looper->PostMessage = LooperPostMessage;
408     looper->PostMessageDelay = LooperPostMessageDelay;
409     looper->RemoveMessage = LooperRemoveMessage;
410     looper->RemoveMessageCustom = LoopRemoveMessageCustom;
411     int ret = StartNewLooperThread(looper);
412     if (ret != 0) {
413         SoftBusFree(looper);
414         SoftBusFree(context);
415         return NULL;
416     }
417     g_looperCnt++;
418     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]wait looper start ok", context->name);
419     return looper;
420 }
421 
422 struct LoopConfigItem {
423     int type;
424     SoftBusLooper *looper;
425 };
426 
427 static struct LoopConfigItem g_loopConfig[] = {
428     {LOOP_TYPE_DEFAULT, NULL},
429     {LOOP_TYPE_BR_SEND, NULL},
430     {LOOP_TYPE_BR_RECV, NULL},
431     {LOOP_TYPE_P2P, NULL},
432     {LOOP_TYPE_LANE, NULL}
433 };
434 
GetLooper(int type)435 SoftBusLooper *GetLooper(int type)
436 {
437     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
438     for (uint32_t i = 0; i < len; i++) {
439         if (g_loopConfig[i].type == type) {
440             return g_loopConfig[i].looper;
441         }
442     }
443     return NULL;
444 }
445 
SetLooper(int type,SoftBusLooper * looper)446 void SetLooper(int type, SoftBusLooper *looper)
447 {
448     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
449     for (uint32_t i = 0; i < len; i++) {
450         if (g_loopConfig[i].type == type) {
451             g_loopConfig[i].looper = looper;
452         }
453     }
454 }
455 
ReleaseLooper(const SoftBusLooper * looper)456 static void ReleaseLooper(const SoftBusLooper *looper)
457 {
458     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
459     for (uint32_t i = 0; i < len; i++) {
460         if (g_loopConfig[i].looper == looper) {
461             g_loopConfig[i].looper = NULL;
462             return;
463         }
464     }
465 }
466 
DestroyLooper(SoftBusLooper * looper)467 void DestroyLooper(SoftBusLooper *looper)
468 {
469     if (looper == NULL) {
470         return;
471     }
472 
473     SoftBusLooperContext *context = looper->context;
474     if (context != NULL) {
475         (void)SoftBusMutexLock(&context->lock);
476 
477         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]set stop = 1", context->name);
478         context->stop = 1;
479 
480         SoftBusCondBroadcast(&context->cond);
481         (void)SoftBusMutexUnlock(&context->lock);
482         while (1) {
483             (void)SoftBusMutexLock(&context->lock);
484             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s] get running = %d", context->name, context->running);
485             if (context->running == 0) {
486                 (void)SoftBusMutexUnlock(&context->lock);
487                 break;
488             }
489             SoftBusCondWait(&context->condRunning, &context->lock, NULL);
490             (void)SoftBusMutexUnlock(&context->lock);
491         }
492         // release msg
493         ListNode *item = NULL;
494         ListNode *nextItem = NULL;
495         LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
496             SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
497             SoftBusMessage *msg = itemNode->msg;
498             FreeSoftBusMsg(msg);
499             ListDelete(&itemNode->node);
500             SoftBusFree(itemNode);
501         }
502         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s] destroy", context->name);
503         // destroy looper
504         SoftBusCondDestroy(&context->cond);
505         SoftBusCondDestroy(&context->condRunning);
506         SoftBusMutexDestroy(&context->lock);
507         SoftBusFree(context);
508         looper->context = NULL;
509     }
510     ReleaseLooper(looper);
511     SoftBusFree(looper);
512     if (g_looperCnt != 0) {
513         g_looperCnt--;
514     }
515 }
516 
LooperInit(void)517 int LooperInit(void)
518 {
519     SoftBusLooper *looper = CreateNewLooper("BusCenter");
520     if (!looper) {
521         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "init looper fail.");
522         return -1;
523     }
524     SetLooper(LOOP_TYPE_DEFAULT, looper);
525     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "init looper success.");
526     return 0;
527 }
528 
LooperDeinit(void)529 void LooperDeinit(void)
530 {
531     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
532     for (uint32_t i = 0; i < len; i++) {
533         if (g_loopConfig[i].looper == NULL) {
534             continue;
535         }
536         (void)SoftBusMutexLock(&(g_loopConfig[i].looper->context->lock));
537         if (g_isThreadStarted == 0) {
538             g_isNeedDestroy = 1;
539             (void)SoftBusMutexUnlock(&(g_loopConfig[i].looper->context->lock));
540             return;
541         }
542         (void)SoftBusMutexUnlock(&(g_loopConfig[i].looper->context->lock));
543         DestroyLooper(g_loopConfig[i].looper);
544     }
545 }
546