1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_runner.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24
25 #include <sys/prctl.h>
26
27 #include "event_handler.h"
28 #include "event_handler_utils.h"
29 #include "event_inner_runner.h"
30 #include "singleton.h"
31 #include "thread_local_data.h"
32
33 DEFINE_HILOG_LABEL("EventRunner");
34
35 namespace OHOS {
36 namespace AppExecFwk {
37 namespace {
38 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)39 inline void SystemCallSetThreadName(const std::string &name)
40 {
41 if (prctl(PR_SET_NAME, name.c_str()) < 0) {
42 char errmsg[MAX_ERRORMSG_LEN] = {0};
43 GetLastErr(errmsg, MAX_ERRORMSG_LEN);
44 HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
45 }
46 }
47
/**
 * Computes the std::hash value of an object.
 *
 * @param obj Object to hash; std::hash<T> must be available for its type.
 * @return Hash code produced by std::hash<T>.
 */
template<typename T>
inline size_t CalculateHashCode(const T &obj)
{
    return std::hash<T> {}(obj);
}
55
56 // Thread collector is used to reclaim thread that needs to finish running.
57 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
58 DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
59
60 public:
61 DISALLOW_COPY_AND_MOVE(ThreadCollector);
62
63 using ExitFunction = std::function<void()>;
64
ReclaimCurrentThread()65 void ReclaimCurrentThread()
66 {
67 // Get id of current thread.
68 auto threadId = std::this_thread::get_id();
69 HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
70
71 {
72 // Add thread id to list and notify to reclaim.
73 std::lock_guard<std::mutex> lock(collectorLock_);
74 if (destroying_) {
75 HILOGI("Reclaim: Thread collector is destroying");
76 return;
77 }
78
79 reclaims_.emplace_back(threadId);
80 if (isWaiting_) {
81 condition_.notify_one();
82 }
83 }
84
85 if (threadLock_.try_lock()) {
86 if ((!thread_) && (needCreateThread_)) {
87 // Start daemon thread to collect finished threads, if not exist.
88 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
89 }
90 threadLock_.unlock();
91 }
92 }
93
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)94 bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
95 {
96 if ((!thread) || (!thread->joinable()) || (!threadExit)) {
97 auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
98 HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
99 return false;
100 }
101
102 auto threadId = thread->get_id();
103 HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
104 // Save these information into map.
105 std::lock_guard<std::mutex> lock(collectorLock_);
106 if (destroying_) {
107 HILOGW("Deposit: Collector thread is destroying");
108 return false;
109 }
110 // Save thread object and its exit callback.
111 depositMap_.emplace(threadId,
112 ThreadExitInfo {
113 .thread = std::move(thread),
114 .threadExit = threadExit,
115 });
116 return true;
117 }
118
119 private:
120 DEFINE_HILOG_LABEL("ThreadCollector");
121
122 struct ThreadExitInfo {
123 std::unique_ptr<std::thread> thread;
124 ExitFunction threadExit;
125 };
126
ReclaimAll()127 inline void ReclaimAll()
128 {
129 std::unique_lock<std::mutex> lock(collectorLock_);
130 // All thread deposited need to stop one by one.
131 while (!depositMap_.empty()) {
132 DoReclaimLocked(lock, depositMap_.begin());
133 }
134 }
135
Stop()136 void Stop()
137 {
138 {
139 // Stop the collect thread, while destruction of collector.
140 std::lock_guard<std::mutex> lock(collectorLock_);
141 destroying_ = true;
142 if (isWaiting_) {
143 condition_.notify_all();
144 }
145 }
146
147 {
148 std::lock_guard<std::mutex> lock(threadLock_);
149 if ((thread_) && (thread_->joinable())) {
150 // Wait utils collect thread finished, if exists.
151 thread_->join();
152 }
153 needCreateThread_ = false;
154 }
155
156 ReclaimAll();
157 }
158
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)159 void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
160 std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
161 {
162 if (it == depositMap_.end()) {
163 return;
164 }
165
166 // Remove thread information from map.
167 auto threadId = it->first;
168 auto exitInfo = std::move(it->second);
169 (void)depositMap_.erase(it);
170
171 // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
172 lock.unlock();
173
174 size_t hashThreadId = CalculateHashCode(threadId);
175 HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
176 if (needCallExit) {
177 // Call exit callback to stop loop in thread.
178 exitInfo.threadExit();
179 }
180 // Wait until thread finished.
181 exitInfo.thread->join();
182 HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
183
184 // Lock again.
185 lock.lock();
186 }
187
Run()188 void Run()
189 {
190 HILOGD("Run: Collector thread is started");
191
192 std::unique_lock<std::mutex> lock(collectorLock_);
193 while (!destroying_) {
194 // Reclaim threads in list one by one.
195 while (!reclaims_.empty()) {
196 auto threadId = reclaims_.back();
197 reclaims_.pop_back();
198 DoReclaimLocked(lock, depositMap_.find(threadId), false);
199 }
200
201 // Maybe stop running while doing reclaim, so check before waiting.
202 if (destroying_) {
203 break;
204 }
205
206 isWaiting_ = true;
207 condition_.wait(lock);
208 isWaiting_ = false;
209 }
210
211 HILOGD("Run: Collector thread is stopped");
212 }
213
214 std::mutex collectorLock_;
215 std::condition_variable condition_;
216 bool isWaiting_ {false};
217 bool destroying_ {false};
218 std::vector<std::thread::id> reclaims_;
219 std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
220
221 std::mutex threadLock_;
222 // Thread for collector
223 std::unique_ptr<std::thread> thread_;
224 bool needCreateThread_ {true};
225
226 // Avatar of thread collector, used to stop collector at the specified opportunity.
227 class Avatar {
228 public:
229 DISALLOW_COPY_AND_MOVE(Avatar);
230
231 Avatar() = default;
~Avatar()232 ~Avatar()
233 {
234 if (avatarEnabled_) {
235 GetInstance().avatarDestructed_ = true;
236 GetInstance().Stop();
237 }
238 }
239
Disable()240 static inline void Disable()
241 {
242 avatarEnabled_ = false;
243 }
244 };
245
246 // Mark whether avatar is destructed.
247 volatile bool avatarDestructed_ {false};
248 // Mark whether avatar is enabled.
249 static volatile bool avatarEnabled_;
250 static Avatar avatar_;
251 };
252
ThreadCollector()253 ThreadCollector::ThreadCollector()
254 : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
255 {
256 // Thread collector is created, so enable avatar.
257 avatarEnabled_ = true;
258 }
259
~ThreadCollector()260 ThreadCollector::~ThreadCollector()
261 {
262 // If avatar is not destructed, stop collector by itself.
263 if (!avatarDestructed_) {
264 avatar_.Disable();
265 Stop();
266 }
267 }
268
269 class EventRunnerImpl final : public EventInnerRunner {
270 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)271 explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
272 {
273 queue_ = std::make_shared<EventQueue>();
274 }
275
~EventRunnerImpl()276 ~EventRunnerImpl() final
277 {
278 queue_->RemoveAll();
279 }
280 DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
281
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)282 static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
283 {
284 std::shared_ptr<EventRunnerImpl> inner = wp.lock();
285 if (inner) {
286 HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
287
288 // Call system call to modify thread name.
289 SystemCallSetThreadName(inner->threadName_);
290
291 // Enter event loop.
292 inner->Run();
293
294 HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
295 } else {
296 HILOGW("ThreadMain: EventRunner has been released just after its creation");
297 }
298
299 // Reclaim current thread.
300 ThreadCollector::GetInstance().ReclaimCurrentThread();
301 }
302
Run()303 void Run() final
304 {
305 // Prepare to start event loop.
306 queue_->Prepare();
307
308 // Make sure instance of 'EventRunner' exists.
309 if (owner_.expired()) {
310 return;
311 }
312
313 threadId_ = std::this_thread::get_id();
314
315 // Save old event runner.
316 std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
317 // Set current event runner into thread local data.
318 currentEventRunner = owner_;
319
320 // Start event looper.
321 for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
322 std::shared_ptr<EventHandler> handler = event->GetOwner();
323 // Make sure owner of the event exists.
324 if (handler) {
325 std::shared_ptr<Logger> logging = logger_;
326 if (logging != nullptr) {
327 if (!event->HasTask()) {
328 logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
329 } else {
330 logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
331 }
332 }
333 handler->DistributeEvent(event);
334 }
335 // Release event manually, otherwise event will be released until next event coming.
336 event.reset();
337 }
338
339 // Restore current event runner.
340 currentEventRunner = oldRunner;
341 }
342
Stop()343 void Stop() final
344 {
345 queue_->Finish();
346 }
347
Attach(std::unique_ptr<std::thread> & thread)348 inline bool Attach(std::unique_ptr<std::thread> &thread)
349 {
350 auto exitThread = [queue = queue_]() { queue->Finish(); };
351
352 return ThreadCollector::GetInstance().Deposit(thread, exitThread);
353 }
354
SetThreadName(const std::string & threadName)355 inline void SetThreadName(const std::string &threadName)
356 {
357 static std::atomic<uint32_t> idGenerator(1);
358
359 if (threadName.empty()) {
360 // Generate a default name
361 threadName_ = "EventRunner#";
362 threadName_ += std::to_string(idGenerator++);
363 } else {
364 threadName_ = threadName;
365 }
366 }
367
368 private:
369 DEFINE_HILOG_LABEL("EventRunnerImpl");
370 };
371 } // unnamed namespace
372
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)373 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
374 : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
375 {}
376
GetCurrentEventRunner()377 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
378 {
379 const std::weak_ptr<EventRunner> &wp = currentEventRunner;
380 return wp.lock();
381 }
382
// Event runner currently looping on a thread; EventRunnerImpl::Run() sets it on entry and restores it on exit.
ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;

namespace {
// Starts disabled; the constructor of 'ThreadCollector' enables it.
volatile bool ThreadCollector::avatarEnabled_ = false;

/*
 * All event runners rely on 'currentEventRunner', so 'currentEventRunner' must be destroyed only after
 * all event runners have finished. All event runners are finished in the destructor of 'ThreadCollector::Avatar',
 * so the instance of 'ThreadCollector::Avatar' MUST be defined after 'currentEventRunner'.
 */
ThreadCollector::Avatar ThreadCollector::avatar_;
} // unnamed namespace

// Main event runner; created on first use by GetMainEventRunner().
std::shared_ptr<EventRunner> EventRunner::mainRunner_;
397
Create(bool inNewThread)398 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
399 {
400 if (inNewThread) {
401 return Create(std::string());
402 }
403
404 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
405 std::shared_ptr<EventRunner> sp(new EventRunner(false));
406 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
407 sp->innerRunner_ = innerRunner;
408 sp->queue_ = innerRunner->GetEventQueue();
409
410 return sp;
411 }
412
Create(const std::string & threadName)413 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
414 {
415 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
416 std::shared_ptr<EventRunner> sp(new EventRunner(true));
417 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
418 sp->innerRunner_ = innerRunner;
419 sp->queue_ = innerRunner->GetEventQueue();
420
421 // Start new thread
422 innerRunner->SetThreadName(threadName);
423 auto thread =
424 std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
425 if (!innerRunner->Attach(thread)) {
426 HILOGW("Create: Failed to attach thread, maybe process is exiting");
427 innerRunner->Stop();
428 thread->join();
429 }
430
431 return sp;
432 }
433
Current()434 std::shared_ptr<EventRunner> EventRunner::Current()
435 {
436 auto runner = EventInnerRunner::GetCurrentEventRunner();
437 if (runner) {
438 return runner;
439 }
440 return nullptr;
441 }
442
EventRunner(bool deposit)443 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
444 {}
445
GetRunnerThreadName() const446 std::string EventRunner::GetRunnerThreadName() const
447 {
448 return innerRunner_->GetThreadName();
449 }
450
~EventRunner()451 EventRunner::~EventRunner()
452 {
453 if (deposit_) {
454 innerRunner_->Stop();
455 }
456 }
457
Run()458 ErrCode EventRunner::Run()
459 {
460 if (deposit_) {
461 HILOGE("Run: Do not call, if event runner is deposited");
462 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
463 }
464
465 // Avoid more than one thread to start this runner.
466 if (running_.exchange(true)) {
467 HILOGW("Run: Already running");
468 return EVENT_HANDLER_ERR_RUNNER_ALREADY;
469 }
470
471 // Entry event loop.
472 innerRunner_->Run();
473
474 running_.store(false);
475 return ERR_OK;
476 }
477
Stop()478 ErrCode EventRunner::Stop()
479 {
480 if (deposit_) {
481 HILOGE("Stop: Do not call, if event runner is deposited");
482 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
483 }
484
485 if (running_.load()) {
486 innerRunner_->Stop();
487 } else {
488 HILOGW("Stop: Already stopped");
489 }
490
491 return ERR_OK;
492 }
493
Dump(Dumper & dumper)494 void EventRunner::Dump(Dumper &dumper)
495 {
496 if (!IsRunning()) {
497 dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
498 return;
499 }
500
501 if (queue_ == nullptr) {
502 dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
503 return;
504 }
505
506 dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
507 ", Thread ID = " + std::to_string(GetThreadId()) + ") is running" + LINE_SEPARATOR);
508
509 queue_->Dump(dumper);
510 }
511
DumpRunnerInfo(std::string & runnerInfo)512 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
513 {
514 if (!IsRunning()) {
515 runnerInfo = " Event runner is not running" + LINE_SEPARATOR;
516 }
517
518 if (queue_ == nullptr) {
519 runnerInfo = " Queue is null" + LINE_SEPARATOR;
520 return;
521 }
522
523 std::string queueInfo;
524 queue_->DumpQueueInfo(queueInfo);
525 runnerInfo += queueInfo;
526 }
527
SetLogger(const std::shared_ptr<Logger> & logger)528 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
529 {
530 innerRunner_->SetLogger(logger);
531 }
532
GetCurrentEventQueue()533 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
534 {
535 auto runner = EventRunner::Current();
536 if (!runner) {
537 return nullptr;
538 }
539 return runner->queue_;
540 }
541
GetThreadId()542 uint64_t EventRunner::GetThreadId()
543 {
544 std::thread::id tid = innerRunner_->GetThreadId();
545 std::stringstream buf;
546 buf << tid;
547 std::string stid = buf.str();
548 return std::stoull(stid);
549 }
550
IsCurrentRunnerThread()551 bool EventRunner::IsCurrentRunnerThread()
552 {
553 return std::this_thread::get_id() == innerRunner_->GetThreadId();
554 }
555
GetMainEventRunner()556 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
557 {
558 if (!mainRunner_) {
559 mainRunner_ = Create(false);
560 if (!mainRunner_) {
561 HILOGE("mainRunner_ create fail");
562 }
563 }
564
565 return mainRunner_;
566 }
567 } // namespace AppExecFwk
568 } // namespace OHOS
569