• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_runner.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include <unistd.h>
26 #include <sys/prctl.h>
27 #include <sys/syscall.h>
28 
29 #include "event_handler.h"
30 #include "event_inner_runner.h"
31 #include "event_logger.h"
32 #include "securec.h"
33 #include "singleton.h"
34 
35 
36 namespace OHOS {
37 namespace AppExecFwk {
38 namespace {
39 const char *g_crashEmptyDumpInfo = "Current Event Caller is empty. Nothing to dump";
40 const int CRASH_BUF_MIN_LEN = 2;
41 thread_local static Caller g_currentEventCaller = {};
42 thread_local static std::string g_currentEventName = {};
43 
44 DEFINE_EH_HILOG_LABEL("EventRunner");
45 
46 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)47 inline void SystemCallSetThreadName(const std::string &name)
48 {
49     if (prctl(PR_SET_NAME, name.c_str()) < 0) {
50         char errmsg[MAX_ERRORMSG_LEN] = {0};
51         GetLastErr(errmsg, MAX_ERRORMSG_LEN);
52         HILOGE("Failed to set thread name, %{public}s", errmsg);
53     }
54     HILOGD("thread name is %{public}s", name.c_str());
55 }
56 
57 // Help to calculate hash code of object.
58 template<typename T>
CalculateHashCode(const T & obj)59 inline size_t CalculateHashCode(const T &obj)
60 {
61     std::hash<T> calculateHashCode;
62     return calculateHashCode(obj);
63 }
64 
65 // Thread collector is used to reclaim thread that needs to finish running.
66 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
67     DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
68 
69 public:
70     DISALLOW_COPY_AND_MOVE(ThreadCollector);
71 
72     using ExitFunction = std::function<void()>;
73 
ReclaimCurrentThread()74     void ReclaimCurrentThread()
75     {
76         // Get id of current thread.
77         auto threadId = std::this_thread::get_id();
78         HILOGD("Thread id: %{public}zu", CalculateHashCode(threadId));
79 
80         {
81             // Add thread id to list and notify to reclaim.
82             std::lock_guard<std::mutex> lock(collectorLock_);
83             if (destroying_) {
84                 HILOGI("Thread collector is destroying");
85                 return;
86             }
87 
88             reclaims_.emplace_back(threadId);
89             if (isWaiting_) {
90                 condition_.notify_one();
91             }
92         }
93 
94         if (threadLock_.try_lock()) {
95             if ((!thread_) && (needCreateThread_)) {
96                 // Start daemon thread to collect finished threads, if not exist.
97                 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
98             }
99             threadLock_.unlock();
100         }
101     }
102 
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)103     bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
104     {
105         if ((!thread) || (!thread->joinable()) || (!threadExit)) {
106             auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
107             HILOGE("%{public}s, %{public}s: Invalid parameter", threadState, threadExit ? "valid" : "null");
108             return false;
109         }
110 
111         auto threadId = thread->get_id();
112         HILOGD("New thread id: %{public}zu", CalculateHashCode(threadId));
113         // Save these information into map.
114         std::lock_guard<std::mutex> lock(collectorLock_);
115         if (destroying_) {
116             HILOGW("Collector thread is destroying");
117             return false;
118         }
119         // Save thread object and its exit callback.
120         depositMap_.emplace(threadId,
121             ThreadExitInfo {
122                 .thread = std::move(thread),
123                 .threadExit = threadExit,
124             });
125         return true;
126     }
127 
128 private:
129     DEFINE_EH_HILOG_LABEL("ThreadCollector");
130 
131     struct ThreadExitInfo {
132         std::unique_ptr<std::thread> thread;
133         ExitFunction threadExit;
134     };
135 
ReclaimAll()136     inline void ReclaimAll()
137     {
138         HILOGD("enter");
139         std::unique_lock<std::mutex> lock(collectorLock_);
140         // All thread deposited need to stop one by one.
141         while (!depositMap_.empty()) {
142             DoReclaimLocked(lock, depositMap_.begin());
143         }
144     }
145 
Stop()146     void Stop()
147     {
148         HILOGD("enter");
149         {
150             // Stop the collect thread, while destruction of collector.
151             std::lock_guard<std::mutex> lock(collectorLock_);
152             destroying_ = true;
153             if (isWaiting_) {
154                 condition_.notify_all();
155             }
156         }
157 
158         {
159             std::lock_guard<std::mutex> lock(threadLock_);
160             if ((thread_) && (thread_->joinable())) {
161                 // Wait utils collect thread finished, if exists.
162                 thread_->join();
163             }
164             needCreateThread_ = false;
165         }
166 
167         ReclaimAll();
168     }
169 
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)170     void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
171         std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
172     {
173         if (it == depositMap_.end()) {
174             return;
175         }
176 
177         // Remove thread information from map.
178         auto threadId = it->first;
179         auto exitInfo = std::move(it->second);
180         (void)depositMap_.erase(it);
181 
182         // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
183         lock.unlock();
184 
185         size_t hashThreadId = CalculateHashCode(threadId);
186         HILOGD("Thread id: %{public}zu", hashThreadId);
187         if (needCallExit) {
188             // Call exit callback to stop loop in thread.
189             exitInfo.threadExit();
190         }
191         // Wait until thread finished.
192         exitInfo.thread->join();
193         HILOGD("Done, thread id: %{public}zu", hashThreadId);
194 
195         // Lock again.
196         lock.lock();
197     }
198 
Run()199     void Run()
200     {
201         HILOGD("Collector thread is started");
202 
203         std::unique_lock<std::mutex> lock(collectorLock_);
204         while (!destroying_) {
205             // Reclaim threads in list one by one.
206             while (!reclaims_.empty()) {
207                 auto threadId = reclaims_.back();
208                 reclaims_.pop_back();
209                 DoReclaimLocked(lock, depositMap_.find(threadId), false);
210             }
211 
212             // Maybe stop running while doing reclaim, so check before waiting.
213             if (destroying_) {
214                 break;
215             }
216 
217             isWaiting_ = true;
218             condition_.wait(lock);
219             isWaiting_ = false;
220         }
221 
222         HILOGD("Collector thread is stopped");
223     }
224 
225     std::mutex collectorLock_;
226     std::condition_variable condition_;
227     bool isWaiting_ {false};
228     bool destroying_ {false};
229     std::vector<std::thread::id> reclaims_;
230     std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
231 
232     std::mutex threadLock_;
233     // Thread for collector
234     std::unique_ptr<std::thread> thread_;
235     bool needCreateThread_ {true};
236 
237     // Avatar of thread collector, used to stop collector at the specified opportunity.
238     class Avatar {
239     public:
240         DISALLOW_COPY_AND_MOVE(Avatar);
241 
242         Avatar() = default;
~Avatar()243         ~Avatar()
244         {
245             HILOGD("enter");
246             if (avatarEnabled_) {
247                 GetInstance().avatarDestructed_ = true;
248                 GetInstance().Stop();
249             }
250         }
251 
Disable()252         static inline void Disable()
253         {
254             avatarEnabled_ = false;
255         }
256     };
257 
258     // Mark whether avatar is destructed.
259     volatile bool avatarDestructed_ {false};
260     // Mark whether avatar is enabled.
261     static volatile bool avatarEnabled_;
262     static Avatar avatar_;
263 };
264 
ThreadCollector()265 ThreadCollector::ThreadCollector()
266     : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
267 {
268     // Thread collector is created, so enable avatar.
269     HILOGD("enter");
270     avatarEnabled_ = true;
271 }
272 
~ThreadCollector()273 ThreadCollector::~ThreadCollector()
274 {
275     // If avatar is not destructed, stop collector by itself.
276     HILOGD("enter");
277     if (!avatarDestructed_) {
278         avatar_.Disable();
279         Stop();
280     }
281 }
282 
283 typedef void(*ThreadInfoCallback)(char *buf, size_t len, void *ucontext);
284 extern "C" void SetThreadInfoCallback(ThreadInfoCallback func) __attribute__((weak));
285 
286 class EventRunnerImpl final : public EventInnerRunner {
287 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)288     explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
289     {
290         HILOGD("enter");
291         queue_ = std::make_shared<EventQueue>();
292     }
293 
~EventRunnerImpl()294     ~EventRunnerImpl() final
295     {
296         HILOGD("enter");
297         queue_->RemoveAll();
298     }
299     DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
300 
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)301     static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
302     {
303         HILOGD("enter");
304         if (SetThreadInfoCallback != nullptr) {
305             SetThreadInfoCallback(CrashCallback);
306         }
307         std::shared_ptr<EventRunnerImpl> inner = wp.lock();
308         if (inner) {
309             HILOGD("Start running for thread '%{public}s'", inner->threadName_.c_str());
310 
311             // Call system call to modify thread name.
312             SystemCallSetThreadName(inner->threadName_);
313 
314             // Enter event loop.
315             inner->Run();
316 
317             HILOGD("Stopped running for thread '%{public}s'", inner->threadName_.c_str());
318         } else {
319             HILOGW("EventRunner has been released just after its creation");
320         }
321 
322         // Reclaim current thread.
323         ThreadCollector::GetInstance().ReclaimCurrentThread();
324     }
325 
Run()326     void Run() final
327     {
328         HILOGD("enter");
329         // Prepare to start event loop.
330         queue_->Prepare();
331 
332         // Make sure instance of 'EventRunner' exists.
333         if (owner_.expired()) {
334             return;
335         }
336 
337         threadId_ = std::this_thread::get_id();
338         kernelThreadId_ = getproctid();
339 
340         // Save old event runner.
341         std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
342         // Set current event runner into thread local data.
343         currentEventRunner = owner_;
344 
345         // Start event looper.
346         for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
347             std::shared_ptr<EventHandler> handler = event->GetOwner();
348             // Make sure owner of the event exists.
349             if (handler) {
350                 std::shared_ptr<Logger> logging = logger_;
351                 if (logging != nullptr) {
352                     if (!event->HasTask()) {
353                         InnerEvent::EventId eventId = event->GetInnerEventIdEx();
354                         if (eventId.index() == TYPE_U32_INDEX) {
355                             logging->Log(
356                                 "Dispatching to handler event id = " + std::to_string(std::get<uint32_t>(eventId)));
357                         } else {
358                             logging->Log("Dispatching to handler event id = " + std::get<std::string>(eventId));
359                         }
360                     } else {
361                         logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
362                     }
363                 }
364 
365                 SetCurrentEventInfo(event);
366                 queue_->PushHistoryQueueBeforeDistribute(event);
367                 handler->DistributeEvent(event);
368                 queue_->PushHistoryQueueAfterDistribute();
369                 ClearCurrentEventInfo();
370             }
371             // Release event manually, otherwise event will be released until next event coming.
372             event.reset();
373         }
374 
375         // Restore current event runner.
376         currentEventRunner = oldRunner;
377     }
378 
Stop()379     void Stop() final
380     {
381         HILOGD("enter");
382         queue_->Finish();
383     }
384 
Attach(std::unique_ptr<std::thread> & thread)385     inline bool Attach(std::unique_ptr<std::thread> &thread)
386     {
387         auto exitThread = [queue = queue_]() { queue->Finish(); };
388 
389         return ThreadCollector::GetInstance().Deposit(thread, exitThread);
390     }
391 
SetThreadName(const std::string & threadName)392     inline void SetThreadName(const std::string &threadName)
393     {
394         static std::atomic<uint32_t> idGenerator(1);
395 
396         if (threadName.empty()) {
397             // Generate a default name
398             threadName_ = "EventRunner#";
399             threadName_ += std::to_string(idGenerator++);
400         } else {
401             threadName_ = threadName;
402         }
403     }
404 
405 private:
406     DEFINE_EH_HILOG_LABEL("EventRunnerImpl");
407 
408     static void CrashCallback(char *buf, size_t len, void *ucontext);
409 
SetCurrentEventInfo(const InnerEvent::Pointer & event)410     void SetCurrentEventInfo(const InnerEvent::Pointer &event)
411     {
412         g_currentEventCaller = event->GetCaller();
413         if (event->HasTask()) {
414             g_currentEventName = event->GetTaskName();
415         } else {
416             InnerEvent::EventId eventId = event->GetInnerEventIdEx();
417             if (eventId.index() == TYPE_U32_INDEX) {
418                 g_currentEventName = std::to_string(std::get<uint32_t>(eventId));
419             } else {
420                 g_currentEventName = std::get<std::string>(eventId);
421             }
422         }
423     }
424 
ClearCurrentEventInfo()425     inline void ClearCurrentEventInfo()
426     {
427         g_currentEventCaller = {};
428         g_currentEventName.clear();
429     }
430 };
431 }  // unnamed namespace
432 
CrashCallback(char * buf,size_t len,void * ucontext)433 void EventRunnerImpl::CrashCallback(char *buf, size_t len, void *ucontext)
434 {
435     if (len < CRASH_BUF_MIN_LEN) {
436         return;
437     }
438     if (memset_s(buf, len, 0x00, len) != EOK) {
439         HILOGE("memset_s failed");
440         return;
441     }
442 
443     if (!g_currentEventName.empty()) {
444         const char* file = g_currentEventCaller.file_.c_str();
445         const char* func = g_currentEventCaller.func_.c_str();
446         const char* eventName = g_currentEventName.c_str();
447         int line = g_currentEventCaller.line_;
448         if (snprintf_s(buf, len, len - 1, "Current Event Caller info: [%s(%s:%d)]. EventName is '%s'",
449             file, func, line, eventName) < 0) {
450             HILOGE("snprintf_s failed");
451             return;
452         }
453         return;
454     }
455 
456     if (memcpy_s(buf, len - 1, g_crashEmptyDumpInfo, len - 1) != EOK) {
457         HILOGE("memcpy_s failed");
458         return;
459     }
460 }
461 
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)462 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
463     : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
464 {
465     HILOGD("enter");
466 }
467 
GetCurrentEventRunner()468 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
469 {
470     const std::weak_ptr<EventRunner> &wp = currentEventRunner;
471     return wp.lock();
472 }
473 
474 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
475 
476 namespace {
477 volatile bool ThreadCollector::avatarEnabled_ = false;
478 
479 /*
480  * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
481  * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
482  * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
483  */
484 ThreadCollector::Avatar ThreadCollector::avatar_;
485 }  // unnamed namespace
486 
487 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
488 
Create(bool inNewThread)489 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
490 {
491     HILOGD("inNewThread is %{public}d", inNewThread);
492     if (inNewThread) {
493         HILOGI("EventRunner created");
494         return Create(std::string());
495     }
496 
497     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
498     std::shared_ptr<EventRunner> sp(new (std::nothrow) EventRunner(false));
499     if (sp == nullptr) {
500         HILOGI("Failed to create EventRunner Instance");
501         return nullptr;
502     }
503     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
504     sp->innerRunner_ = innerRunner;
505     sp->queue_ = innerRunner->GetEventQueue();
506 
507     return sp;
508 }
509 
Create(const std::string & threadName)510 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
511 {
512     HILOGD("threadName is %{public}s", threadName.c_str());
513     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
514     std::shared_ptr<EventRunner> sp(new EventRunner(true));
515     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
516     sp->innerRunner_ = innerRunner;
517     sp->queue_ = innerRunner->GetEventQueue();
518 
519     // Start new thread
520     innerRunner->SetThreadName(threadName);
521     auto thread =
522         std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
523     if (!innerRunner->Attach(thread)) {
524         HILOGW("Failed to attach thread, maybe process is exiting");
525         innerRunner->Stop();
526         thread->join();
527     }
528 
529     return sp;
530 }
531 
Current()532 std::shared_ptr<EventRunner> EventRunner::Current()
533 {
534     auto runner = EventInnerRunner::GetCurrentEventRunner();
535     if (runner) {
536         return runner;
537     }
538     return nullptr;
539 }
540 
EventRunner(bool deposit)541 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
542 {
543     HILOGD("deposit_ is %{public}d", deposit_);
544 }
545 
GetRunnerThreadName() const546 std::string EventRunner::GetRunnerThreadName() const
547 {
548     return innerRunner_->GetThreadName();
549 }
550 
~EventRunner()551 EventRunner::~EventRunner()
552 {
553     HILOGD("deposit_ is %{public}d", deposit_);
554     if (deposit_ && innerRunner_ != nullptr) {
555         innerRunner_->Stop();
556     }
557 }
558 
Run()559 ErrCode EventRunner::Run()
560 {
561     HILOGD("enter");
562     if (deposit_) {
563         HILOGE("Do not call, if event runner is deposited");
564         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
565     }
566 
567     // Avoid more than one thread to start this runner.
568     if (running_.exchange(true)) {
569         HILOGW("Already running");
570         return EVENT_HANDLER_ERR_RUNNER_ALREADY;
571     }
572 
573     // Entry event loop.
574     innerRunner_->Run();
575 
576     running_.store(false);
577     return ERR_OK;
578 }
579 
Stop()580 ErrCode EventRunner::Stop()
581 {
582     HILOGD("enter");
583     if (deposit_) {
584         HILOGE("Stop: Do not call, if event runner is deposited");
585         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
586     }
587 
588     if (running_.load()) {
589         innerRunner_->Stop();
590     } else {
591         HILOGW("Stop: Already stopped");
592     }
593 
594     return ERR_OK;
595 }
596 
Dump(Dumper & dumper)597 void EventRunner::Dump(Dumper &dumper)
598 {
599     if (!IsRunning()) {
600         dumper.Dump(dumper.GetTag() + " Event runner is not running" + std::string(LINE_SEPARATOR));
601         return;
602     }
603 
604     if (queue_ == nullptr) {
605         dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + std::string(LINE_SEPARATOR));
606         return;
607     }
608 
609     dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
610                 ", Thread ID = " + std::to_string(GetKernelThreadId()) + ") is running" + std::string(LINE_SEPARATOR));
611     queue_->Dump(dumper);
612 }
613 
DumpRunnerInfo(std::string & runnerInfo)614 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
615 {
616     if (!IsRunning()) {
617         runnerInfo = "        Event runner is not running" + std::string(LINE_SEPARATOR);
618     }
619 
620     if (queue_ == nullptr) {
621         runnerInfo = "        Queue is null" + std::string(LINE_SEPARATOR);
622         return;
623     }
624 
625     std::string queueInfo;
626     queue_->DumpQueueInfo(queueInfo);
627     runnerInfo += queueInfo;
628 }
629 
SetLogger(const std::shared_ptr<Logger> & logger)630 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
631 {
632     innerRunner_->SetLogger(logger);
633 }
634 
GetCurrentEventQueue()635 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
636 {
637     auto runner = EventRunner::Current();
638     if (!runner) {
639         HILOGE("current runner is nullptr");
640         return nullptr;
641     }
642     return runner->queue_;
643 }
644 
GetThreadId()645 uint64_t EventRunner::GetThreadId()
646 {
647     std::thread::id tid = innerRunner_->GetThreadId();
648     std::stringstream buf;
649     buf << tid;
650     std::string stid = buf.str();
651     return std::stoull(stid);
652 }
653 
GetKernelThreadId()654 uint64_t EventRunner::GetKernelThreadId()
655 {
656     return innerRunner_->GetKernelThreadId();
657 }
658 
IsCurrentRunnerThread()659 bool EventRunner::IsCurrentRunnerThread()
660 {
661     return std::this_thread::get_id() == innerRunner_->GetThreadId();
662 }
663 
GetMainEventRunner()664 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
665 {
666     if (!mainRunner_) {
667         mainRunner_ = Create(false);
668         if (!mainRunner_) {
669             HILOGE("mainRunner_ create fail");
670         }
671     }
672 
673     return mainRunner_;
674 }
675 }  // namespace AppExecFwk
676 }  // namespace OHOS
677