1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_runner.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24
25 #include <sys/prctl.h>
26
27 #include "event_handler.h"
28 #include "event_handler_utils.h"
29 #include "event_inner_runner.h"
30 #include "singleton.h"
31 #include "thread_local_data.h"
32
33 DEFINE_HILOG_LABEL("EventRunner");
34
35 namespace OHOS {
36 namespace AppExecFwk {
37 namespace {
38 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)39 inline void SystemCallSetThreadName(const std::string &name)
40 {
41 if (prctl(PR_SET_NAME, name.c_str()) < 0) {
42 char errmsg[MAX_ERRORMSG_LEN] = {0};
43 GetLastErr(errmsg, MAX_ERRORMSG_LEN);
44 HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
45 }
46 }
47
// Hash an object with the std::hash specialization of its own type.
// Used in this file to turn a std::thread::id into a printable number.
template<typename T>
inline size_t CalculateHashCode(const T &obj)
{
    return std::hash<T> {}(obj);
}
55
56 // Thread collector is used to reclaim thread that needs to finish running.
57 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
58 DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
59
60 public:
61 DISALLOW_COPY_AND_MOVE(ThreadCollector);
62
63 using ExitFunction = std::function<void()>;
64
ReclaimCurrentThread()65 void ReclaimCurrentThread()
66 {
67 // Get id of current thread.
68 auto threadId = std::this_thread::get_id();
69 HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
70
71 {
72 // Add thread id to list and notify to reclaim.
73 std::lock_guard<std::mutex> lock(collectorLock_);
74 if (destroying_) {
75 HILOGI("Reclaim: Thread collector is destroying");
76 return;
77 }
78
79 reclaims_.emplace_back(threadId);
80 if (isWaiting_) {
81 condition_.notify_one();
82 }
83 }
84
85 if (threadLock_.try_lock()) {
86 if ((!thread_) && (needCreateThread_)) {
87 // Start daemon thread to collect finished threads, if not exist.
88 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
89 }
90 threadLock_.unlock();
91 }
92 }
93
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)94 bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
95 {
96 if ((!thread) || (!thread->joinable()) || (!threadExit)) {
97 auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
98 HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
99 return false;
100 }
101
102 auto threadId = thread->get_id();
103 HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
104 // Save these information into map.
105 std::lock_guard<std::mutex> lock(collectorLock_);
106 if (destroying_) {
107 HILOGW("Deposit: Collector thread is destroying");
108 return false;
109 }
110 // Save thread object and its exit callback.
111 depositMap_.emplace(threadId,
112 ThreadExitInfo {
113 .thread = std::move(thread),
114 .threadExit = threadExit,
115 });
116 return true;
117 }
118
119 private:
120 DEFINE_HILOG_LABEL("ThreadCollector");
121
122 struct ThreadExitInfo {
123 std::unique_ptr<std::thread> thread;
124 ExitFunction threadExit;
125 };
126
ReclaimAll()127 inline void ReclaimAll()
128 {
129 std::unique_lock<std::mutex> lock(collectorLock_);
130 // All thread deposited need to stop one by one.
131 while (!depositMap_.empty()) {
132 DoReclaimLocked(lock, depositMap_.begin());
133 }
134 }
135
Stop()136 void Stop()
137 {
138 {
139 // Stop the collect thread, while destruction of collector.
140 std::lock_guard<std::mutex> lock(collectorLock_);
141 destroying_ = true;
142 if (isWaiting_) {
143 condition_.notify_all();
144 }
145 }
146
147 {
148 std::lock_guard<std::mutex> lock(threadLock_);
149 if ((thread_) && (thread_->joinable())) {
150 // Wait utils collect thread finished, if exists.
151 thread_->join();
152 }
153 needCreateThread_ = false;
154 }
155
156 ReclaimAll();
157 }
158
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)159 void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
160 std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
161 {
162 if (it == depositMap_.end()) {
163 return;
164 }
165
166 // Remove thread information from map.
167 auto threadId = it->first;
168 auto exitInfo = std::move(it->second);
169 (void)depositMap_.erase(it);
170
171 // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
172 lock.unlock();
173
174 size_t hashThreadId = CalculateHashCode(threadId);
175 HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
176 if (needCallExit) {
177 // Call exit callback to stop loop in thread.
178 exitInfo.threadExit();
179 }
180 // Wait until thread finished.
181 exitInfo.thread->join();
182 HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
183
184 // Lock again.
185 lock.lock();
186 }
187
Run()188 void Run()
189 {
190 HILOGD("Run: Collector thread is started");
191
192 std::unique_lock<std::mutex> lock(collectorLock_);
193 while (!destroying_) {
194 // Reclaim threads in list one by one.
195 while (!reclaims_.empty()) {
196 auto threadId = reclaims_.back();
197 reclaims_.pop_back();
198 DoReclaimLocked(lock, depositMap_.find(threadId), false);
199 }
200
201 // Maybe stop running while doing reclaim, so check before waiting.
202 if (destroying_) {
203 break;
204 }
205
206 isWaiting_ = true;
207 condition_.wait(lock);
208 isWaiting_ = false;
209 }
210
211 HILOGD("Run: Collector thread is stopped");
212 }
213
214 std::mutex collectorLock_;
215 std::condition_variable condition_;
216 bool isWaiting_ {false};
217 bool destroying_ {false};
218 std::vector<std::thread::id> reclaims_;
219 std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
220
221 std::mutex threadLock_;
222 // Thread for collector
223 std::unique_ptr<std::thread> thread_;
224 bool needCreateThread_ {true};
225
226 // Avatar of thread collector, used to stop collector at the specified opportunity.
227 class Avatar {
228 public:
229 DISALLOW_COPY_AND_MOVE(Avatar);
230
231 Avatar() = default;
~Avatar()232 ~Avatar()
233 {
234 if (avatarEnabled_) {
235 GetInstance().avatarDestructed_ = true;
236 GetInstance().Stop();
237 }
238 }
239
Disable() const240 inline void Disable() const
241 {
242 avatarEnabled_ = false;
243 }
244 };
245
246 // Mark whether avatar is destructed.
247 volatile bool avatarDestructed_ {false};
248 // Mark whether avatar is enabled.
249 static volatile bool avatarEnabled_;
250 static Avatar avatar_;
251 };
252
ThreadCollector()253 ThreadCollector::ThreadCollector()
254 : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
255 {
256 // Thread collector is created, so enable avatar.
257 avatarEnabled_ = true;
258 }
259
~ThreadCollector()260 ThreadCollector::~ThreadCollector()
261 {
262 // If avatar is not destructed, stop collector by itself.
263 if (!avatarDestructed_) {
264 avatar_.Disable();
265 Stop();
266 }
267 }
268
269 class EventRunnerImpl final : public EventInnerRunner {
270 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)271 explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
272 {
273 queue_ = std::make_shared<EventQueue>();
274 }
275
276 ~EventRunnerImpl() final = default;
277 DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
278
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)279 static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
280 {
281 std::shared_ptr<EventRunnerImpl> inner = wp.lock();
282 if (inner) {
283 HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
284
285 // Call system call to modify thread name.
286 SystemCallSetThreadName(inner->threadName_);
287
288 // Enter event loop.
289 inner->Run();
290
291 HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
292 } else {
293 HILOGW("ThreadMain: EventRunner has been released just after its creation");
294 }
295
296 // Reclaim current thread.
297 ThreadCollector::GetInstance().ReclaimCurrentThread();
298 }
299
Run()300 void Run() final
301 {
302 // Prepare to start event loop.
303 queue_->Prepare();
304
305 // Make sure instance of 'EventRunner' exists.
306 if (owner_.expired()) {
307 return;
308 }
309
310 threadId_ = std::this_thread::get_id();
311
312 // Save old event runner.
313 std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
314 // Set current event runner into thread local data.
315 currentEventRunner = owner_;
316
317 // Start event looper.
318 for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
319 std::shared_ptr<EventHandler> handler = event->GetOwner();
320 // Make sure owner of the event exists.
321 if (handler) {
322 std::shared_ptr<Logger> logging = logger_;
323 std::stringstream address;
324 address << handler.get();
325 if (logging != nullptr) {
326 if (!event->HasTask()) {
327 logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
328 } else {
329 logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
330 }
331 }
332 handler->DistributeEvent(event);
333
334 if (logging != nullptr) {
335 logging->Log("Finished to handler(0x" + address.str() + ")");
336 }
337 }
338 // Release event manually, otherwise event will be released until next event coming.
339 event.reset();
340 }
341
342 // Restore current event runner.
343 currentEventRunner = oldRunner;
344 }
345
Stop()346 void Stop() final
347 {
348 queue_->Finish();
349 }
350
Attach(std::unique_ptr<std::thread> & thread)351 inline bool Attach(std::unique_ptr<std::thread> &thread)
352 {
353 auto exitThread = [queue = queue_]() { queue->Finish(); };
354
355 return ThreadCollector::GetInstance().Deposit(thread, exitThread);
356 }
357
SetThreadName(const std::string & threadName)358 inline void SetThreadName(const std::string &threadName)
359 {
360 static std::atomic<uint32_t> idGenerator(1);
361
362 if (threadName.empty()) {
363 // Generate a default name
364 threadName_ = "event_runner#";
365 threadName_ += std::to_string(idGenerator++);
366 } else {
367 threadName_ = threadName;
368 }
369 }
370
371 private:
372 DEFINE_HILOG_LABEL("EventRunnerImpl");
373 };
374 } // unnamed namespace
375
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)376 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
377 : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
378 {}
379
GetCurrentEventRunner()380 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
381 {
382 const std::weak_ptr<EventRunner> &wp = currentEventRunner;
383 return wp.lock();
384 }
385
386 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
387
388 namespace {
389 volatile bool ThreadCollector::avatarEnabled_ = false;
390
391 /*
392 * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
393 * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
394 * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
395 */
396 ThreadCollector::Avatar ThreadCollector::avatar_;
397 } // unnamed namespace
398
399 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
400
Create(bool inNewThread)401 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
402 {
403 if (inNewThread) {
404 return Create(std::string());
405 }
406
407 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
408 std::shared_ptr<EventRunner> sp(new EventRunner(false));
409 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
410 sp->innerRunner_ = innerRunner;
411 sp->queue_ = innerRunner->GetEventQueue();
412
413 return sp;
414 }
415
Create(const std::string & threadName)416 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
417 {
418 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
419 std::shared_ptr<EventRunner> sp(new EventRunner(true));
420 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
421 sp->innerRunner_ = innerRunner;
422 sp->queue_ = innerRunner->GetEventQueue();
423
424 // Start new thread
425 innerRunner->SetThreadName(threadName);
426 auto thread =
427 std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
428 if (!innerRunner->Attach(thread)) {
429 HILOGW("Create: Failed to attach thread, maybe process is exiting");
430 innerRunner->Stop();
431 thread->join();
432 }
433
434 return sp;
435 }
436
Current()437 std::shared_ptr<EventRunner> EventRunner::Current()
438 {
439 auto runner = EventInnerRunner::GetCurrentEventRunner();
440 if (runner) {
441 return runner;
442 }
443 return nullptr;
444 }
445
EventRunner(bool deposit)446 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
447 {}
448
GetRunnerThreadName() const449 std::string EventRunner::GetRunnerThreadName() const
450 {
451 return innerRunner_->GetThreadName();
452 }
453
~EventRunner()454 EventRunner::~EventRunner()
455 {
456 if (deposit_) {
457 innerRunner_->Stop();
458 }
459 }
460
Run()461 ErrCode EventRunner::Run()
462 {
463 if (deposit_) {
464 HILOGE("Run: Do not call, if event runner is deposited");
465 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
466 }
467
468 // Avoid more than one thread to start this runner.
469 if (running_.exchange(true)) {
470 HILOGW("Run: Already running");
471 return EVENT_HANDLER_ERR_RUNNER_ALREADY;
472 }
473
474 // Entry event loop.
475 innerRunner_->Run();
476
477 running_.store(false);
478 return ERR_OK;
479 }
480
Stop()481 ErrCode EventRunner::Stop()
482 {
483 if (deposit_) {
484 HILOGE("Stop: Do not call, if event runner is deposited");
485 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
486 }
487
488 if (running_.load()) {
489 innerRunner_->Stop();
490 } else {
491 HILOGW("Stop: Already stopped");
492 }
493
494 return ERR_OK;
495 }
496
Dump(Dumper & dumper)497 void EventRunner::Dump(Dumper &dumper)
498 {
499 if (!IsRunning()) {
500 dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
501 return;
502 }
503
504 if (queue_ == nullptr) {
505 dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
506 return;
507 }
508
509 dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
510 ", Thread ID = " + std::to_string(GetThreadId()) + ") is running" + LINE_SEPARATOR);
511
512 queue_->Dump(dumper);
513 }
514
DumpRunnerInfo(std::string & runnerInfo)515 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
516 {
517 if (!IsRunning()) {
518 runnerInfo = " Event runner is not running" + LINE_SEPARATOR;
519 }
520
521 if (queue_ == nullptr) {
522 runnerInfo = " Queue is null" + LINE_SEPARATOR;
523 return;
524 }
525
526 std::string queueInfo;
527 queue_->DumpQueueInfo(queueInfo);
528 runnerInfo += queueInfo;
529 }
530
SetLogger(const std::shared_ptr<Logger> & logger)531 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
532 {
533 innerRunner_->SetLogger(logger);
534 }
535
GetCurrentEventQueue()536 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
537 {
538 auto runner = EventRunner::Current();
539 if (!runner) {
540 return nullptr;
541 }
542 return runner->queue_;
543 }
544
GetThreadId()545 uint64_t EventRunner::GetThreadId()
546 {
547 std::thread::id tid = innerRunner_->GetThreadId();
548 std::stringstream buf;
549 buf << tid;
550 std::string stid = buf.str();
551 return std::stoull(stid);
552 }
553
IsCurrentRunnerThread()554 bool EventRunner::IsCurrentRunnerThread()
555 {
556 return std::this_thread::get_id() == innerRunner_->GetThreadId();
557 }
558
GetMainEventRunner()559 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
560 {
561 if (!mainRunner_) {
562 mainRunner_ = Create(false);
563 if (!mainRunner_) {
564 HILOGE("mainRunner_ create fail");
565 }
566 }
567
568 return mainRunner_;
569 }
570 } // namespace AppExecFwk
571 } // namespace OHOS
572