• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_runner.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include <sys/prctl.h>
26 
27 #include "event_handler.h"
28 #include "event_handler_utils.h"
29 #include "event_inner_runner.h"
30 #include "singleton.h"
31 #include "thread_local_data.h"
32 
33 DEFINE_HILOG_LABEL("EventRunner");
34 
35 namespace OHOS {
36 namespace AppExecFwk {
37 namespace {
38 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)39 inline void SystemCallSetThreadName(const std::string &name)
40 {
41     if (prctl(PR_SET_NAME, name.c_str()) < 0) {
42         char errmsg[MAX_ERRORMSG_LEN] = {0};
43         GetLastErr(errmsg, MAX_ERRORMSG_LEN);
44         HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
45     }
46 }
47 
48 // Help to calculate hash code of object.
49 template<typename T>
CalculateHashCode(const T & obj)50 inline size_t CalculateHashCode(const T &obj)
51 {
52     std::hash<T> calculateHashCode;
53     return calculateHashCode(obj);
54 }
55 
56 // Thread collector is used to reclaim thread that needs to finish running.
57 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
58     DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
59 
60 public:
61     DISALLOW_COPY_AND_MOVE(ThreadCollector);
62 
63     using ExitFunction = std::function<void()>;
64 
ReclaimCurrentThread()65     void ReclaimCurrentThread()
66     {
67         // Get id of current thread.
68         auto threadId = std::this_thread::get_id();
69         HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
70 
71         {
72             // Add thread id to list and notify to reclaim.
73             std::lock_guard<std::mutex> lock(collectorLock_);
74             if (destroying_) {
75                 HILOGI("Reclaim: Thread collector is destroying");
76                 return;
77             }
78 
79             reclaims_.emplace_back(threadId);
80             if (isWaiting_) {
81                 condition_.notify_one();
82             }
83         }
84 
85         if (threadLock_.try_lock()) {
86             if ((!thread_) && (needCreateThread_)) {
87                 // Start daemon thread to collect finished threads, if not exist.
88                 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
89             }
90             threadLock_.unlock();
91         }
92     }
93 
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)94     bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
95     {
96         if ((!thread) || (!thread->joinable()) || (!threadExit)) {
97             auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
98             HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
99             return false;
100         }
101 
102         auto threadId = thread->get_id();
103         HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
104         // Save these information into map.
105         std::lock_guard<std::mutex> lock(collectorLock_);
106         if (destroying_) {
107             HILOGW("Deposit: Collector thread is destroying");
108             return false;
109         }
110         // Save thread object and its exit callback.
111         depositMap_.emplace(threadId,
112             ThreadExitInfo {
113                 .thread = std::move(thread),
114                 .threadExit = threadExit,
115             });
116         return true;
117     }
118 
119 private:
120     DEFINE_HILOG_LABEL("ThreadCollector");
121 
122     struct ThreadExitInfo {
123         std::unique_ptr<std::thread> thread;
124         ExitFunction threadExit;
125     };
126 
ReclaimAll()127     inline void ReclaimAll()
128     {
129         std::unique_lock<std::mutex> lock(collectorLock_);
130         // All thread deposited need to stop one by one.
131         while (!depositMap_.empty()) {
132             DoReclaimLocked(lock, depositMap_.begin());
133         }
134     }
135 
Stop()136     void Stop()
137     {
138         {
139             // Stop the collect thread, while destruction of collector.
140             std::lock_guard<std::mutex> lock(collectorLock_);
141             destroying_ = true;
142             if (isWaiting_) {
143                 condition_.notify_all();
144             }
145         }
146 
147         {
148             std::lock_guard<std::mutex> lock(threadLock_);
149             if ((thread_) && (thread_->joinable())) {
150                 // Wait utils collect thread finished, if exists.
151                 thread_->join();
152             }
153             needCreateThread_ = false;
154         }
155 
156         ReclaimAll();
157     }
158 
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)159     void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
160         std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
161     {
162         if (it == depositMap_.end()) {
163             return;
164         }
165 
166         // Remove thread information from map.
167         auto threadId = it->first;
168         auto exitInfo = std::move(it->second);
169         (void)depositMap_.erase(it);
170 
171         // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
172         lock.unlock();
173 
174         size_t hashThreadId = CalculateHashCode(threadId);
175         HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
176         if (needCallExit) {
177             // Call exit callback to stop loop in thread.
178             exitInfo.threadExit();
179         }
180         // Wait until thread finished.
181         exitInfo.thread->join();
182         HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
183 
184         // Lock again.
185         lock.lock();
186     }
187 
Run()188     void Run()
189     {
190         HILOGD("Run: Collector thread is started");
191 
192         std::unique_lock<std::mutex> lock(collectorLock_);
193         while (!destroying_) {
194             // Reclaim threads in list one by one.
195             while (!reclaims_.empty()) {
196                 auto threadId = reclaims_.back();
197                 reclaims_.pop_back();
198                 DoReclaimLocked(lock, depositMap_.find(threadId), false);
199             }
200 
201             // Maybe stop running while doing reclaim, so check before waiting.
202             if (destroying_) {
203                 break;
204             }
205 
206             isWaiting_ = true;
207             condition_.wait(lock);
208             isWaiting_ = false;
209         }
210 
211         HILOGD("Run: Collector thread is stopped");
212     }
213 
214     std::mutex collectorLock_;
215     std::condition_variable condition_;
216     bool isWaiting_ {false};
217     bool destroying_ {false};
218     std::vector<std::thread::id> reclaims_;
219     std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
220 
221     std::mutex threadLock_;
222     // Thread for collector
223     std::unique_ptr<std::thread> thread_;
224     bool needCreateThread_ {true};
225 
226     // Avatar of thread collector, used to stop collector at the specified opportunity.
227     class Avatar {
228     public:
229         DISALLOW_COPY_AND_MOVE(Avatar);
230 
231         Avatar() = default;
~Avatar()232         ~Avatar()
233         {
234             if (avatarEnabled_) {
235                 GetInstance().avatarDestructed_ = true;
236                 GetInstance().Stop();
237             }
238         }
239 
Disable() const240         inline void Disable() const
241         {
242             avatarEnabled_ = false;
243         }
244     };
245 
246     // Mark whether avatar is destructed.
247     volatile bool avatarDestructed_ {false};
248     // Mark whether avatar is enabled.
249     static volatile bool avatarEnabled_;
250     static Avatar avatar_;
251 };
252 
ThreadCollector()253 ThreadCollector::ThreadCollector()
254     : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
255 {
256     // Thread collector is created, so enable avatar.
257     avatarEnabled_ = true;
258 }
259 
~ThreadCollector()260 ThreadCollector::~ThreadCollector()
261 {
262     // If avatar is not destructed, stop collector by itself.
263     if (!avatarDestructed_) {
264         avatar_.Disable();
265         Stop();
266     }
267 }
268 
269 class EventRunnerImpl final : public EventInnerRunner {
270 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)271     explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
272     {
273         queue_ = std::make_shared<EventQueue>();
274     }
275 
276     ~EventRunnerImpl() final = default;
277     DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
278 
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)279     static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
280     {
281         std::shared_ptr<EventRunnerImpl> inner = wp.lock();
282         if (inner) {
283             HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
284 
285             // Call system call to modify thread name.
286             SystemCallSetThreadName(inner->threadName_);
287 
288             // Enter event loop.
289             inner->Run();
290 
291             HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
292         } else {
293             HILOGW("ThreadMain: EventRunner has been released just after its creation");
294         }
295 
296         // Reclaim current thread.
297         ThreadCollector::GetInstance().ReclaimCurrentThread();
298     }
299 
Run()300     void Run() final
301     {
302         // Prepare to start event loop.
303         queue_->Prepare();
304 
305         // Make sure instance of 'EventRunner' exists.
306         if (owner_.expired()) {
307             return;
308         }
309 
310         threadId_ = std::this_thread::get_id();
311 
312         // Save old event runner.
313         std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
314         // Set current event runner into thread local data.
315         currentEventRunner = owner_;
316 
317         // Start event looper.
318         for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
319             std::shared_ptr<EventHandler> handler = event->GetOwner();
320             // Make sure owner of the event exists.
321             if (handler) {
322                 std::shared_ptr<Logger> logging = logger_;
323                 std::stringstream address;
324                 address << handler.get();
325                 if (logging != nullptr) {
326                     if (!event->HasTask()) {
327                         logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
328                     } else {
329                         logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
330                     }
331                 }
332                 handler->DistributeEvent(event);
333 
334                 if (logging != nullptr) {
335                     logging->Log("Finished to handler(0x" + address.str() + ")");
336                 }
337             }
338             // Release event manually, otherwise event will be released until next event coming.
339             event.reset();
340         }
341 
342         // Restore current event runner.
343         currentEventRunner = oldRunner;
344     }
345 
Stop()346     void Stop() final
347     {
348         queue_->Finish();
349     }
350 
Attach(std::unique_ptr<std::thread> & thread)351     inline bool Attach(std::unique_ptr<std::thread> &thread)
352     {
353         auto exitThread = [queue = queue_]() { queue->Finish(); };
354 
355         return ThreadCollector::GetInstance().Deposit(thread, exitThread);
356     }
357 
SetThreadName(const std::string & threadName)358     inline void SetThreadName(const std::string &threadName)
359     {
360         static std::atomic<uint32_t> idGenerator(1);
361 
362         if (threadName.empty()) {
363             // Generate a default name
364             threadName_ = "event_runner#";
365             threadName_ += std::to_string(idGenerator++);
366         } else {
367             threadName_ = threadName;
368         }
369     }
370 
371 private:
372     DEFINE_HILOG_LABEL("EventRunnerImpl");
373 };
374 }  // unnamed namespace
375 
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)376 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
377     : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
378 {}
379 
GetCurrentEventRunner()380 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
381 {
382     const std::weak_ptr<EventRunner> &wp = currentEventRunner;
383     return wp.lock();
384 }
385 
386 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
387 
388 namespace {
389 volatile bool ThreadCollector::avatarEnabled_ = false;
390 
391 /*
392  * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
393  * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
394  * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
395  */
396 ThreadCollector::Avatar ThreadCollector::avatar_;
397 }  // unnamed namespace
398 
399 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
400 
Create(bool inNewThread)401 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
402 {
403     if (inNewThread) {
404         return Create(std::string());
405     }
406 
407     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
408     std::shared_ptr<EventRunner> sp(new EventRunner(false));
409     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
410     sp->innerRunner_ = innerRunner;
411     sp->queue_ = innerRunner->GetEventQueue();
412 
413     return sp;
414 }
415 
Create(const std::string & threadName)416 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
417 {
418     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
419     std::shared_ptr<EventRunner> sp(new EventRunner(true));
420     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
421     sp->innerRunner_ = innerRunner;
422     sp->queue_ = innerRunner->GetEventQueue();
423 
424     // Start new thread
425     innerRunner->SetThreadName(threadName);
426     auto thread =
427         std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
428     if (!innerRunner->Attach(thread)) {
429         HILOGW("Create: Failed to attach thread, maybe process is exiting");
430         innerRunner->Stop();
431         thread->join();
432     }
433 
434     return sp;
435 }
436 
Current()437 std::shared_ptr<EventRunner> EventRunner::Current()
438 {
439     auto runner = EventInnerRunner::GetCurrentEventRunner();
440     if (runner) {
441         return runner;
442     }
443     return nullptr;
444 }
445 
EventRunner(bool deposit)446 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
447 {}
448 
GetRunnerThreadName() const449 std::string EventRunner::GetRunnerThreadName() const
450 {
451     return innerRunner_->GetThreadName();
452 }
453 
~EventRunner()454 EventRunner::~EventRunner()
455 {
456     if (deposit_) {
457         innerRunner_->Stop();
458     }
459 }
460 
Run()461 ErrCode EventRunner::Run()
462 {
463     if (deposit_) {
464         HILOGE("Run: Do not call, if event runner is deposited");
465         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
466     }
467 
468     // Avoid more than one thread to start this runner.
469     if (running_.exchange(true)) {
470         HILOGW("Run: Already running");
471         return EVENT_HANDLER_ERR_RUNNER_ALREADY;
472     }
473 
474     // Entry event loop.
475     innerRunner_->Run();
476 
477     running_.store(false);
478     return ERR_OK;
479 }
480 
Stop()481 ErrCode EventRunner::Stop()
482 {
483     if (deposit_) {
484         HILOGE("Stop: Do not call, if event runner is deposited");
485         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
486     }
487 
488     if (running_.load()) {
489         innerRunner_->Stop();
490     } else {
491         HILOGW("Stop: Already stopped");
492     }
493 
494     return ERR_OK;
495 }
496 
Dump(Dumper & dumper)497 void EventRunner::Dump(Dumper &dumper)
498 {
499     if (!IsRunning()) {
500         dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
501         return;
502     }
503 
504     if (queue_ == nullptr) {
505         dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
506         return;
507     }
508 
509     dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
510                 ", Thread ID = " + std::to_string(GetThreadId()) + ") is running" + LINE_SEPARATOR);
511 
512     queue_->Dump(dumper);
513 }
514 
DumpRunnerInfo(std::string & runnerInfo)515 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
516 {
517     if (!IsRunning()) {
518         runnerInfo = "        Event runner is not running" + LINE_SEPARATOR;
519     }
520 
521     if (queue_ == nullptr) {
522         runnerInfo = "        Queue is null" + LINE_SEPARATOR;
523         return;
524     }
525 
526     std::string queueInfo;
527     queue_->DumpQueueInfo(queueInfo);
528     runnerInfo += queueInfo;
529 }
530 
SetLogger(const std::shared_ptr<Logger> & logger)531 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
532 {
533     innerRunner_->SetLogger(logger);
534 }
535 
GetCurrentEventQueue()536 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
537 {
538     auto runner = EventRunner::Current();
539     if (!runner) {
540         return nullptr;
541     }
542     return runner->queue_;
543 }
544 
GetThreadId()545 uint64_t EventRunner::GetThreadId()
546 {
547     std::thread::id tid = innerRunner_->GetThreadId();
548     std::stringstream buf;
549     buf << tid;
550     std::string stid = buf.str();
551     return std::stoull(stid);
552 }
553 
IsCurrentRunnerThread()554 bool EventRunner::IsCurrentRunnerThread()
555 {
556     return std::this_thread::get_id() == innerRunner_->GetThreadId();
557 }
558 
GetMainEventRunner()559 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
560 {
561     if (!mainRunner_) {
562         mainRunner_ = Create(false);
563         if (!mainRunner_) {
564             HILOGE("mainRunner_ create fail");
565         }
566     }
567 
568     return mainRunner_;
569 }
570 }  // namespace AppExecFwk
571 }  // namespace OHOS
572