• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_runner.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include <sys/prctl.h>
26 
27 #include "event_handler.h"
28 #include "event_handler_utils.h"
29 #include "event_inner_runner.h"
30 #include "singleton.h"
31 #include "thread_local_data.h"
32 
33 DEFINE_HILOG_LABEL("EventRunner");
34 
35 namespace OHOS {
36 namespace AppExecFwk {
37 namespace {
38 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)39 inline void SystemCallSetThreadName(const std::string &name)
40 {
41     if (prctl(PR_SET_NAME, name.c_str()) < 0) {
42         char errmsg[MAX_ERRORMSG_LEN] = {0};
43         GetLastErr(errmsg, MAX_ERRORMSG_LEN);
44         HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
45     }
46 }
47 
48 // Help to calculate hash code of object.
49 template<typename T>
CalculateHashCode(const T & obj)50 inline size_t CalculateHashCode(const T &obj)
51 {
52     std::hash<T> calculateHashCode;
53     return calculateHashCode(obj);
54 }
55 
56 // Thread collector is used to reclaim thread that needs to finish running.
57 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
58     DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
59 
60 public:
61     DISALLOW_COPY_AND_MOVE(ThreadCollector);
62 
63     using ExitFunction = std::function<void()>;
64 
ReclaimCurrentThread()65     void ReclaimCurrentThread()
66     {
67         // Get id of current thread.
68         auto threadId = std::this_thread::get_id();
69         HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
70 
71         {
72             // Add thread id to list and notify to reclaim.
73             std::lock_guard<std::mutex> lock(collectorLock_);
74             if (destroying_) {
75                 HILOGI("Reclaim: Thread collector is destroying");
76                 return;
77             }
78 
79             reclaims_.emplace_back(threadId);
80             if (isWaiting_) {
81                 condition_.notify_one();
82             }
83         }
84 
85         if (threadLock_.try_lock()) {
86             if ((!thread_) && (needCreateThread_)) {
87                 // Start daemon thread to collect finished threads, if not exist.
88                 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
89             }
90             threadLock_.unlock();
91         }
92     }
93 
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)94     bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
95     {
96         if ((!thread) || (!thread->joinable()) || (!threadExit)) {
97             auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
98             HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
99             return false;
100         }
101 
102         auto threadId = thread->get_id();
103         HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
104         // Save these information into map.
105         std::lock_guard<std::mutex> lock(collectorLock_);
106         if (destroying_) {
107             HILOGW("Deposit: Collector thread is destroying");
108             return false;
109         }
110         // Save thread object and its exit callback.
111         depositMap_.emplace(threadId,
112             ThreadExitInfo {
113                 .thread = std::move(thread),
114                 .threadExit = threadExit,
115             });
116         return true;
117     }
118 
119 private:
120     DEFINE_HILOG_LABEL("ThreadCollector");
121 
122     struct ThreadExitInfo {
123         std::unique_ptr<std::thread> thread;
124         ExitFunction threadExit;
125     };
126 
ReclaimAll()127     inline void ReclaimAll()
128     {
129         std::unique_lock<std::mutex> lock(collectorLock_);
130         // All thread deposited need to stop one by one.
131         while (!depositMap_.empty()) {
132             DoReclaimLocked(lock, depositMap_.begin());
133         }
134     }
135 
Stop()136     void Stop()
137     {
138         {
139             // Stop the collect thread, while destruction of collector.
140             std::lock_guard<std::mutex> lock(collectorLock_);
141             destroying_ = true;
142             if (isWaiting_) {
143                 condition_.notify_all();
144             }
145         }
146 
147         {
148             std::lock_guard<std::mutex> lock(threadLock_);
149             if ((thread_) && (thread_->joinable())) {
150                 // Wait utils collect thread finished, if exists.
151                 thread_->join();
152             }
153             needCreateThread_ = false;
154         }
155 
156         ReclaimAll();
157     }
158 
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)159     void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
160         std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
161     {
162         if (it == depositMap_.end()) {
163             return;
164         }
165 
166         // Remove thread information from map.
167         auto threadId = it->first;
168         auto exitInfo = std::move(it->second);
169         (void)depositMap_.erase(it);
170 
171         // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
172         lock.unlock();
173 
174         size_t hashThreadId = CalculateHashCode(threadId);
175         HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
176         if (needCallExit) {
177             // Call exit callback to stop loop in thread.
178             exitInfo.threadExit();
179         }
180         // Wait until thread finished.
181         exitInfo.thread->join();
182         HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
183 
184         // Lock again.
185         lock.lock();
186     }
187 
Run()188     void Run()
189     {
190         HILOGD("Run: Collector thread is started");
191 
192         std::unique_lock<std::mutex> lock(collectorLock_);
193         while (!destroying_) {
194             // Reclaim threads in list one by one.
195             while (!reclaims_.empty()) {
196                 auto threadId = reclaims_.back();
197                 reclaims_.pop_back();
198                 DoReclaimLocked(lock, depositMap_.find(threadId), false);
199             }
200 
201             // Maybe stop running while doing reclaim, so check before waiting.
202             if (destroying_) {
203                 break;
204             }
205 
206             isWaiting_ = true;
207             condition_.wait(lock);
208             isWaiting_ = false;
209         }
210 
211         HILOGD("Run: Collector thread is stopped");
212     }
213 
214     std::mutex collectorLock_;
215     std::condition_variable condition_;
216     bool isWaiting_ {false};
217     bool destroying_ {false};
218     std::vector<std::thread::id> reclaims_;
219     std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
220 
221     std::mutex threadLock_;
222     // Thread for collector
223     std::unique_ptr<std::thread> thread_;
224     bool needCreateThread_ {true};
225 
226     // Avatar of thread collector, used to stop collector at the specified opportunity.
227     class Avatar {
228     public:
229         DISALLOW_COPY_AND_MOVE(Avatar);
230 
231         Avatar() = default;
~Avatar()232         ~Avatar()
233         {
234             if (avatarEnabled_) {
235                 GetInstance().avatarDestructed_ = true;
236                 GetInstance().Stop();
237             }
238         }
239 
Disable()240         static inline void Disable()
241         {
242             avatarEnabled_ = false;
243         }
244     };
245 
246     // Mark whether avatar is destructed.
247     volatile bool avatarDestructed_ {false};
248     // Mark whether avatar is enabled.
249     static volatile bool avatarEnabled_;
250     static Avatar avatar_;
251 };
252 
ThreadCollector()253 ThreadCollector::ThreadCollector()
254     : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
255 {
256     // Thread collector is created, so enable avatar.
257     avatarEnabled_ = true;
258 }
259 
~ThreadCollector()260 ThreadCollector::~ThreadCollector()
261 {
262     // If avatar is not destructed, stop collector by itself.
263     if (!avatarDestructed_) {
264         avatar_.Disable();
265         Stop();
266     }
267 }
268 
269 class EventRunnerImpl final : public EventInnerRunner {
270 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)271     explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
272     {
273         queue_ = std::make_shared<EventQueue>();
274     }
275 
~EventRunnerImpl()276     ~EventRunnerImpl() final
277     {
278         queue_->RemoveAll();
279     }
280     DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
281 
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)282     static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
283     {
284         std::shared_ptr<EventRunnerImpl> inner = wp.lock();
285         if (inner) {
286             HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
287 
288             // Call system call to modify thread name.
289             SystemCallSetThreadName(inner->threadName_);
290 
291             // Enter event loop.
292             inner->Run();
293 
294             HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
295         } else {
296             HILOGW("ThreadMain: EventRunner has been released just after its creation");
297         }
298 
299         // Reclaim current thread.
300         ThreadCollector::GetInstance().ReclaimCurrentThread();
301     }
302 
Run()303     void Run() final
304     {
305         // Prepare to start event loop.
306         queue_->Prepare();
307 
308         // Make sure instance of 'EventRunner' exists.
309         if (owner_.expired()) {
310             return;
311         }
312 
313         threadId_ = std::this_thread::get_id();
314 
315         // Save old event runner.
316         std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
317         // Set current event runner into thread local data.
318         currentEventRunner = owner_;
319 
320         // Start event looper.
321         for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
322             std::shared_ptr<EventHandler> handler = event->GetOwner();
323             // Make sure owner of the event exists.
324             if (handler) {
325                 std::shared_ptr<Logger> logging = logger_;
326                 if (logging != nullptr) {
327                     if (!event->HasTask()) {
328                         logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
329                     } else {
330                         logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
331                     }
332                 }
333                 handler->DistributeEvent(event);
334             }
335             // Release event manually, otherwise event will be released until next event coming.
336             event.reset();
337         }
338 
339         // Restore current event runner.
340         currentEventRunner = oldRunner;
341     }
342 
Stop()343     void Stop() final
344     {
345         queue_->Finish();
346     }
347 
Attach(std::unique_ptr<std::thread> & thread)348     inline bool Attach(std::unique_ptr<std::thread> &thread)
349     {
350         auto exitThread = [queue = queue_]() { queue->Finish(); };
351 
352         return ThreadCollector::GetInstance().Deposit(thread, exitThread);
353     }
354 
SetThreadName(const std::string & threadName)355     inline void SetThreadName(const std::string &threadName)
356     {
357         static std::atomic<uint32_t> idGenerator(1);
358 
359         if (threadName.empty()) {
360             // Generate a default name
361             threadName_ = "EventRunner#";
362             threadName_ += std::to_string(idGenerator++);
363         } else {
364             threadName_ = threadName;
365         }
366     }
367 
368 private:
369     DEFINE_HILOG_LABEL("EventRunnerImpl");
370 };
371 }  // unnamed namespace
372 
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)373 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
374     : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
375 {}
376 
GetCurrentEventRunner()377 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
378 {
379     const std::weak_ptr<EventRunner> &wp = currentEventRunner;
380     return wp.lock();
381 }
382 
383 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
384 
385 namespace {
386 volatile bool ThreadCollector::avatarEnabled_ = false;
387 
388 /*
389  * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
390  * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
391  * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
392  */
393 ThreadCollector::Avatar ThreadCollector::avatar_;
394 }  // unnamed namespace
395 
396 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
397 
Create(bool inNewThread)398 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
399 {
400     if (inNewThread) {
401         return Create(std::string());
402     }
403 
404     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
405     std::shared_ptr<EventRunner> sp(new EventRunner(false));
406     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
407     sp->innerRunner_ = innerRunner;
408     sp->queue_ = innerRunner->GetEventQueue();
409 
410     return sp;
411 }
412 
Create(const std::string & threadName)413 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
414 {
415     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
416     std::shared_ptr<EventRunner> sp(new EventRunner(true));
417     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
418     sp->innerRunner_ = innerRunner;
419     sp->queue_ = innerRunner->GetEventQueue();
420 
421     // Start new thread
422     innerRunner->SetThreadName(threadName);
423     auto thread =
424         std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
425     if (!innerRunner->Attach(thread)) {
426         HILOGW("Create: Failed to attach thread, maybe process is exiting");
427         innerRunner->Stop();
428         thread->join();
429     }
430 
431     return sp;
432 }
433 
Current()434 std::shared_ptr<EventRunner> EventRunner::Current()
435 {
436     auto runner = EventInnerRunner::GetCurrentEventRunner();
437     if (runner) {
438         return runner;
439     }
440     return nullptr;
441 }
442 
EventRunner(bool deposit)443 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
444 {}
445 
GetRunnerThreadName() const446 std::string EventRunner::GetRunnerThreadName() const
447 {
448     return innerRunner_->GetThreadName();
449 }
450 
~EventRunner()451 EventRunner::~EventRunner()
452 {
453     if (deposit_) {
454         innerRunner_->Stop();
455     }
456 }
457 
Run()458 ErrCode EventRunner::Run()
459 {
460     if (deposit_) {
461         HILOGE("Run: Do not call, if event runner is deposited");
462         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
463     }
464 
465     // Avoid more than one thread to start this runner.
466     if (running_.exchange(true)) {
467         HILOGW("Run: Already running");
468         return EVENT_HANDLER_ERR_RUNNER_ALREADY;
469     }
470 
471     // Entry event loop.
472     innerRunner_->Run();
473 
474     running_.store(false);
475     return ERR_OK;
476 }
477 
Stop()478 ErrCode EventRunner::Stop()
479 {
480     if (deposit_) {
481         HILOGE("Stop: Do not call, if event runner is deposited");
482         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
483     }
484 
485     if (running_.load()) {
486         innerRunner_->Stop();
487     } else {
488         HILOGW("Stop: Already stopped");
489     }
490 
491     return ERR_OK;
492 }
493 
Dump(Dumper & dumper)494 void EventRunner::Dump(Dumper &dumper)
495 {
496     if (!IsRunning()) {
497         dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
498         return;
499     }
500 
501     if (queue_ == nullptr) {
502         dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
503         return;
504     }
505 
506     dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
507                 ", Thread ID = " + std::to_string(GetThreadId()) + ") is running" + LINE_SEPARATOR);
508 
509     queue_->Dump(dumper);
510 }
511 
DumpRunnerInfo(std::string & runnerInfo)512 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
513 {
514     if (!IsRunning()) {
515         runnerInfo = "        Event runner is not running" + LINE_SEPARATOR;
516     }
517 
518     if (queue_ == nullptr) {
519         runnerInfo = "        Queue is null" + LINE_SEPARATOR;
520         return;
521     }
522 
523     std::string queueInfo;
524     queue_->DumpQueueInfo(queueInfo);
525     runnerInfo += queueInfo;
526 }
527 
SetLogger(const std::shared_ptr<Logger> & logger)528 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
529 {
530     innerRunner_->SetLogger(logger);
531 }
532 
GetCurrentEventQueue()533 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
534 {
535     auto runner = EventRunner::Current();
536     if (!runner) {
537         return nullptr;
538     }
539     return runner->queue_;
540 }
541 
GetThreadId()542 uint64_t EventRunner::GetThreadId()
543 {
544     std::thread::id tid = innerRunner_->GetThreadId();
545     std::stringstream buf;
546     buf << tid;
547     std::string stid = buf.str();
548     return std::stoull(stid);
549 }
550 
IsCurrentRunnerThread()551 bool EventRunner::IsCurrentRunnerThread()
552 {
553     return std::this_thread::get_id() == innerRunner_->GetThreadId();
554 }
555 
GetMainEventRunner()556 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
557 {
558     if (!mainRunner_) {
559         mainRunner_ = Create(false);
560         if (!mainRunner_) {
561             HILOGE("mainRunner_ create fail");
562         }
563     }
564 
565     return mainRunner_;
566 }
567 }  // namespace AppExecFwk
568 }  // namespace OHOS
569