• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_runner.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include <unistd.h>
26 #include <sys/prctl.h>
27 #include <sys/syscall.h>
28 
29 #include "event_handler.h"
30 #include "event_handler_utils.h"
31 #include "event_inner_runner.h"
32 #include "singleton.h"
33 #include "thread_local_data.h"
34 
35 DEFINE_HILOG_LABEL("EventRunner");
36 
37 namespace OHOS {
38 namespace AppExecFwk {
39 namespace {
40 
41 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)42 inline void SystemCallSetThreadName(const std::string &name)
43 {
44     if (prctl(PR_SET_NAME, name.c_str()) < 0) {
45         char errmsg[MAX_ERRORMSG_LEN] = {0};
46         GetLastErr(errmsg, MAX_ERRORMSG_LEN);
47         HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
48     }
49 }
50 
51 // Help to calculate hash code of object.
52 template<typename T>
CalculateHashCode(const T & obj)53 inline size_t CalculateHashCode(const T &obj)
54 {
55     std::hash<T> calculateHashCode;
56     return calculateHashCode(obj);
57 }
58 
59 // Thread collector is used to reclaim thread that needs to finish running.
60 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
61     DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
62 
63 public:
64     DISALLOW_COPY_AND_MOVE(ThreadCollector);
65 
66     using ExitFunction = std::function<void()>;
67 
ReclaimCurrentThread()68     void ReclaimCurrentThread()
69     {
70         // Get id of current thread.
71         auto threadId = std::this_thread::get_id();
72         HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
73 
74         {
75             // Add thread id to list and notify to reclaim.
76             std::lock_guard<std::mutex> lock(collectorLock_);
77             if (destroying_) {
78                 HILOGI("Reclaim: Thread collector is destroying");
79                 return;
80             }
81 
82             reclaims_.emplace_back(threadId);
83             if (isWaiting_) {
84                 condition_.notify_one();
85             }
86         }
87 
88         if (threadLock_.try_lock()) {
89             if ((!thread_) && (needCreateThread_)) {
90                 // Start daemon thread to collect finished threads, if not exist.
91                 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
92             }
93             threadLock_.unlock();
94         }
95     }
96 
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)97     bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
98     {
99         if ((!thread) || (!thread->joinable()) || (!threadExit)) {
100             auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
101             HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
102             return false;
103         }
104 
105         auto threadId = thread->get_id();
106         HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
107         // Save these information into map.
108         std::lock_guard<std::mutex> lock(collectorLock_);
109         if (destroying_) {
110             HILOGW("Deposit: Collector thread is destroying");
111             return false;
112         }
113         // Save thread object and its exit callback.
114         depositMap_.emplace(threadId,
115             ThreadExitInfo {
116                 .thread = std::move(thread),
117                 .threadExit = threadExit,
118             });
119         return true;
120     }
121 
122 private:
123     DEFINE_HILOG_LABEL("ThreadCollector");
124 
125     struct ThreadExitInfo {
126         std::unique_ptr<std::thread> thread;
127         ExitFunction threadExit;
128     };
129 
ReclaimAll()130     inline void ReclaimAll()
131     {
132         std::unique_lock<std::mutex> lock(collectorLock_);
133         // All thread deposited need to stop one by one.
134         while (!depositMap_.empty()) {
135             DoReclaimLocked(lock, depositMap_.begin());
136         }
137     }
138 
Stop()139     void Stop()
140     {
141         {
142             // Stop the collect thread, while destruction of collector.
143             std::lock_guard<std::mutex> lock(collectorLock_);
144             destroying_ = true;
145             if (isWaiting_) {
146                 condition_.notify_all();
147             }
148         }
149 
150         {
151             std::lock_guard<std::mutex> lock(threadLock_);
152             if ((thread_) && (thread_->joinable())) {
153                 // Wait utils collect thread finished, if exists.
154                 thread_->join();
155             }
156             needCreateThread_ = false;
157         }
158 
159         ReclaimAll();
160     }
161 
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)162     void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
163         std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
164     {
165         if (it == depositMap_.end()) {
166             return;
167         }
168 
169         // Remove thread information from map.
170         auto threadId = it->first;
171         auto exitInfo = std::move(it->second);
172         (void)depositMap_.erase(it);
173 
174         // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
175         lock.unlock();
176 
177         size_t hashThreadId = CalculateHashCode(threadId);
178         HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
179         if (needCallExit) {
180             // Call exit callback to stop loop in thread.
181             exitInfo.threadExit();
182         }
183         // Wait until thread finished.
184         exitInfo.thread->join();
185         HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
186 
187         // Lock again.
188         lock.lock();
189     }
190 
Run()191     void Run()
192     {
193         HILOGD("Run: Collector thread is started");
194 
195         std::unique_lock<std::mutex> lock(collectorLock_);
196         while (!destroying_) {
197             // Reclaim threads in list one by one.
198             while (!reclaims_.empty()) {
199                 auto threadId = reclaims_.back();
200                 reclaims_.pop_back();
201                 DoReclaimLocked(lock, depositMap_.find(threadId), false);
202             }
203 
204             // Maybe stop running while doing reclaim, so check before waiting.
205             if (destroying_) {
206                 break;
207             }
208 
209             isWaiting_ = true;
210             condition_.wait(lock);
211             isWaiting_ = false;
212         }
213 
214         HILOGD("Run: Collector thread is stopped");
215     }
216 
217     std::mutex collectorLock_;
218     std::condition_variable condition_;
219     bool isWaiting_ {false};
220     bool destroying_ {false};
221     std::vector<std::thread::id> reclaims_;
222     std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
223 
224     std::mutex threadLock_;
225     // Thread for collector
226     std::unique_ptr<std::thread> thread_;
227     bool needCreateThread_ {true};
228 
229     // Avatar of thread collector, used to stop collector at the specified opportunity.
230     class Avatar {
231     public:
232         DISALLOW_COPY_AND_MOVE(Avatar);
233 
234         Avatar() = default;
~Avatar()235         ~Avatar()
236         {
237             if (avatarEnabled_) {
238                 GetInstance().avatarDestructed_ = true;
239                 GetInstance().Stop();
240             }
241         }
242 
Disable()243         static inline void Disable()
244         {
245             avatarEnabled_ = false;
246         }
247     };
248 
249     // Mark whether avatar is destructed.
250     volatile bool avatarDestructed_ {false};
251     // Mark whether avatar is enabled.
252     static volatile bool avatarEnabled_;
253     static Avatar avatar_;
254 };
255 
ThreadCollector()256 ThreadCollector::ThreadCollector()
257     : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
258 {
259     // Thread collector is created, so enable avatar.
260     avatarEnabled_ = true;
261 }
262 
~ThreadCollector()263 ThreadCollector::~ThreadCollector()
264 {
265     // If avatar is not destructed, stop collector by itself.
266     if (!avatarDestructed_) {
267         avatar_.Disable();
268         Stop();
269     }
270 }
271 
272 class EventRunnerImpl final : public EventInnerRunner {
273 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)274     explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
275     {
276         queue_ = std::make_shared<EventQueue>();
277     }
278 
~EventRunnerImpl()279     ~EventRunnerImpl() final
280     {
281         queue_->RemoveAll();
282     }
283     DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
284 
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)285     static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
286     {
287         std::shared_ptr<EventRunnerImpl> inner = wp.lock();
288         if (inner) {
289             HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
290 
291             // Call system call to modify thread name.
292             SystemCallSetThreadName(inner->threadName_);
293 
294             // Enter event loop.
295             inner->Run();
296 
297             HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
298         } else {
299             HILOGW("ThreadMain: EventRunner has been released just after its creation");
300         }
301 
302         // Reclaim current thread.
303         ThreadCollector::GetInstance().ReclaimCurrentThread();
304     }
305 
Run()306     void Run() final
307     {
308         // Prepare to start event loop.
309         queue_->Prepare();
310 
311         // Make sure instance of 'EventRunner' exists.
312         if (owner_.expired()) {
313             return;
314         }
315 
316         threadId_ = std::this_thread::get_id();
317         kernelThreadId_ = syscall(__NR_gettid);
318 
319         // Save old event runner.
320         std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
321         // Set current event runner into thread local data.
322         currentEventRunner = owner_;
323 
324         // Start event looper.
325         for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
326             std::shared_ptr<EventHandler> handler = event->GetOwner();
327             // Make sure owner of the event exists.
328             if (handler) {
329                 std::shared_ptr<Logger> logging = logger_;
330                 if (logging != nullptr) {
331                     if (!event->HasTask()) {
332                         logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
333                     } else {
334                         logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
335                     }
336                 }
337                 handler->DistributeEvent(event);
338             }
339             // Release event manually, otherwise event will be released until next event coming.
340             event.reset();
341         }
342 
343         // Restore current event runner.
344         currentEventRunner = oldRunner;
345     }
346 
Stop()347     void Stop() final
348     {
349         queue_->Finish();
350     }
351 
Attach(std::unique_ptr<std::thread> & thread)352     inline bool Attach(std::unique_ptr<std::thread> &thread)
353     {
354         auto exitThread = [queue = queue_]() { queue->Finish(); };
355 
356         return ThreadCollector::GetInstance().Deposit(thread, exitThread);
357     }
358 
SetThreadName(const std::string & threadName)359     inline void SetThreadName(const std::string &threadName)
360     {
361         static std::atomic<uint32_t> idGenerator(1);
362 
363         if (threadName.empty()) {
364             // Generate a default name
365             threadName_ = "EventRunner#";
366             threadName_ += std::to_string(idGenerator++);
367         } else {
368             threadName_ = threadName;
369         }
370     }
371 
372 private:
373     DEFINE_HILOG_LABEL("EventRunnerImpl");
374 };
375 }  // unnamed namespace
376 
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)377 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
378     : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
379 {}
380 
GetCurrentEventRunner()381 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
382 {
383     const std::weak_ptr<EventRunner> &wp = currentEventRunner;
384     return wp.lock();
385 }
386 
387 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
388 
389 namespace {
390 volatile bool ThreadCollector::avatarEnabled_ = false;
391 
392 /*
393  * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
394  * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
395  * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
396  */
397 ThreadCollector::Avatar ThreadCollector::avatar_;
398 }  // unnamed namespace
399 
400 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
401 
Create(bool inNewThread)402 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
403 {
404     if (inNewThread) {
405         return Create(std::string());
406     }
407 
408     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
409     std::shared_ptr<EventRunner> sp(new EventRunner(false));
410     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
411     sp->innerRunner_ = innerRunner;
412     sp->queue_ = innerRunner->GetEventQueue();
413 
414     return sp;
415 }
416 
Create(const std::string & threadName)417 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
418 {
419     // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
420     std::shared_ptr<EventRunner> sp(new EventRunner(true));
421     auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
422     sp->innerRunner_ = innerRunner;
423     sp->queue_ = innerRunner->GetEventQueue();
424 
425     // Start new thread
426     innerRunner->SetThreadName(threadName);
427     auto thread =
428         std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
429     if (!innerRunner->Attach(thread)) {
430         HILOGW("Create: Failed to attach thread, maybe process is exiting");
431         innerRunner->Stop();
432         thread->join();
433     }
434 
435     return sp;
436 }
437 
Current()438 std::shared_ptr<EventRunner> EventRunner::Current()
439 {
440     auto runner = EventInnerRunner::GetCurrentEventRunner();
441     if (runner) {
442         return runner;
443     }
444     return nullptr;
445 }
446 
EventRunner(bool deposit)447 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
448 {}
449 
GetRunnerThreadName() const450 std::string EventRunner::GetRunnerThreadName() const
451 {
452     return innerRunner_->GetThreadName();
453 }
454 
~EventRunner()455 EventRunner::~EventRunner()
456 {
457     if (deposit_) {
458         innerRunner_->Stop();
459     }
460 }
461 
Run()462 ErrCode EventRunner::Run()
463 {
464     if (deposit_) {
465         HILOGE("Run: Do not call, if event runner is deposited");
466         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
467     }
468 
469     // Avoid more than one thread to start this runner.
470     if (running_.exchange(true)) {
471         HILOGW("Run: Already running");
472         return EVENT_HANDLER_ERR_RUNNER_ALREADY;
473     }
474 
475     // Entry event loop.
476     innerRunner_->Run();
477 
478     running_.store(false);
479     return ERR_OK;
480 }
481 
Stop()482 ErrCode EventRunner::Stop()
483 {
484     if (deposit_) {
485         HILOGE("Stop: Do not call, if event runner is deposited");
486         return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
487     }
488 
489     if (running_.load()) {
490         innerRunner_->Stop();
491     } else {
492         HILOGW("Stop: Already stopped");
493     }
494 
495     return ERR_OK;
496 }
497 
Dump(Dumper & dumper)498 void EventRunner::Dump(Dumper &dumper)
499 {
500     if (!IsRunning()) {
501         dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
502         return;
503     }
504 
505     if (queue_ == nullptr) {
506         dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
507         return;
508     }
509 
510     dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
511                 ", Thread ID = " + std::to_string(GetKernelThreadId()) + ") is running" + LINE_SEPARATOR);
512     queue_->Dump(dumper);
513 }
514 
DumpRunnerInfo(std::string & runnerInfo)515 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
516 {
517     if (!IsRunning()) {
518         runnerInfo = "        Event runner is not running" + LINE_SEPARATOR;
519     }
520 
521     if (queue_ == nullptr) {
522         runnerInfo = "        Queue is null" + LINE_SEPARATOR;
523         return;
524     }
525 
526     std::string queueInfo;
527     queue_->DumpQueueInfo(queueInfo);
528     runnerInfo += queueInfo;
529 }
530 
SetLogger(const std::shared_ptr<Logger> & logger)531 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
532 {
533     innerRunner_->SetLogger(logger);
534 }
535 
GetCurrentEventQueue()536 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
537 {
538     auto runner = EventRunner::Current();
539     if (!runner) {
540         return nullptr;
541     }
542     return runner->queue_;
543 }
544 
GetThreadId()545 uint64_t EventRunner::GetThreadId()
546 {
547     std::thread::id tid = innerRunner_->GetThreadId();
548     std::stringstream buf;
549     buf << tid;
550     std::string stid = buf.str();
551     return std::stoull(stid);
552 }
553 
GetKernelThreadId()554 uint64_t EventRunner::GetKernelThreadId()
555 {
556     return innerRunner_->GetKernelThreadId();
557 }
558 
IsCurrentRunnerThread()559 bool EventRunner::IsCurrentRunnerThread()
560 {
561     return std::this_thread::get_id() == innerRunner_->GetThreadId();
562 }
563 
GetMainEventRunner()564 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
565 {
566     if (!mainRunner_) {
567         mainRunner_ = Create(false);
568         if (!mainRunner_) {
569             HILOGE("mainRunner_ create fail");
570         }
571     }
572 
573     return mainRunner_;
574 }
575 }  // namespace AppExecFwk
576 }  // namespace OHOS
577