• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_runner.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 #if !defined(__APPLE__) && !defined(_WIN32)
26 #include <sys/prctl.h>
27 #endif
28 
29 #include "event_handler.h"
30 #include "event_handler_utils.h"
31 #include "event_inner_runner.h"
32 #include "singleton.h"
33 #include "thread_local_data.h"
34 
35 DEFINE_HILOG_LABEL("EventRunner");
36 
37 namespace OHOS {
38 namespace AppExecFwk {
39 namespace {
40 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)41 inline void SystemCallSetThreadName(const std::string &name)
42 {
43 #if !defined(__APPLE__) && !defined(_WIN32)
44     if (prctl(PR_SET_NAME, name.c_str()) < 0) {
45         char errmsg[MAX_ERRORMSG_LEN] = {0};
46         GetLastErr(errmsg, MAX_ERRORMSG_LEN);
47         HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
48     }
49 #endif
50 }
51 
// Returns the std::hash value of 'obj'. Within this file it turns thread ids into numbers that can be logged.
template<typename T>
inline size_t CalculateHashCode(const T &obj)
{
    return std::hash<T> {}(obj);
}
59 
60 // Thread collector is used to reclaim thread that needs to finish running.
61 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
62     DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
63 
64 public:
65     DISALLOW_COPY_AND_MOVE(ThreadCollector);
66 
67     using ExitFunction = std::function<void()>;
68 
ReclaimCurrentThread()69     void ReclaimCurrentThread()
70     {
71         // Get id of current thread.
72         auto threadId = std::this_thread::get_id();
73         HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
74 
75         {
76             // Add thread id to list and notify to reclaim.
77             std::lock_guard<std::mutex> lock(collectorLock_);
78             if (destroying_) {
79                 HILOGI("Reclaim: Thread collector is destroying");
80                 return;
81             }
82 
83             reclaims_.emplace_back(threadId);
84             if (isWaiting_) {
85                 condition_.notify_one();
86             }
87         }
88 
89         if (threadLock_.try_lock()) {
90             if ((!thread_) && (needCreateThread_)) {
91                 // Start daemon thread to collect finished threads, if not exist.
92                 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
93             }
94             threadLock_.unlock();
95         }
96     }
97 
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)98     bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
99     {
100         if ((!thread) || (!thread->joinable()) || (!threadExit)) {
101             auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
102             HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
103             return false;
104         }
105 
106         auto threadId = thread->get_id();
107         HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
108         // Save these information into map.
109         std::lock_guard<std::mutex> lock(collectorLock_);
110         if (destroying_) {
111             HILOGW("Deposit: Collector thread is destroying");
112             return false;
113         }
114         // Save thread object and its exit callback.
115         depositMap_.emplace(threadId,
116             ThreadExitInfo {
117                 .thread = std::move(thread),
118                 .threadExit = threadExit,
119             });
120         return true;
121     }
122 
123 private:
124     DEFINE_HILOG_LABEL("ThreadCollector");
125 
126     struct ThreadExitInfo {
127         std::unique_ptr<std::thread> thread;
128         ExitFunction threadExit;
129     };
130 
ReclaimAll()131     inline void ReclaimAll()
132     {
133         std::unique_lock<std::mutex> lock(collectorLock_);
134         // All thread deposited need to stop one by one.
135         while (!depositMap_.empty()) {
136             DoReclaimLocked(lock, depositMap_.begin());
137         }
138     }
139 
Stop()140     void Stop()
141     {
142         {
143             // Stop the collect thread, while destruction of collector.
144             std::lock_guard<std::mutex> lock(collectorLock_);
145             destroying_ = true;
146             if (isWaiting_) {
147                 condition_.notify_all();
148             }
149         }
150 
151         {
152             std::lock_guard<std::mutex> lock(threadLock_);
153             if ((thread_) && (thread_->joinable())) {
154                 // Wait utils collect thread finished, if exists.
155                 thread_->join();
156             }
157             needCreateThread_ = false;
158         }
159 
160         ReclaimAll();
161     }
162 
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)163     void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
164         std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
165     {
166         if (it == depositMap_.end()) {
167             return;
168         }
169 
170         // Remove thread information from map.
171         auto threadId = it->first;
172         auto exitInfo = std::move(it->second);
173         (void)depositMap_.erase(it);
174 
175         // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
176         lock.unlock();
177 
178         size_t hashThreadId = CalculateHashCode(threadId);
179         HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
180         if (needCallExit) {
181             // Call exit callback to stop loop in thread.
182             exitInfo.threadExit();
183         }
184         // Wait until thread finished.
185         exitInfo.thread->join();
186         HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
187 
188         // Lock again.
189         lock.lock();
190     }
191 
Run()192     void Run()
193     {
194         HILOGD("Run: Collector thread is started");
195 
196         std::unique_lock<std::mutex> lock(collectorLock_);
197         while (!destroying_) {
198             // Reclaim threads in list one by one.
199             while (!reclaims_.empty()) {
200                 auto threadId = reclaims_.back();
201                 reclaims_.pop_back();
202                 DoReclaimLocked(lock, depositMap_.find(threadId), false);
203             }
204 
205             // Maybe stop running while doing reclaim, so check before waiting.
206             if (destroying_) {
207                 break;
208             }
209 
210             isWaiting_ = true;
211             condition_.wait(lock);
212             isWaiting_ = false;
213         }
214 
215         HILOGD("Run: Collector thread is stopped");
216     }
217 
218     std::mutex collectorLock_;
219     std::condition_variable condition_;
220     bool isWaiting_ {false};
221     bool destroying_ {false};
222     std::vector<std::thread::id> reclaims_;
223     std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
224 
225     std::mutex threadLock_;
226     // Thread for collector
227     std::unique_ptr<std::thread> thread_;
228     bool needCreateThread_ {true};
229 
230     // Avatar of thread collector, used to stop collector at the specified opportunity.
231     class Avatar {
232     public:
233         DISALLOW_COPY_AND_MOVE(Avatar);
234 
235         Avatar() = default;
~Avatar()236         ~Avatar()
237         {
238             if (avatarEnabled_) {
239                 GetInstance().avatarDestructed_ = true;
240                 GetInstance().Stop();
241             }
242         }
243 
Disable()244         static inline void Disable()
245         {
246             avatarEnabled_ = false;
247         }
248     };
249 
250     // Mark whether avatar is destructed.
251     volatile bool avatarDestructed_ {false};
252     // Mark whether avatar is enabled.
253     static volatile bool avatarEnabled_;
254     static Avatar avatar_;
255 };
256 
ThreadCollector()257 ThreadCollector::ThreadCollector()
258     : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
259 {
260     // Thread collector is created, so enable avatar.
261     avatarEnabled_ = true;
262 }
263 
~ThreadCollector()264 ThreadCollector::~ThreadCollector()
265 {
266     // If avatar is not destructed, stop collector by itself.
267     if (!avatarDestructed_) {
268         avatar_.Disable();
269         Stop();
270     }
271 }
272 
273 class EventRunnerImpl final : public EventInnerRunner {
274 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)275     explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
276     {
277         queue_ = std::make_shared<EventQueue>();
278     }
279 
280     ~EventRunnerImpl() final = default;
281     DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
282 
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)283     static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
284     {
285         std::shared_ptr<EventRunnerImpl> inner = wp.lock();
286         if (inner) {
287             HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
288 
289             // Call system call to modify thread name.
290             SystemCallSetThreadName(inner->threadName_);
291 
292             // Enter event loop.
293             inner->Run();
294 
295             HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
296         } else {
297             HILOGW("ThreadMain: EventRunner has been released just after its creation");
298         }
299 
300         // Reclaim current thread.
301         ThreadCollector::GetInstance().ReclaimCurrentThread();
302     }
303 
Run()304     void Run() final
305     {
306         // Prepare to start event loop.
307         queue_->Prepare();
308 
309         // Make sure instance of 'EventRunner' exists.
310         if (owner_.expired()) {
311             return;
312         }
313 
314         threadId_ = std::this_thread::get_id();
315 
316         // Save old event runner.
317         std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
318         // Set current event runner into thread local data.
319         currentEventRunner = owner_;
320 
321         // Start event looper.
322         for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
323             std::shared_ptr<EventHandler> handler = event->GetOwner();
324             // Make sure owner of the event exists.
325             if (handler) {
326                 std::shared_ptr<Logger> logging = logger_;
327                 if (logging != nullptr) {
328                     if (!event->HasTask()) {
329                         logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
330                     } else {
331                         logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
332                     }
333                 }
334                 handler->DistributeEvent(event);
335             }
336             // Release event manually, otherwise event will be released until next event coming.
337             event.reset();
338         }
339 
340         // Restore current event runner.
341         currentEventRunner = oldRunner;
342     }
343 
Stop()344     void Stop() final
345     {
346         queue_->Finish();
347     }
348 
Attach(std::unique_ptr<std::thread> & thread)349     inline bool Attach(std::unique_ptr<std::thread> &thread)
350     {
351         auto exitThread = [queue = queue_]() { queue->Finish(); };
352 
353         return ThreadCollector::GetInstance().Deposit(thread, exitThread);
354     }
355 
SetThreadName(const std::string & threadName)356     inline void SetThreadName(const std::string &threadName)
357     {
358         static std::atomic<uint32_t> idGenerator(1);
359 
360         if (threadName.empty()) {
361             // Generate a default name
362             threadName_ = "EventRunner#";
363             threadName_ += std::to_string(idGenerator++);
364         } else {
365             threadName_ = threadName;
366         }
367     }
368 
369 private:
370     DEFINE_HILOG_LABEL("EventRunnerImpl");
371 };
372 }  // unnamed namespace
373 
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)374 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
375     : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
376 {}
377 
GetCurrentEventRunner()378 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
379 {
380     const std::weak_ptr<EventRunner> &wp = currentEventRunner;
381     return wp.lock();
382 }
383 
384 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
385 
386 namespace {
387 volatile bool ThreadCollector::avatarEnabled_ = false;
388 
389 /*
390  * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
391  * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
392  * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
393  */
394 ThreadCollector::Avatar ThreadCollector::avatar_;
395 }  // unnamed namespace
396 
397 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
398 
Create(bool inNewThread)399 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
400 {
401     if (inNewThread) {
402         return Create(std::string());
403     }
404 
405     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
406     std::shared_ptr<EventRunner> sp(new EventRunner(false));
407     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
408     sp->innerRunner_ = innerRunner;
409     sp->queue_ = innerRunner->GetEventQueue();
410 
411     return sp;
412 }
413 
Create(const std::string & threadName)414 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
415 {
416     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
417     std::shared_ptr<EventRunner> sp(new EventRunner(true));
418     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
419     sp->innerRunner_ = innerRunner;
420     sp->queue_ = innerRunner->GetEventQueue();
421 
422     // Start new thread
423     innerRunner->SetThreadName(threadName);
424     auto thread =
425         std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
426     if (!innerRunner->Attach(thread)) {
427         HILOGW("Create: Failed to attach thread, maybe process is exiting");
428         innerRunner->Stop();
429         thread->join();
430     }
431 
432     return sp;
433 }
434 
Current()435 std::shared_ptr<EventRunner> EventRunner::Current()
436 {
437     auto runner = EventInnerRunner::GetCurrentEventRunner();
438     if (runner) {
439         return runner;
440     }
441     return nullptr;
442 }
443 
EventRunner(bool deposit)444 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
445 {}
446 
GetRunnerThreadName() const447 std::string EventRunner::GetRunnerThreadName() const
448 {
449     return innerRunner_->GetThreadName();
450 }
451 
~EventRunner()452 EventRunner::~EventRunner()
453 {
454     if (deposit_) {
455         innerRunner_->Stop();
456     }
457 }
458 
Run()459 ErrCode EventRunner::Run()
460 {
461     if (deposit_) {
462         HILOGE("Run: Do not call, if event runner is deposited");
463         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
464     }
465 
466     // Avoid more than one thread to start this runner.
467     if (running_.exchange(true)) {
468         HILOGW("Run: Already running");
469         return EVENT_HANDLER_ERR_RUNNER_ALREADY;
470     }
471 
472     // Entry event loop.
473     innerRunner_->Run();
474 
475     running_.store(false);
476     return ERR_OK;
477 }
478 
Stop()479 ErrCode EventRunner::Stop()
480 {
481     if (deposit_) {
482         HILOGE("Stop: Do not call, if event runner is deposited");
483         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
484     }
485 
486     if (running_.load()) {
487         innerRunner_->Stop();
488     } else {
489         HILOGW("Stop: Already stopped");
490     }
491 
492     return ERR_OK;
493 }
494 
Dump(Dumper & dumper)495 void EventRunner::Dump(Dumper &dumper)
496 {
497     if (!IsRunning()) {
498         dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
499         return;
500     }
501 
502     if (queue_ == nullptr) {
503         dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
504         return;
505     }
506 
507     dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
508                 ", Thread ID = " + std::to_string(GetThreadId()) + ") is running" + LINE_SEPARATOR);
509 
510     queue_->Dump(dumper);
511 }
512 
DumpRunnerInfo(std::string & runnerInfo)513 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
514 {
515     if (!IsRunning()) {
516         runnerInfo = "        Event runner is not running" + LINE_SEPARATOR;
517     }
518 
519     if (queue_ == nullptr) {
520         runnerInfo = "        Queue is null" + LINE_SEPARATOR;
521         return;
522     }
523 
524     std::string queueInfo;
525     queue_->DumpQueueInfo(queueInfo);
526     runnerInfo += queueInfo;
527 }
528 
SetLogger(const std::shared_ptr<Logger> & logger)529 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
530 {
531     innerRunner_->SetLogger(logger);
532 }
533 
GetCurrentEventQueue()534 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
535 {
536     auto runner = EventRunner::Current();
537     if (!runner) {
538         return nullptr;
539     }
540     return runner->queue_;
541 }
542 
GetThreadId()543 uint64_t EventRunner::GetThreadId()
544 {
545     std::thread::id tid = innerRunner_->GetThreadId();
546     std::stringstream buf;
547     buf << tid;
548     std::string stid = buf.str();
549     return std::stoull(stid);
550 }
551 
IsCurrentRunnerThread()552 bool EventRunner::IsCurrentRunnerThread()
553 {
554     return std::this_thread::get_id() == innerRunner_->GetThreadId();
555 }
556 
GetMainEventRunner()557 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
558 {
559     if (!mainRunner_) {
560         mainRunner_ = Create(false);
561         if (!mainRunner_) {
562             HILOGE("mainRunner_ create fail");
563         }
564     }
565 
566     return mainRunner_;
567 }
568 }  // namespace AppExecFwk
569 }  // namespace OHOS
570