1 /*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_runner.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24
25 #include <unistd.h>
26 #include <sys/prctl.h>
27 #include <sys/syscall.h>
28
29 #include "event_handler.h"
30 #include "event_inner_runner.h"
31 #include "event_logger.h"
32 #include "securec.h"
33 #include "singleton.h"
34
35
36 namespace OHOS {
37 namespace AppExecFwk {
38 namespace {
39 const char *g_crashEmptyDumpInfo = "Current Event Caller is empty. Nothing to dump";
40 const int CRASH_BUF_MIN_LEN = 2;
41 thread_local static Caller g_currentEventCaller = {};
42 thread_local static std::string g_currentEventName = {};
43
44 DEFINE_EH_HILOG_LABEL("EventRunner");
45
46 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)47 inline void SystemCallSetThreadName(const std::string &name)
48 {
49 if (prctl(PR_SET_NAME, name.c_str()) < 0) {
50 char errmsg[MAX_ERRORMSG_LEN] = {0};
51 GetLastErr(errmsg, MAX_ERRORMSG_LEN);
52 HILOGE("Failed to set thread name, %{public}s", errmsg);
53 }
54 HILOGD("thread name is %{public}s", name.c_str());
55 }
56
57 // Help to calculate hash code of object.
58 template<typename T>
CalculateHashCode(const T & obj)59 inline size_t CalculateHashCode(const T &obj)
60 {
61 std::hash<T> calculateHashCode;
62 return calculateHashCode(obj);
63 }
64
65 // Thread collector is used to reclaim thread that needs to finish running.
66 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
67 DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
68
69 public:
70 DISALLOW_COPY_AND_MOVE(ThreadCollector);
71
72 using ExitFunction = std::function<void()>;
73
ReclaimCurrentThread()74 void ReclaimCurrentThread()
75 {
76 // Get id of current thread.
77 auto threadId = std::this_thread::get_id();
78 HILOGD("Thread id: %{public}zu", CalculateHashCode(threadId));
79
80 {
81 // Add thread id to list and notify to reclaim.
82 std::lock_guard<std::mutex> lock(collectorLock_);
83 if (destroying_) {
84 HILOGI("Thread collector is destroying");
85 return;
86 }
87
88 reclaims_.emplace_back(threadId);
89 if (isWaiting_) {
90 condition_.notify_one();
91 }
92 }
93
94 if (threadLock_.try_lock()) {
95 if ((!thread_) && (needCreateThread_)) {
96 // Start daemon thread to collect finished threads, if not exist.
97 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
98 }
99 threadLock_.unlock();
100 }
101 }
102
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)103 bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
104 {
105 if ((!thread) || (!thread->joinable()) || (!threadExit)) {
106 auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
107 HILOGE("%{public}s, %{public}s: Invalid parameter", threadState, threadExit ? "valid" : "null");
108 return false;
109 }
110
111 auto threadId = thread->get_id();
112 HILOGD("New thread id: %{public}zu", CalculateHashCode(threadId));
113 // Save these information into map.
114 std::lock_guard<std::mutex> lock(collectorLock_);
115 if (destroying_) {
116 HILOGW("Collector thread is destroying");
117 return false;
118 }
119 // Save thread object and its exit callback.
120 depositMap_.emplace(threadId,
121 ThreadExitInfo {
122 .thread = std::move(thread),
123 .threadExit = threadExit,
124 });
125 return true;
126 }
127
128 private:
129 DEFINE_EH_HILOG_LABEL("ThreadCollector");
130
131 struct ThreadExitInfo {
132 std::unique_ptr<std::thread> thread;
133 ExitFunction threadExit;
134 };
135
ReclaimAll()136 inline void ReclaimAll()
137 {
138 HILOGD("enter");
139 std::unique_lock<std::mutex> lock(collectorLock_);
140 // All thread deposited need to stop one by one.
141 while (!depositMap_.empty()) {
142 DoReclaimLocked(lock, depositMap_.begin());
143 }
144 }
145
Stop()146 void Stop()
147 {
148 HILOGD("enter");
149 {
150 // Stop the collect thread, while destruction of collector.
151 std::lock_guard<std::mutex> lock(collectorLock_);
152 destroying_ = true;
153 if (isWaiting_) {
154 condition_.notify_all();
155 }
156 }
157
158 {
159 std::lock_guard<std::mutex> lock(threadLock_);
160 if ((thread_) && (thread_->joinable())) {
161 // Wait utils collect thread finished, if exists.
162 thread_->join();
163 }
164 needCreateThread_ = false;
165 }
166
167 ReclaimAll();
168 }
169
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)170 void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
171 std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
172 {
173 if (it == depositMap_.end()) {
174 return;
175 }
176
177 // Remove thread information from map.
178 auto threadId = it->first;
179 auto exitInfo = std::move(it->second);
180 (void)depositMap_.erase(it);
181
182 // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
183 lock.unlock();
184
185 size_t hashThreadId = CalculateHashCode(threadId);
186 HILOGD("Thread id: %{public}zu", hashThreadId);
187 if (needCallExit) {
188 // Call exit callback to stop loop in thread.
189 exitInfo.threadExit();
190 }
191 // Wait until thread finished.
192 exitInfo.thread->join();
193 HILOGD("Done, thread id: %{public}zu", hashThreadId);
194
195 // Lock again.
196 lock.lock();
197 }
198
Run()199 void Run()
200 {
201 HILOGD("Collector thread is started");
202
203 std::unique_lock<std::mutex> lock(collectorLock_);
204 while (!destroying_) {
205 // Reclaim threads in list one by one.
206 while (!reclaims_.empty()) {
207 auto threadId = reclaims_.back();
208 reclaims_.pop_back();
209 DoReclaimLocked(lock, depositMap_.find(threadId), false);
210 }
211
212 // Maybe stop running while doing reclaim, so check before waiting.
213 if (destroying_) {
214 break;
215 }
216
217 isWaiting_ = true;
218 condition_.wait(lock);
219 isWaiting_ = false;
220 }
221
222 HILOGD("Collector thread is stopped");
223 }
224
225 std::mutex collectorLock_;
226 std::condition_variable condition_;
227 bool isWaiting_ {false};
228 bool destroying_ {false};
229 std::vector<std::thread::id> reclaims_;
230 std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
231
232 std::mutex threadLock_;
233 // Thread for collector
234 std::unique_ptr<std::thread> thread_;
235 bool needCreateThread_ {true};
236
237 // Avatar of thread collector, used to stop collector at the specified opportunity.
238 class Avatar {
239 public:
240 DISALLOW_COPY_AND_MOVE(Avatar);
241
242 Avatar() = default;
~Avatar()243 ~Avatar()
244 {
245 HILOGD("enter");
246 if (avatarEnabled_) {
247 GetInstance().avatarDestructed_ = true;
248 GetInstance().Stop();
249 }
250 }
251
Disable()252 static inline void Disable()
253 {
254 avatarEnabled_ = false;
255 }
256 };
257
258 // Mark whether avatar is destructed.
259 volatile bool avatarDestructed_ {false};
260 // Mark whether avatar is enabled.
261 static volatile bool avatarEnabled_;
262 static Avatar avatar_;
263 };
264
ThreadCollector()265 ThreadCollector::ThreadCollector()
266 : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
267 {
268 // Thread collector is created, so enable avatar.
269 HILOGD("enter");
270 avatarEnabled_ = true;
271 }
272
~ThreadCollector()273 ThreadCollector::~ThreadCollector()
274 {
275 // If avatar is not destructed, stop collector by itself.
276 HILOGD("enter");
277 if (!avatarDestructed_) {
278 avatar_.Disable();
279 Stop();
280 }
281 }
282
283 typedef void(*ThreadInfoCallback)(char *buf, size_t len, void *ucontext);
284 extern "C" void SetThreadInfoCallback(ThreadInfoCallback func) __attribute__((weak));
285
286 class EventRunnerImpl final : public EventInnerRunner {
287 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)288 explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
289 {
290 HILOGD("enter");
291 queue_ = std::make_shared<EventQueue>();
292 }
293
~EventRunnerImpl()294 ~EventRunnerImpl() final
295 {
296 HILOGD("enter");
297 queue_->RemoveAll();
298 }
299 DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
300
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)301 static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
302 {
303 HILOGD("enter");
304 if (SetThreadInfoCallback != nullptr) {
305 SetThreadInfoCallback(CrashCallback);
306 }
307 std::shared_ptr<EventRunnerImpl> inner = wp.lock();
308 if (inner) {
309 HILOGD("Start running for thread '%{public}s'", inner->threadName_.c_str());
310
311 // Call system call to modify thread name.
312 SystemCallSetThreadName(inner->threadName_);
313
314 // Enter event loop.
315 inner->Run();
316
317 HILOGD("Stopped running for thread '%{public}s'", inner->threadName_.c_str());
318 } else {
319 HILOGW("EventRunner has been released just after its creation");
320 }
321
322 // Reclaim current thread.
323 ThreadCollector::GetInstance().ReclaimCurrentThread();
324 }
325
Run()326 void Run() final
327 {
328 HILOGD("enter");
329 // Prepare to start event loop.
330 queue_->Prepare();
331
332 // Make sure instance of 'EventRunner' exists.
333 if (owner_.expired()) {
334 return;
335 }
336
337 threadId_ = std::this_thread::get_id();
338 kernelThreadId_ = getproctid();
339
340 // Save old event runner.
341 std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
342 // Set current event runner into thread local data.
343 currentEventRunner = owner_;
344
345 // Start event looper.
346 for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
347 std::shared_ptr<EventHandler> handler = event->GetOwner();
348 // Make sure owner of the event exists.
349 if (handler) {
350 std::shared_ptr<Logger> logging = logger_;
351 if (logging != nullptr) {
352 if (!event->HasTask()) {
353 InnerEvent::EventId eventId = event->GetInnerEventIdEx();
354 if (eventId.index() == TYPE_U32_INDEX) {
355 logging->Log(
356 "Dispatching to handler event id = " + std::to_string(std::get<uint32_t>(eventId)));
357 } else {
358 logging->Log("Dispatching to handler event id = " + std::get<std::string>(eventId));
359 }
360 } else {
361 logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
362 }
363 }
364
365 SetCurrentEventInfo(event);
366 queue_->PushHistoryQueueBeforeDistribute(event);
367 handler->DistributeEvent(event);
368 queue_->PushHistoryQueueAfterDistribute();
369 ClearCurrentEventInfo();
370 }
371 // Release event manually, otherwise event will be released until next event coming.
372 event.reset();
373 }
374
375 // Restore current event runner.
376 currentEventRunner = oldRunner;
377 }
378
Stop()379 void Stop() final
380 {
381 HILOGD("enter");
382 queue_->Finish();
383 }
384
Attach(std::unique_ptr<std::thread> & thread)385 inline bool Attach(std::unique_ptr<std::thread> &thread)
386 {
387 auto exitThread = [queue = queue_]() { queue->Finish(); };
388
389 return ThreadCollector::GetInstance().Deposit(thread, exitThread);
390 }
391
SetThreadName(const std::string & threadName)392 inline void SetThreadName(const std::string &threadName)
393 {
394 static std::atomic<uint32_t> idGenerator(1);
395
396 if (threadName.empty()) {
397 // Generate a default name
398 threadName_ = "EventRunner#";
399 threadName_ += std::to_string(idGenerator++);
400 } else {
401 threadName_ = threadName;
402 }
403 }
404
405 private:
406 DEFINE_EH_HILOG_LABEL("EventRunnerImpl");
407
408 static void CrashCallback(char *buf, size_t len, void *ucontext);
409
SetCurrentEventInfo(const InnerEvent::Pointer & event)410 void SetCurrentEventInfo(const InnerEvent::Pointer &event)
411 {
412 g_currentEventCaller = event->GetCaller();
413 if (event->HasTask()) {
414 g_currentEventName = event->GetTaskName();
415 } else {
416 InnerEvent::EventId eventId = event->GetInnerEventIdEx();
417 if (eventId.index() == TYPE_U32_INDEX) {
418 g_currentEventName = std::to_string(std::get<uint32_t>(eventId));
419 } else {
420 g_currentEventName = std::get<std::string>(eventId);
421 }
422 }
423 }
424
ClearCurrentEventInfo()425 inline void ClearCurrentEventInfo()
426 {
427 g_currentEventCaller = {};
428 g_currentEventName.clear();
429 }
430 };
431 } // unnamed namespace
432
CrashCallback(char * buf,size_t len,void * ucontext)433 void EventRunnerImpl::CrashCallback(char *buf, size_t len, void *ucontext)
434 {
435 if (len < CRASH_BUF_MIN_LEN) {
436 return;
437 }
438 if (memset_s(buf, len, 0x00, len) != EOK) {
439 HILOGE("memset_s failed");
440 return;
441 }
442
443 if (!g_currentEventName.empty()) {
444 const char* file = g_currentEventCaller.file_.c_str();
445 const char* func = g_currentEventCaller.func_.c_str();
446 const char* eventName = g_currentEventName.c_str();
447 int line = g_currentEventCaller.line_;
448 if (snprintf_s(buf, len, len - 1, "Current Event Caller info: [%s(%s:%d)]. EventName is '%s'",
449 file, func, line, eventName) < 0) {
450 HILOGE("snprintf_s failed");
451 return;
452 }
453 return;
454 }
455
456 if (memcpy_s(buf, len - 1, g_crashEmptyDumpInfo, len - 1) != EOK) {
457 HILOGE("memcpy_s failed");
458 return;
459 }
460 }
461
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)462 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
463 : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
464 {
465 HILOGD("enter");
466 }
467
GetCurrentEventRunner()468 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
469 {
470 const std::weak_ptr<EventRunner> &wp = currentEventRunner;
471 return wp.lock();
472 }
473
474 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
475
476 namespace {
477 volatile bool ThreadCollector::avatarEnabled_ = false;
478
479 /*
480 * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
481 * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
482 * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
483 */
484 ThreadCollector::Avatar ThreadCollector::avatar_;
485 } // unnamed namespace
486
487 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
488
Create(bool inNewThread)489 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
490 {
491 HILOGD("inNewThread is %{public}d", inNewThread);
492 if (inNewThread) {
493 HILOGI("EventRunner created");
494 return Create(std::string());
495 }
496
497 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
498 std::shared_ptr<EventRunner> sp(new (std::nothrow) EventRunner(false));
499 if (sp == nullptr) {
500 HILOGI("Failed to create EventRunner Instance");
501 return nullptr;
502 }
503 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
504 sp->innerRunner_ = innerRunner;
505 sp->queue_ = innerRunner->GetEventQueue();
506
507 return sp;
508 }
509
Create(const std::string & threadName)510 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
511 {
512 HILOGD("threadName is %{public}s", threadName.c_str());
513 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
514 std::shared_ptr<EventRunner> sp(new EventRunner(true));
515 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
516 sp->innerRunner_ = innerRunner;
517 sp->queue_ = innerRunner->GetEventQueue();
518
519 // Start new thread
520 innerRunner->SetThreadName(threadName);
521 auto thread =
522 std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
523 if (!innerRunner->Attach(thread)) {
524 HILOGW("Failed to attach thread, maybe process is exiting");
525 innerRunner->Stop();
526 thread->join();
527 }
528
529 return sp;
530 }
531
Current()532 std::shared_ptr<EventRunner> EventRunner::Current()
533 {
534 auto runner = EventInnerRunner::GetCurrentEventRunner();
535 if (runner) {
536 return runner;
537 }
538 return nullptr;
539 }
540
EventRunner(bool deposit)541 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
542 {
543 HILOGD("deposit_ is %{public}d", deposit_);
544 }
545
GetRunnerThreadName() const546 std::string EventRunner::GetRunnerThreadName() const
547 {
548 return innerRunner_->GetThreadName();
549 }
550
~EventRunner()551 EventRunner::~EventRunner()
552 {
553 HILOGD("deposit_ is %{public}d", deposit_);
554 if (deposit_ && innerRunner_ != nullptr) {
555 innerRunner_->Stop();
556 }
557 }
558
Run()559 ErrCode EventRunner::Run()
560 {
561 HILOGD("enter");
562 if (deposit_) {
563 HILOGE("Do not call, if event runner is deposited");
564 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
565 }
566
567 // Avoid more than one thread to start this runner.
568 if (running_.exchange(true)) {
569 HILOGW("Already running");
570 return EVENT_HANDLER_ERR_RUNNER_ALREADY;
571 }
572
573 // Entry event loop.
574 innerRunner_->Run();
575
576 running_.store(false);
577 return ERR_OK;
578 }
579
Stop()580 ErrCode EventRunner::Stop()
581 {
582 HILOGD("enter");
583 if (deposit_) {
584 HILOGE("Stop: Do not call, if event runner is deposited");
585 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
586 }
587
588 if (running_.load()) {
589 innerRunner_->Stop();
590 } else {
591 HILOGW("Stop: Already stopped");
592 }
593
594 return ERR_OK;
595 }
596
Dump(Dumper & dumper)597 void EventRunner::Dump(Dumper &dumper)
598 {
599 if (!IsRunning()) {
600 dumper.Dump(dumper.GetTag() + " Event runner is not running" + std::string(LINE_SEPARATOR));
601 return;
602 }
603
604 if (queue_ == nullptr) {
605 dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + std::string(LINE_SEPARATOR));
606 return;
607 }
608
609 dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
610 ", Thread ID = " + std::to_string(GetKernelThreadId()) + ") is running" + std::string(LINE_SEPARATOR));
611 queue_->Dump(dumper);
612 }
613
DumpRunnerInfo(std::string & runnerInfo)614 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
615 {
616 if (!IsRunning()) {
617 runnerInfo = " Event runner is not running" + std::string(LINE_SEPARATOR);
618 }
619
620 if (queue_ == nullptr) {
621 runnerInfo = " Queue is null" + std::string(LINE_SEPARATOR);
622 return;
623 }
624
625 std::string queueInfo;
626 queue_->DumpQueueInfo(queueInfo);
627 runnerInfo += queueInfo;
628 }
629
SetLogger(const std::shared_ptr<Logger> & logger)630 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
631 {
632 innerRunner_->SetLogger(logger);
633 }
634
GetCurrentEventQueue()635 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
636 {
637 auto runner = EventRunner::Current();
638 if (!runner) {
639 HILOGE("current runner is nullptr");
640 return nullptr;
641 }
642 return runner->queue_;
643 }
644
GetThreadId()645 uint64_t EventRunner::GetThreadId()
646 {
647 std::thread::id tid = innerRunner_->GetThreadId();
648 std::stringstream buf;
649 buf << tid;
650 std::string stid = buf.str();
651 return std::stoull(stid);
652 }
653
GetKernelThreadId()654 uint64_t EventRunner::GetKernelThreadId()
655 {
656 return innerRunner_->GetKernelThreadId();
657 }
658
IsCurrentRunnerThread()659 bool EventRunner::IsCurrentRunnerThread()
660 {
661 return std::this_thread::get_id() == innerRunner_->GetThreadId();
662 }
663
GetMainEventRunner()664 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
665 {
666 if (!mainRunner_) {
667 mainRunner_ = Create(false);
668 if (!mainRunner_) {
669 HILOGE("mainRunner_ create fail");
670 }
671 }
672
673 return mainRunner_;
674 }
675 } // namespace AppExecFwk
676 } // namespace OHOS
677