1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "message_handler.h"
17
18 #include <sys/time.h>
19 #include <sys/types.h>
20
21 #include "common_list.h"
22 #include "securec.h"
23 #include "softbus_adapter_mem.h"
24 #include "softbus_adapter_thread.h"
25 #include "softbus_def.h"
26 #include "softbus_log.h"
27 #include "softbus_type_def.h"
28 #include "unistd.h"
29
/* Size, in bytes, of the name buffer embedded in SoftBusLooperContext. */
#define LOOP_NAME_LEN 16
/* Factor of one thousand used when converting between time units. */
#define TIME_THOUSANDS_MULTIPLIER 1000LL

/* Set to 1 by LooperDeinit() when it runs before a loop thread has started; LoopTask() then exits and tears down. */
static int8_t g_isNeedDestroy = 0;
/* Set to 1 by LoopTask() once it holds the context lock and has marked the context as running. */
static int8_t g_isThreadStarted = 0;
35
36 typedef struct {
37 SoftBusMessage *msg;
38 ListNode node;
39 } SoftBusMessageNode;
40
41 struct SoftBusLooperContext {
42 ListNode msgHead;
43 char name[LOOP_NAME_LEN];
44 volatile unsigned char stop; // destroys looper, stop =1, and running =0
45 volatile unsigned char running;
46 SoftBusMessage *currentMsg;
47 unsigned int msgSize;
48 SoftBusMutex lock;
49 SoftBusMutexAttr attr;
50 SoftBusCond cond;
51 SoftBusCond condRunning;
52 };
53
UptimeMicros(void)54 static int64_t UptimeMicros(void)
55 {
56 SoftBusSysTime t;
57 t.sec = 0;
58 t.usec = 0;
59 SoftBusGetTime(&t);
60 int64_t when = t.sec * TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER + t.usec;
61 return when;
62 }
63
/*
 * Releases a message: through its own FreeMessage callback when one is set,
 * otherwise with SoftBusFree(). msg must not be NULL.
 */
static void FreeSoftBusMsg(SoftBusMessage *msg)
{
    if (msg->FreeMessage != NULL) {
        msg->FreeMessage(msg);
        return;
    }
    SoftBusFree(msg);
}
72
MallocMessage(void)73 SoftBusMessage *MallocMessage(void)
74 {
75 SoftBusMessage *msg = (SoftBusMessage *)SoftBusMalloc(sizeof(SoftBusMessage));
76 if (msg == NULL) {
77 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "malloc SoftBusMessage failed");
78 return NULL;
79 }
80 (void)memset_s(msg, sizeof(SoftBusMessage), 0, sizeof(SoftBusMessage));
81 return msg;
82 }
83
/* Releases a message via FreeSoftBusMsg(); a NULL message is ignored. */
void FreeMessage(SoftBusMessage *msg)
{
    if (msg == NULL) {
        return;
    }
    FreeSoftBusMsg(msg);
}
90
LoopTask(void * arg)91 static void *LoopTask(void *arg)
92 {
93 SoftBusLooper *looper = arg;
94 SoftBusLooperContext *context = looper->context;
95 if (context == NULL) {
96 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "loop context is NULL");
97 return NULL;
98 }
99
100 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s] running", context->name);
101
102 if (SoftBusMutexLock(&context->lock) != 0) {
103 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
104 return NULL;
105 }
106 context->running = 1;
107 g_isThreadStarted = 1;
108 (void)SoftBusMutexUnlock(&context->lock);
109
110 for (;;) {
111 if (SoftBusMutexLock(&context->lock) != 0) {
112 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
113 return NULL;
114 }
115 // wait
116 if (context->stop == 1) {
117 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s], stop ==1", context->name);
118 (void)SoftBusMutexUnlock(&context->lock);
119 break;
120 }
121
122 if (g_isNeedDestroy == 1) {
123 (void)SoftBusMutexUnlock(&context->lock);
124 break;
125 }
126
127 if (IsListEmpty(&context->msgHead)) {
128 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s] wait msg list empty", context->name);
129 SoftBusCondWait(&context->cond, &context->lock, NULL);
130 (void)SoftBusMutexUnlock(&context->lock);
131 continue;
132 }
133
134 int64_t now = UptimeMicros();
135 ListNode *item = context->msgHead.next;
136 SoftBusMessage *msg = NULL;
137 SoftBusMessageNode *itemNode = CONTAINER_OF(item, SoftBusMessageNode, node);
138 int64_t time = itemNode->msg->time;
139 if (now >= time) {
140 msg = itemNode->msg;
141 ListDelete(item);
142 SoftBusFree(itemNode);
143 context->msgSize--;
144 if (looper->dumpable) {
145 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "LoopTask[%s], get message. handle=%s,what=%d,msgSize=%u",
146 context->name, msg->handler->name, msg->what, context->msgSize);
147 }
148 } else {
149 SoftBusSysTime tv;
150 tv.sec = time / TIME_THOUSANDS_MULTIPLIER / TIME_THOUSANDS_MULTIPLIER;
151 tv.usec = time % (TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER) * TIME_THOUSANDS_MULTIPLIER;
152 SoftBusCondWait(&context->cond, &context->lock, &tv);
153 }
154
155 if (msg == NULL) {
156 (void)SoftBusMutexUnlock(&context->lock);
157 continue;
158 }
159 context->currentMsg = msg;
160 (void)SoftBusMutexUnlock(&context->lock);
161 if (looper->dumpable) {
162 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "LoopTask[%s], HandleMessage message. handle=%s,what=%d",
163 context->name, msg->handler->name, msg->what);
164 }
165
166 if (msg->handler->HandleMessage != NULL) {
167 msg->handler->HandleMessage(msg);
168 }
169 if (looper->dumpable) {
170 // Don`t print msg->handler, msg->handler->HandleMessage() may remove handler,
171 // so msg->handler maybe invalid pointer
172 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO,
173 "LoopTask[%s], after HandleMessage message. what=%d",
174 context->name, msg->what);
175 }
176 (void)SoftBusMutexLock(&context->lock);
177 FreeSoftBusMsg(msg);
178 context->currentMsg = NULL;
179 (void)SoftBusMutexUnlock(&context->lock);
180 }
181 (void)SoftBusMutexLock(&context->lock);
182 context->running = 0;
183 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "LoopTask[%s], running =0", context->name);
184 SoftBusCondBroadcast(&context->cond);
185 SoftBusCondBroadcast(&context->condRunning);
186 (void)SoftBusMutexUnlock(&context->lock);
187 if (g_isNeedDestroy == 1) {
188 LooperDeinit();
189 }
190 return NULL;
191 }
192
/* Stack size requested for the loop thread; larger under ASAN builds. */
#ifdef ASAN_BUILD
#define MAINLOOP_STACK_SIZE 10240
#else
#define MAINLOOP_STACK_SIZE 8192
#endif

/*
 * Creates the thread that runs LoopTask() for the given looper.
 *
 * looper: looper whose context is already initialized; it is passed to LoopTask() as its argument.
 * Returns 0 when the thread was created, -1 when SoftBusThreadCreate() reported a failure.
 */
static int StartNewLooperThread(SoftBusLooper *looper)
{
    SoftBusThreadAttr threadAttr;
    SoftBusThread tid;
    (void)SoftBusThreadAttrInit(&threadAttr);
    threadAttr.stackSize = MAINLOOP_STACK_SIZE;

    int ret = SoftBusThreadCreate(&tid, &threadAttr, LoopTask, looper);
    if (ret != 0) {
        // The failing step is thread creation, so say so in the log.
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "create loop thread failed");
        return -1;
    }

    SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "loop thread creating %s id %d", looper->context->name,
        (int)(uintptr_t)tid);
    return 0;
}
216
DumpLooperLocked(const SoftBusLooperContext * context)217 static void DumpLooperLocked(const SoftBusLooperContext *context)
218 {
219 int i = 0;
220 ListNode *item = NULL;
221 LIST_FOR_EACH(item, &context->msgHead) {
222 SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
223 SoftBusMessage *msg = itemNode->msg;
224 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG,
225 "DumpLooper. i=%d,handler=%s,what =%d,arg1=%llu arg2=%llu, time=%lld",
226 i, msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time);
227 i++;
228 }
229 }
230
DumpLooper(const SoftBusLooper * looper)231 void DumpLooper(const SoftBusLooper *looper)
232 {
233 if (looper == NULL) {
234 return;
235 }
236 SoftBusLooperContext *context = looper->context;
237 if (SoftBusMutexLock(&context->lock) != 0) {
238 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
239 return;
240 }
241 if (looper->dumpable) {
242 DumpLooperLocked(context);
243 }
244 (void)SoftBusMutexUnlock(&context->lock);
245 }
246
/*
 * Queues msgPost on the looper, keeping the queue ordered by msgPost->time, and wakes the loop thread.
 *
 * The message is inserted before the first queued message with a strictly later time, so messages
 * with equal times keep their posting order. The function takes over the message: it is released
 * with FreeSoftBusMsg() when it cannot be queued (no handler, allocation failure, lock failure,
 * or the looper has been asked to stop).
 *
 * looper:  target looper; its context must be valid.
 * msgPost: message to queue, with its time field already set.
 */
static void PostMessageAtTime(const SoftBusLooper *looper, SoftBusMessage *msgPost)
{
    if (looper->dumpable) {
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "[%s]PostMessageAtTime what =%d time=%lld us",
            looper->context->name, msgPost->what, (long long)msgPost->time);
    }
    if (msgPost->handler == NULL) {
        FreeSoftBusMsg(msgPost);
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "[%s]PostMessageAtTime. msg handler is null",
            looper->context->name);
        return;
    }
    SoftBusMessageNode *newNode = (SoftBusMessageNode *)SoftBusMalloc(sizeof(SoftBusMessageNode));
    if (newNode == NULL) {
        FreeSoftBusMsg(msgPost);
        return;
    }
    ListInit(&newNode->node);
    newNode->msg = msgPost;
    SoftBusLooperContext *context = looper->context;
    if (SoftBusMutexLock(&context->lock) != 0) {
        SoftBusFree(newNode);
        FreeSoftBusMsg(msgPost);
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
        return;
    }
    if (context->stop == 1) {
        SoftBusFree(newNode);
        FreeSoftBusMsg(msgPost);
        // Log before unlocking: stop is set by DestroyLooper(), which later frees the context,
        // so the context must not be read after the lock has been released.
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "[%s]PostMessageAtTime. running=%d,stop=%d",
            context->name, context->running, context->stop);
        (void)SoftBusMutexUnlock(&context->lock);
        return;
    }
    // Find the first queued message that is due later and insert in front of it.
    ListNode *item = NULL;
    bool insert = false;
    LIST_FOR_EACH(item, &context->msgHead) {
        SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
        SoftBusMessage *msg = itemNode->msg;
        if (msg->time > msgPost->time) {
            ListTailInsert(item, &(newNode->node));
            insert = true;
            break;
        }
    }
    if (!insert) {
        // No later message found: append at the end of the queue.
        ListTailInsert(&(context->msgHead), &(newNode->node));
    }
    context->msgSize++;
    if (looper->dumpable) {
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_DBG, "[%s]PostMessageAtTime. insert", context->name);
        DumpLooperLocked(context);
    }
    SoftBusCondBroadcast(&context->cond);
    (void)SoftBusMutexUnlock(&context->lock);
}
304
/* Stamps the message with the current time and queues it, so it is due immediately. */
static void LooperPostMessage(const SoftBusLooper *looper, SoftBusMessage *msg)
{
    const int64_t dueNow = UptimeMicros();
    msg->time = dueNow;
    PostMessageAtTime(looper, msg);
}
310
/* Queues the message so that it becomes due delayMillis milliseconds after the current time. */
static void LooperPostMessageDelay(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMillis)
{
    const int64_t delayMicros = (int64_t)delayMillis * TIME_THOUSANDS_MULTIPLIER;
    msg->time = UptimeMicros() + delayMicros;
    PostMessageAtTime(looper, msg);
}
316
WhatRemoveFunc(const SoftBusMessage * msg,void * args)317 static int WhatRemoveFunc(const SoftBusMessage *msg, void *args)
318 {
319 int32_t what = (int32_t)(intptr_t)args;
320 if (msg->what == what) {
321 return 0;
322 }
323 return 1;
324 }
325
LoopRemoveMessageCustom(const SoftBusLooper * looper,const SoftBusHandler * handler,int (* customFunc)(const SoftBusMessage *,void *),void * args)326 static void LoopRemoveMessageCustom(const SoftBusLooper *looper, const SoftBusHandler *handler,
327 int (*customFunc)(const SoftBusMessage*, void*), void *args)
328 {
329 SoftBusLooperContext *context = looper->context;
330 if (SoftBusMutexLock(&context->lock) != 0) {
331 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "lock failed");
332 return;
333 }
334 if (context->running == 0 || context->stop == 1) {
335 (void)SoftBusMutexUnlock(&context->lock);
336 return;
337 }
338 ListNode *item = NULL;
339 ListNode *nextItem = NULL;
340 LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) {
341 SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node);
342 SoftBusMessage *msg = itemNode->msg;
343 if (msg->handler == handler && customFunc(msg, args) == 0) {
344 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]LooperRemoveMessage. handler=%s, what =%d",
345 context->name, handler->name, msg->what);
346 FreeSoftBusMsg(msg);
347 ListDelete(&itemNode->node);
348 SoftBusFree(itemNode);
349 context->msgSize--;
350 }
351 }
352 (void)SoftBusMutexUnlock(&context->lock);
353 }
354
LooperRemoveMessage(const SoftBusLooper * looper,const SoftBusHandler * handler,int what)355 static void LooperRemoveMessage(const SoftBusLooper *looper, const SoftBusHandler *handler, int what)
356 {
357 LoopRemoveMessageCustom(looper, handler, WhatRemoveFunc, (void*)(intptr_t)what);
358 }
359
/* Turns the looper's debug dumping on or off; a NULL looper is ignored. */
void SetLooperDumpable(SoftBusLooper *loop, bool dumpable)
{
    if (loop != NULL) {
        loop->dumpable = dumpable;
    }
}
367
CreateNewLooper(const char * name)368 SoftBusLooper *CreateNewLooper(const char *name)
369 {
370 SoftBusLooper *looper = (SoftBusLooper *)SoftBusCalloc(sizeof(SoftBusLooper));
371 if (looper == NULL) {
372 return NULL;
373 }
374
375 SoftBusLooperContext *context = SoftBusCalloc(sizeof(SoftBusLooperContext));
376 if (context == NULL) {
377 SoftBusFree(looper);
378 return NULL;
379 }
380
381 if (memcpy_s(context->name, sizeof(context->name), name, strlen(name)) != EOK) {
382 SoftBusFree(looper);
383 SoftBusFree(context);
384 return NULL;
385 }
386 ListInit(&context->msgHead);
387 // init context
388 SoftBusMutexInit(&context->lock, NULL);
389 SoftBusCondInit(&context->cond);
390 SoftBusCondInit(&context->condRunning);
391 // init looper
392 looper->context = context;
393 looper->dumpable = true;
394 looper->PostMessage = LooperPostMessage;
395 looper->PostMessageDelay = LooperPostMessageDelay;
396 looper->RemoveMessage = LooperRemoveMessage;
397 looper->RemoveMessageCustom = LoopRemoveMessageCustom;
398 int ret = StartNewLooperThread(looper);
399 if (ret != 0) {
400 SoftBusFree(looper);
401 SoftBusFree(context);
402 return NULL;
403 }
404
405 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]wait looper start ok", context->name);
406 return looper;
407 }
408
409 struct LoopConfigItem {
410 int type;
411 SoftBusLooper *looper;
412 };
413
414 static struct LoopConfigItem g_loopConfig[] = {
415 {LOOP_TYPE_DEFAULT, NULL},
416 {LOOP_TYPE_BR_SEND, NULL},
417 {LOOP_TYPE_BR_RECV, NULL},
418 {LOOP_TYPE_P2P, NULL}
419 };
420
GetLooper(int type)421 SoftBusLooper *GetLooper(int type)
422 {
423 uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
424 for (uint32_t i = 0; i < len; i++) {
425 if (g_loopConfig[i].type == type) {
426 return g_loopConfig[i].looper;
427 }
428 }
429 return NULL;
430 }
431
/* Stores looper in every registry slot whose type matches; unknown types are ignored. */
static void SetLooper(int type, SoftBusLooper *looper)
{
    const uint32_t count = sizeof(g_loopConfig) / sizeof(g_loopConfig[0]);
    for (uint32_t idx = 0; idx < count; idx++) {
        if (g_loopConfig[idx].type != type) {
            continue;
        }
        g_loopConfig[idx].looper = looper;
    }
}
441
ReleaseLooper(const SoftBusLooper * looper)442 static void ReleaseLooper(const SoftBusLooper *looper)
443 {
444 uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
445 for (uint32_t i = 0; i < len; i++) {
446 if (g_loopConfig[i].looper == looper) {
447 g_loopConfig[i].looper = NULL;
448 return;
449 }
450 }
451 }
452
/*
 * Stops a looper and releases everything it owns.
 *
 * Sets the stop flag, wakes the loop thread and waits until the thread reports running == 0.
 * It then releases all messages still queued, destroys the condition variables and the mutex,
 * frees the context, removes the looper from the registry and frees the looper itself.
 * A NULL looper is ignored.
 */
void DestroyLooper(SoftBusLooper *looper)
{
    if (looper == NULL) {
        return;
    }

    SoftBusLooperContext *context = looper->context;
    if (context == NULL) {
        // Nothing to stop or drain: only unregister and free the looper.
        ReleaseLooper(looper);
        SoftBusFree(looper);
        return;
    }

    // Ask the loop thread to stop and wake it in case it is waiting for messages.
    (void)SoftBusMutexLock(&context->lock);
    SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s]set stop = 1", context->name);
    context->stop = 1;
    SoftBusCondBroadcast(&context->cond);
    (void)SoftBusMutexUnlock(&context->lock);

    // Wait for the loop thread to report that it is no longer running.
    bool stopped = false;
    while (!stopped) {
        (void)SoftBusMutexLock(&context->lock);
        SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s] get running = %d", context->name, context->running);
        stopped = (context->running == 0);
        if (!stopped) {
            SoftBusCondWait(&context->condRunning, &context->lock, NULL);
        }
        (void)SoftBusMutexUnlock(&context->lock);
    }

    // Release the messages that were still queued.
    ListNode *cur = NULL;
    ListNode *following = NULL;
    LIST_FOR_EACH_SAFE(cur, following, &context->msgHead) {
        SoftBusMessageNode *entry = LIST_ENTRY(cur, SoftBusMessageNode, node);
        FreeSoftBusMsg(entry->msg);
        ListDelete(&entry->node);
        SoftBusFree(entry);
    }
    SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "[%s] destroy", context->name);

    // Tear down the synchronization objects and the context itself.
    SoftBusCondDestroy(&context->cond);
    SoftBusCondDestroy(&context->condRunning);
    SoftBusMutexDestroy(&context->lock);
    SoftBusFree(context);
    looper->context = NULL;

    ReleaseLooper(looper);
    SoftBusFree(looper);
}
499
LooperInit(void)500 int LooperInit(void)
501 {
502 SoftBusLooper *looper = CreateNewLooper("Loop-default");
503 if (!looper) {
504 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_ERROR, "init looper fail.");
505 return -1;
506 }
507 SetLooper(LOOP_TYPE_DEFAULT, looper);
508 SoftBusLog(SOFTBUS_LOG_COMM, SOFTBUS_LOG_INFO, "init looper success.");
509 return 0;
510 }
511
LooperDeinit(void)512 void LooperDeinit(void)
513 {
514 uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem);
515 for (uint32_t i = 0; i < len; i++) {
516 if (g_loopConfig[i].looper == NULL) {
517 continue;
518 }
519 (void)SoftBusMutexLock(&(g_loopConfig[i].looper->context->lock));
520 if (g_isThreadStarted == 0) {
521 g_isNeedDestroy = 1;
522 (void)SoftBusMutexUnlock(&(g_loopConfig[i].looper->context->lock));
523 return;
524 }
525 (void)SoftBusMutexUnlock(&(g_loopConfig[i].looper->context->lock));
526 DestroyLooper(g_loopConfig[i].looper);
527 }
528 }
529