• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "message_handler.h"
17 
18 #include <sys/time.h>
19 #include <sys/types.h>
20 
21 #include "common_list.h"
22 #include "securec.h"
23 #include "softbus_adapter_mem.h"
24 #include "softbus_adapter_thread.h"
25 #include "softbus_def.h"
26 #include "softbus_log.h"
27 #include "softbus_type_def.h"
28 #include "unistd.h"
29 
30 #define LOOP_NAME_LEN 16
31 #define TIME_THOUSANDS_MULTIPLIER 1000LL
32 
33 static int8_t g_isNeedDestroy = 0;
34 static int8_t g_isThreadStarted = 0;
35 
36 typedef struct {
37     SoftBusMessage *msg;
38     ListNode node;
39 } SoftBusMessageNode;
40 
41 struct SoftBusLooperContext {
42     ListNode msgHead;
43     char name[LOOP_NAME_LEN];
44     volatile unsigned char stop; // destroys looper, stop =1, and running =0
45     volatile unsigned char running;
46     SoftBusMessage *currentMsg;
47     unsigned int msgSize;
48     SoftBusMutex lock;
49     SoftBusMutexAttr attr;
50     SoftBusCond cond;
51     SoftBusCond condRunning;
52 };
53 
UptimeMicros(void)54 static int64_t UptimeMicros(void)
55 {
56     SoftBusSysTime t;
57     t.sec = 0;
58     t.usec = 0;
59     SoftBusGetTime(&t);
60     int64_t when = t.sec * TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER + t.usec;
61     return when;
62 }
63 
FreeSoftBusMsg(SoftBusMessage * msg)64 static void FreeSoftBusMsg(SoftBusMessage *msg)
65 {
66     if (msg->FreeMessage == NULL) {
67         SoftBusFree(msg);
68     } else {
69         msg->FreeMessage(msg);
70     }
71 }
72 
MallocMessage(void)73 SoftBusMessage *MallocMessage(void)
74 {
75     SoftBusMessage *msg = (SoftBusMessage *)SoftBusMalloc(sizeof(SoftBusMessage));
76     if (msg == NULL) {
77         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "malloc SoftBusMessage failed");
78         return NULL;
79     }
80     (void)memset_s(msg, sizeof(SoftBusMessage), 0, sizeof(SoftBusMessage));
81     return msg;
82 }
83 
FreeMessage(SoftBusMessage * msg)84 void FreeMessage(SoftBusMessage *msg)
85 {
86     if (msg != NULL) {
87         FreeSoftBusMsg(msg);
88     }
89 }
90 
LoopTask(void * arg)91 static void *LoopTask(void *arg)
92 {
93     SoftBusLooper *looper = arg;
94     SoftBusLooperContext *context = looper->context;
95     if (context == NULL) {
96         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "loop context is NULL");
97         return NULL;
98     }
99 
100     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s] running", context->name);
101 
102     if (SoftBusMutexLock(&context->lock) != 0) {
103         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
104         return NULL;
105     }
106     context->running = 1;
107     g_isThreadStarted = 1;
108     (void)SoftBusMutexUnlock(&context->lock);
109 
110     for (;;) {
111         if (SoftBusMutexLock(&context->lock) != 0) {
112             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
113             return NULL;
114         }
115         // wait
116         if (context->stop == 1) {
117             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s], stop ==1", context->name);
118             (void)SoftBusMutexUnlock(&context->lock);
119             break;
120         }
121 
122         if (g_isNeedDestroy == 1) {
123             (void)SoftBusMutexUnlock(&context->lock);
124             break;
125         }
126 
127         if (IsListEmpty(&context->msgHead)) {
128             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s] wait msg list empty", context->name);
129             SoftBusCondWait(&context->cond, &context->lock, NULL);
130             (void)SoftBusMutexUnlock(&context->lock);
131             continue;
132         }
133 
134         int64_t now = UptimeMicros();
135         ListNode *item = context->msgHead.next;
136         SoftBusMessage *msg = NULL;
137         SoftBusMessageNode *itemNode = CONTAINER_OF(item, SoftBusMessageNode, node);
138         int64_t time = itemNode->msg->time;
139         if (now >= time) {
140             msg = itemNode->msg;
141             ListDelete(item);
142             SoftBusFree(itemNode);
143             context->msgSize--;
144             if (looper->dumpable) {
145                 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "LoopTask[%s], get message. handle=%s,what=%d,msgSize=%u",
146                     context->name, msg->handler->name, msg->what, context->msgSize);
147             }
148         } else {
149             SoftBusSysTime tv;
150             tv.sec = time / TIME_THOUSANDS_MULTIPLIER / TIME_THOUSANDS_MULTIPLIER;
151             tv.usec = time % (TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER) * TIME_THOUSANDS_MULTIPLIER;
152             SoftBusCondWait(&context->cond, &context->lock, &tv);
153         }
154 
155         if (msg == NULL) {
156             (void)SoftBusMutexUnlock(&context->lock);
157             continue;
158         }
159         context->currentMsg = msg;
160         (void)SoftBusMutexUnlock(&context->lock);
161         if (looper->dumpable) {
162             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "LoopTask[%s], HandleMessage message. handle=%s,what=%d",
163                 context->name, msg->handler->name, msg->what);
164         }
165 
166         if (msg->handler->HandleMessage != NULL) {
167             msg->handler->HandleMessage(msg);
168         }
169         if (looper->dumpable) {
170             // Don`t print msg->handler, msg->handler->HandleMessage() may remove handler,
171             // so msg->handler maybe invalid pointer
172             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO,
173                 "LoopTask[%s], after HandleMessage message. what=%d",
174                 context->name, msg->what);
175         }
176         (void)SoftBusMutexLock(&context->lock);
177         FreeSoftBusMsg(msg);
178         context->currentMsg = NULL;
179         (void)SoftBusMutexUnlock(&context->lock);
180     }
181     (void)SoftBusMutexLock(&context->lock);
182     context->running = 0;
183     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s], running =0", context->name);
184     SoftBusCondBroadcast(&context->cond);
185     SoftBusCondBroadcast(&context->condRunning);
186     (void)SoftBusMutexUnlock(&context->lock);
187     if (g_isNeedDestroy == 1) {
188         LooperDeinit();
189     }
190     return NULL;
191 }
192 
StartNewLooperThread(SoftBusLooper * looper)193 static int StartNewLooperThread(SoftBusLooper *looper)
194 {
195 #ifdef ASAN_BUILD
196 #define MAINLOOP_STACK_SIZE 10240
197 #else
198 #define MAINLOOP_STACK_SIZE 8192
199 #endif
200     int ret;
201     SoftBusThreadAttr threadAttr;
202     SoftBusThread tid;
203     SoftBusThreadAttrInit(&threadAttr);
204 
205     threadAttr.stackSize = MAINLOOP_STACK_SIZE;
206     ret = SoftBusThreadCreate(&tid, &threadAttr, LoopTask, looper);
207     if (ret != 0) {
208         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "Init DeathProcTask ThreadAttr failed");
209         return -1;
210     }
211 
212     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "loop thread creating %s id %d", looper->context->name,
213         (int)(uintptr_t)tid);
214     return 0;
215 }
216 
DumpLooperLocked(const SoftBusLooperContext * context)217 static void DumpLooperLocked(const SoftBusLooperContext *context)
218 {
219     int i = 0;
220     ListNode *item = NULL;
221     LIST_FOR_EACH(item, &context->msgHead) {
222         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
223         SoftBusMessage *msg = itemNode->msg;
224         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG,
225             "DumpLooper. i=%d,handler=%s,what =%d,arg1=%llu arg2=%llu, time=%lld",
226             i, msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time);
227         i++;
228     }
229 }
230 
DumpLooper(const SoftBusLooper * looper)231 void DumpLooper(const SoftBusLooper *looper)
232 {
233     if (looper == NULL) {
234         return;
235     }
236     SoftBusLooperContext *context = looper->context;
237     if (SoftBusMutexLock(&context->lock) != 0) {
238         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
239         return;
240     }
241     if (looper->dumpable) {
242         DumpLooperLocked(context);
243     }
244     (void)SoftBusMutexUnlock(&context->lock);
245 }
246 
PostMessageAtTime(const SoftBusLooper * looper,SoftBusMessage * msgPost)247 static void PostMessageAtTime(const SoftBusLooper *looper, SoftBusMessage *msgPost)
248 {
249     if (looper->dumpable) {
250         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "[%s]PostMessageAtTime what =%d time=%lld us",
251             looper->context->name, msgPost->what, msgPost->time);
252     }
253     if (msgPost->handler == NULL) {
254         FreeSoftBusMsg(msgPost);
255         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "[%s]PostMessageAtTime. msg handler is null",
256             looper->context->name);
257         return;
258     }
259     SoftBusMessageNode *newNode = (SoftBusMessageNode *)SoftBusMalloc(sizeof(SoftBusMessageNode));
260     if (newNode == NULL) {
261         FreeSoftBusMsg(msgPost);
262         return;
263     }
264     ListInit(&newNode->node);
265     newNode->msg = msgPost;
266     SoftBusLooperContext *context = looper->context;
267     if (SoftBusMutexLock(&context->lock) != 0) {
268         SoftBusFree(newNode);
269         FreeSoftBusMsg(msgPost);
270         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
271         return;
272     }
273     if (context->stop == 1) {
274         SoftBusFree(newNode);
275         FreeSoftBusMsg(msgPost);
276         (void)SoftBusMutexUnlock(&context->lock);
277         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "[%s]PostMessageAtTime. running=%d,stop=%d",
278             context->name, context->running, context->stop);
279         return;
280     }
281     ListNode *item = NULL;
282     ListNode *nextItem = NULL;
283     bool insert = false;
284     LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
285         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
286         SoftBusMessage *msg = itemNode->msg;
287         if (msg->time > msgPost->time) {
288             ListTailInsert(item, &(newNode->node));
289             insert = true;
290             break;
291         }
292     }
293     if (!insert) {
294         ListTailInsert(&(context->msgHead), &(newNode->node));
295     }
296     context->msgSize++;
297     if (looper->dumpable) {
298         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "[%s]PostMessageAtTime. insert", context->name);
299         DumpLooperLocked(context);
300     }
301     SoftBusCondBroadcast(&context->cond);
302     (void)SoftBusMutexUnlock(&context->lock);
303 }
304 
LooperPostMessage(const SoftBusLooper * looper,SoftBusMessage * msg)305 static void LooperPostMessage(const SoftBusLooper *looper, SoftBusMessage *msg)
306 {
307     msg->time = UptimeMicros();
308     PostMessageAtTime(looper, msg);
309 }
310 
LooperPostMessageDelay(const SoftBusLooper * looper,SoftBusMessage * msg,uint64_t delayMillis)311 static void LooperPostMessageDelay(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMillis)
312 {
313     msg->time = UptimeMicros() + (int64_t)delayMillis * TIME_THOUSANDS_MULTIPLIER;
314     PostMessageAtTime(looper, msg);
315 }
316 
WhatRemoveFunc(const SoftBusMessage * msg,void * args)317 static int WhatRemoveFunc(const SoftBusMessage *msg, void *args)
318 {
319     int32_t what = (int32_t)(intptr_t)args;
320     if (msg->what == what) {
321         return 0;
322     }
323     return 1;
324 }
325 
LoopRemoveMessageCustom(const SoftBusLooper * looper,const SoftBusHandler * handler,int (* customFunc)(const SoftBusMessage *,void *),void * args)326 static void LoopRemoveMessageCustom(const SoftBusLooper *looper, const SoftBusHandler *handler,
327     int (*customFunc)(const SoftBusMessage*, void*), void *args)
328 {
329     SoftBusLooperContext *context = looper->context;
330     if (SoftBusMutexLock(&context->lock) != 0) {
331         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
332         return;
333     }
334     if (context->running == 0 || context->stop == 1) {
335         (void)SoftBusMutexUnlock(&context->lock);
336         return;
337     }
338     ListNode *item = NULL;
339     ListNode *nextItem = NULL;
340     LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
341         SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
342         SoftBusMessage *msg = itemNode->msg;
343         if (msg->handler == handler && customFunc(msg, args) == 0) {
344             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]LooperRemoveMessage. handler=%s, what =%d",
345                 context->name, handler->name, msg->what);
346             FreeSoftBusMsg(msg);
347             ListDelete(&itemNode->node);
348             SoftBusFree(itemNode);
349             context->msgSize--;
350         }
351     }
352     (void)SoftBusMutexUnlock(&context->lock);
353 }
354 
LooperRemoveMessage(const SoftBusLooper * looper,const SoftBusHandler * handler,int what)355 static void LooperRemoveMessage(const SoftBusLooper *looper, const SoftBusHandler *handler, int what)
356 {
357     LoopRemoveMessageCustom(looper, handler, WhatRemoveFunc, (void*)(intptr_t)what);
358 }
359 
SetLooperDumpable(SoftBusLooper * loop,bool dumpable)360 void SetLooperDumpable(SoftBusLooper *loop, bool dumpable)
361 {
362     if (loop == NULL) {
363         return;
364     }
365     loop->dumpable = dumpable;
366 }
367 
CreateNewLooper(const char * name)368 SoftBusLooper *CreateNewLooper(const char *name)
369 {
370     SoftBusLooper *looper = (SoftBusLooper *)SoftBusCalloc(sizeof(SoftBusLooper));
371     if (looper == NULL) {
372         return NULL;
373     }
374 
375     SoftBusLooperContext *context = SoftBusCalloc(sizeof(SoftBusLooperContext));
376     if (context == NULL) {
377         SoftBusFree(looper);
378         return NULL;
379     }
380 
381     if (memcpy_s(context->name, sizeof(context->name), name, strlen(name)) != EOK) {
382         SoftBusFree(looper);
383         SoftBusFree(context);
384         return NULL;
385     }
386     ListInit(&context->msgHead);
387     // init context
388     SoftBusMutexInit(&context->lock, NULL);
389     SoftBusCondInit(&context->cond);
390     SoftBusCondInit(&context->condRunning);
391     // init looper
392     looper->context = context;
393     looper->dumpable = true;
394     looper->PostMessage = LooperPostMessage;
395     looper->PostMessageDelay = LooperPostMessageDelay;
396     looper->RemoveMessage = LooperRemoveMessage;
397     looper->RemoveMessageCustom = LoopRemoveMessageCustom;
398     int ret = StartNewLooperThread(looper);
399     if (ret != 0) {
400         SoftBusFree(looper);
401         SoftBusFree(context);
402         return NULL;
403     }
404 
405     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]wait looper start ok", context->name);
406     return looper;
407 }
408 
409 struct LoopConfigItem {
410     int type;
411     SoftBusLooper *looper;
412 };
413 
414 static struct LoopConfigItem g_loopConfig[] = {
415     {LOOP_TYPE_DEFAULT, NULL},
416     {LOOP_TYPE_BR_SEND, NULL},
417     {LOOP_TYPE_BR_RECV, NULL},
418     {LOOP_TYPE_P2P, NULL}
419 };
420 
GetLooper(int type)421 SoftBusLooper *GetLooper(int type)
422 {
423     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
424     for (uint32_t i = 0; i < len; i++) {
425         if (g_loopConfig[i].type == type) {
426             return g_loopConfig[i].looper;
427         }
428     }
429     return NULL;
430 }
431 
SetLooper(int type,SoftBusLooper * looper)432 static void SetLooper(int type, SoftBusLooper *looper)
433 {
434     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
435     for (uint32_t i = 0; i < len; i++) {
436         if (g_loopConfig[i].type == type) {
437             g_loopConfig[i].looper = looper;
438         }
439     }
440 }
441 
ReleaseLooper(const SoftBusLooper * looper)442 static void ReleaseLooper(const SoftBusLooper *looper)
443 {
444     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
445     for (uint32_t i = 0; i < len; i++) {
446         if (g_loopConfig[i].looper == looper) {
447             g_loopConfig[i].looper = NULL;
448             return;
449         }
450     }
451 }
452 
DestroyLooper(SoftBusLooper * looper)453 void DestroyLooper(SoftBusLooper *looper)
454 {
455     if (looper == NULL) {
456         return;
457     }
458 
459     SoftBusLooperContext *context = looper->context;
460     if (context != NULL) {
461         (void)SoftBusMutexLock(&context->lock);
462 
463         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]set stop = 1", context->name);
464         context->stop = 1;
465 
466         SoftBusCondBroadcast(&context->cond);
467         (void)SoftBusMutexUnlock(&context->lock);
468         while (1) {
469             (void)SoftBusMutexLock(&context->lock);
470             SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s] get running = %d", context->name, context->running);
471             if (context->running == 0) {
472                 (void)SoftBusMutexUnlock(&context->lock);
473                 break;
474             }
475             SoftBusCondWait(&context->condRunning, &context->lock, NULL);
476             (void)SoftBusMutexUnlock(&context->lock);
477         }
478         // release msg
479         ListNode *item = NULL;
480         ListNode *nextItem = NULL;
481         LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
482             SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
483             SoftBusMessage *msg = itemNode->msg;
484             FreeSoftBusMsg(msg);
485             ListDelete(&itemNode->node);
486             SoftBusFree(itemNode);
487         }
488         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s] destroy", context->name);
489         // destroy looper
490         SoftBusCondDestroy(&context->cond);
491         SoftBusCondDestroy(&context->condRunning);
492         SoftBusMutexDestroy(&context->lock);
493         SoftBusFree(context);
494         looper->context = NULL;
495     }
496     ReleaseLooper(looper);
497     SoftBusFree(looper);
498 }
499 
LooperInit(void)500 int LooperInit(void)
501 {
502     SoftBusLooper *looper = CreateNewLooper("Loop-default");
503     if (!looper) {
504         SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "init looper fail.");
505         return -1;
506     }
507     SetLooper(LOOP_TYPE_DEFAULT, looper);
508     SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "init looper success.");
509     return 0;
510 }
511 
LooperDeinit(void)512 void LooperDeinit(void)
513 {
514     uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
515     for (uint32_t i = 0; i < len; i++) {
516         if (g_loopConfig[i].looper == NULL) {
517             continue;
518         }
519         (void)SoftBusMutexLock(&(g_loopConfig[i].looper->context->lock));
520         if (g_isThreadStarted == 0) {
521             g_isNeedDestroy = 1;
522             (void)SoftBusMutexUnlock(&(g_loopConfig[i].looper->context->lock));
523             return;
524         }
525         (void)SoftBusMutexUnlock(&(g_loopConfig[i].looper->context->lock));
526         DestroyLooper(g_loopConfig[i].looper);
527     }
528 }
529