1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_runner.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24
25 #include <unistd.h>
26 #include <sys/prctl.h>
27 #include <sys/syscall.h>
28
29 #include "event_handler.h"
30 #include "event_handler_utils.h"
31 #include "event_inner_runner.h"
32 #include "singleton.h"
33 #include "thread_local_data.h"
34
35 DEFINE_HILOG_LABEL("EventRunner");
36
37 namespace OHOS {
38 namespace AppExecFwk {
39 namespace {
40
41 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)42 inline void SystemCallSetThreadName(const std::string &name)
43 {
44 if (prctl(PR_SET_NAME, name.c_str()) < 0) {
45 char errmsg[MAX_ERRORMSG_LEN] = {0};
46 GetLastErr(errmsg, MAX_ERRORMSG_LEN);
47 HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
48 }
49 }
50
51 // Help to calculate hash code of object.
52 template<typename T>
CalculateHashCode(const T & obj)53 inline size_t CalculateHashCode(const T &obj)
54 {
55 std::hash<T> calculateHashCode;
56 return calculateHashCode(obj);
57 }
58
59 // Thread collector is used to reclaim thread that needs to finish running.
60 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
61 DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
62
63 public:
64 DISALLOW_COPY_AND_MOVE(ThreadCollector);
65
66 using ExitFunction = std::function<void()>;
67
ReclaimCurrentThread()68 void ReclaimCurrentThread()
69 {
70 // Get id of current thread.
71 auto threadId = std::this_thread::get_id();
72 HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
73
74 {
75 // Add thread id to list and notify to reclaim.
76 std::lock_guard<std::mutex> lock(collectorLock_);
77 if (destroying_) {
78 HILOGI("Reclaim: Thread collector is destroying");
79 return;
80 }
81
82 reclaims_.emplace_back(threadId);
83 if (isWaiting_) {
84 condition_.notify_one();
85 }
86 }
87
88 if (threadLock_.try_lock()) {
89 if ((!thread_) && (needCreateThread_)) {
90 // Start daemon thread to collect finished threads, if not exist.
91 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
92 }
93 threadLock_.unlock();
94 }
95 }
96
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)97 bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
98 {
99 if ((!thread) || (!thread->joinable()) || (!threadExit)) {
100 auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
101 HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
102 return false;
103 }
104
105 auto threadId = thread->get_id();
106 HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
107 // Save these information into map.
108 std::lock_guard<std::mutex> lock(collectorLock_);
109 if (destroying_) {
110 HILOGW("Deposit: Collector thread is destroying");
111 return false;
112 }
113 // Save thread object and its exit callback.
114 depositMap_.emplace(threadId,
115 ThreadExitInfo {
116 .thread = std::move(thread),
117 .threadExit = threadExit,
118 });
119 return true;
120 }
121
122 private:
123 DEFINE_HILOG_LABEL("ThreadCollector");
124
125 struct ThreadExitInfo {
126 std::unique_ptr<std::thread> thread;
127 ExitFunction threadExit;
128 };
129
ReclaimAll()130 inline void ReclaimAll()
131 {
132 std::unique_lock<std::mutex> lock(collectorLock_);
133 // All thread deposited need to stop one by one.
134 while (!depositMap_.empty()) {
135 DoReclaimLocked(lock, depositMap_.begin());
136 }
137 }
138
Stop()139 void Stop()
140 {
141 {
142 // Stop the collect thread, while destruction of collector.
143 std::lock_guard<std::mutex> lock(collectorLock_);
144 destroying_ = true;
145 if (isWaiting_) {
146 condition_.notify_all();
147 }
148 }
149
150 {
151 std::lock_guard<std::mutex> lock(threadLock_);
152 if ((thread_) && (thread_->joinable())) {
153 // Wait utils collect thread finished, if exists.
154 thread_->join();
155 }
156 needCreateThread_ = false;
157 }
158
159 ReclaimAll();
160 }
161
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)162 void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
163 std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
164 {
165 if (it == depositMap_.end()) {
166 return;
167 }
168
169 // Remove thread information from map.
170 auto threadId = it->first;
171 auto exitInfo = std::move(it->second);
172 (void)depositMap_.erase(it);
173
174 // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
175 lock.unlock();
176
177 size_t hashThreadId = CalculateHashCode(threadId);
178 HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
179 if (needCallExit) {
180 // Call exit callback to stop loop in thread.
181 exitInfo.threadExit();
182 }
183 // Wait until thread finished.
184 exitInfo.thread->join();
185 HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
186
187 // Lock again.
188 lock.lock();
189 }
190
Run()191 void Run()
192 {
193 HILOGD("Run: Collector thread is started");
194
195 std::unique_lock<std::mutex> lock(collectorLock_);
196 while (!destroying_) {
197 // Reclaim threads in list one by one.
198 while (!reclaims_.empty()) {
199 auto threadId = reclaims_.back();
200 reclaims_.pop_back();
201 DoReclaimLocked(lock, depositMap_.find(threadId), false);
202 }
203
204 // Maybe stop running while doing reclaim, so check before waiting.
205 if (destroying_) {
206 break;
207 }
208
209 isWaiting_ = true;
210 condition_.wait(lock);
211 isWaiting_ = false;
212 }
213
214 HILOGD("Run: Collector thread is stopped");
215 }
216
217 std::mutex collectorLock_;
218 std::condition_variable condition_;
219 bool isWaiting_ {false};
220 bool destroying_ {false};
221 std::vector<std::thread::id> reclaims_;
222 std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
223
224 std::mutex threadLock_;
225 // Thread for collector
226 std::unique_ptr<std::thread> thread_;
227 bool needCreateThread_ {true};
228
229 // Avatar of thread collector, used to stop collector at the specified opportunity.
230 class Avatar {
231 public:
232 DISALLOW_COPY_AND_MOVE(Avatar);
233
234 Avatar() = default;
~Avatar()235 ~Avatar()
236 {
237 if (avatarEnabled_) {
238 GetInstance().avatarDestructed_ = true;
239 GetInstance().Stop();
240 }
241 }
242
Disable()243 static inline void Disable()
244 {
245 avatarEnabled_ = false;
246 }
247 };
248
249 // Mark whether avatar is destructed.
250 volatile bool avatarDestructed_ {false};
251 // Mark whether avatar is enabled.
252 static volatile bool avatarEnabled_;
253 static Avatar avatar_;
254 };
255
ThreadCollector()256 ThreadCollector::ThreadCollector()
257 : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
258 {
259 // Thread collector is created, so enable avatar.
260 avatarEnabled_ = true;
261 }
262
~ThreadCollector()263 ThreadCollector::~ThreadCollector()
264 {
265 // If avatar is not destructed, stop collector by itself.
266 if (!avatarDestructed_) {
267 avatar_.Disable();
268 Stop();
269 }
270 }
271
272 class EventRunnerImpl final : public EventInnerRunner {
273 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)274 explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
275 {
276 queue_ = std::make_shared<EventQueue>();
277 }
278
~EventRunnerImpl()279 ~EventRunnerImpl() final
280 {
281 queue_->RemoveAll();
282 }
283 DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
284
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)285 static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
286 {
287 std::shared_ptr<EventRunnerImpl> inner = wp.lock();
288 if (inner) {
289 HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
290
291 // Call system call to modify thread name.
292 SystemCallSetThreadName(inner->threadName_);
293
294 // Enter event loop.
295 inner->Run();
296
297 HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
298 } else {
299 HILOGW("ThreadMain: EventRunner has been released just after its creation");
300 }
301
302 // Reclaim current thread.
303 ThreadCollector::GetInstance().ReclaimCurrentThread();
304 }
305
Run()306 void Run() final
307 {
308 // Prepare to start event loop.
309 queue_->Prepare();
310
311 // Make sure instance of 'EventRunner' exists.
312 if (owner_.expired()) {
313 return;
314 }
315
316 threadId_ = std::this_thread::get_id();
317 kernelThreadId_ = syscall(__NR_gettid);
318
319 // Save old event runner.
320 std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
321 // Set current event runner into thread local data.
322 currentEventRunner = owner_;
323
324 // Start event looper.
325 for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
326 std::shared_ptr<EventHandler> handler = event->GetOwner();
327 // Make sure owner of the event exists.
328 if (handler) {
329 std::shared_ptr<Logger> logging = logger_;
330 if (logging != nullptr) {
331 if (!event->HasTask()) {
332 logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
333 } else {
334 logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
335 }
336 }
337 handler->DistributeEvent(event);
338 }
339 // Release event manually, otherwise event will be released until next event coming.
340 event.reset();
341 }
342
343 // Restore current event runner.
344 currentEventRunner = oldRunner;
345 }
346
Stop()347 void Stop() final
348 {
349 queue_->Finish();
350 }
351
Attach(std::unique_ptr<std::thread> & thread)352 inline bool Attach(std::unique_ptr<std::thread> &thread)
353 {
354 auto exitThread = [queue = queue_]() { queue->Finish(); };
355
356 return ThreadCollector::GetInstance().Deposit(thread, exitThread);
357 }
358
SetThreadName(const std::string & threadName)359 inline void SetThreadName(const std::string &threadName)
360 {
361 static std::atomic<uint32_t> idGenerator(1);
362
363 if (threadName.empty()) {
364 // Generate a default name
365 threadName_ = "EventRunner#";
366 threadName_ += std::to_string(idGenerator++);
367 } else {
368 threadName_ = threadName;
369 }
370 }
371
372 private:
373 DEFINE_HILOG_LABEL("EventRunnerImpl");
374 };
375 } // unnamed namespace
376
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)377 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
378 : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
379 {}
380
GetCurrentEventRunner()381 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
382 {
383 const std::weak_ptr<EventRunner> &wp = currentEventRunner;
384 return wp.lock();
385 }
386
387 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
388
389 namespace {
390 volatile bool ThreadCollector::avatarEnabled_ = false;
391
392 /*
393 * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
394 * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
395 * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
396 */
397 ThreadCollector::Avatar ThreadCollector::avatar_;
398 } // unnamed namespace
399
400 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
401
Create(bool inNewThread)402 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
403 {
404 if (inNewThread) {
405 return Create(std::string());
406 }
407
408 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
409 std::shared_ptr<EventRunner> sp(new EventRunner(false));
410 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
411 sp->innerRunner_ = innerRunner;
412 sp->queue_ = innerRunner->GetEventQueue();
413
414 return sp;
415 }
416
Create(const std::string & threadName)417 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
418 {
419 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
420 std::shared_ptr<EventRunner> sp(new EventRunner(true));
421 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
422 sp->innerRunner_ = innerRunner;
423 sp->queue_ = innerRunner->GetEventQueue();
424
425 // Start new thread
426 innerRunner->SetThreadName(threadName);
427 auto thread =
428 std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
429 if (!innerRunner->Attach(thread)) {
430 HILOGW("Create: Failed to attach thread, maybe process is exiting");
431 innerRunner->Stop();
432 thread->join();
433 }
434
435 return sp;
436 }
437
Current()438 std::shared_ptr<EventRunner> EventRunner::Current()
439 {
440 auto runner = EventInnerRunner::GetCurrentEventRunner();
441 if (runner) {
442 return runner;
443 }
444 return nullptr;
445 }
446
EventRunner(bool deposit)447 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
448 {}
449
GetRunnerThreadName() const450 std::string EventRunner::GetRunnerThreadName() const
451 {
452 return innerRunner_->GetThreadName();
453 }
454
~EventRunner()455 EventRunner::~EventRunner()
456 {
457 if (deposit_) {
458 innerRunner_->Stop();
459 }
460 }
461
Run()462 ErrCode EventRunner::Run()
463 {
464 if (deposit_) {
465 HILOGE("Run: Do not call, if event runner is deposited");
466 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
467 }
468
469 // Avoid more than one thread to start this runner.
470 if (running_.exchange(true)) {
471 HILOGW("Run: Already running");
472 return EVENT_HANDLER_ERR_RUNNER_ALREADY;
473 }
474
475 // Entry event loop.
476 innerRunner_->Run();
477
478 running_.store(false);
479 return ERR_OK;
480 }
481
Stop()482 ErrCode EventRunner::Stop()
483 {
484 if (deposit_) {
485 HILOGE("Stop: Do not call, if event runner is deposited");
486 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
487 }
488
489 if (running_.load()) {
490 innerRunner_->Stop();
491 } else {
492 HILOGW("Stop: Already stopped");
493 }
494
495 return ERR_OK;
496 }
497
Dump(Dumper & dumper)498 void EventRunner::Dump(Dumper &dumper)
499 {
500 if (!IsRunning()) {
501 dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
502 return;
503 }
504
505 if (queue_ == nullptr) {
506 dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
507 return;
508 }
509
510 dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
511 ", Thread ID = " + std::to_string(GetKernelThreadId()) + ") is running" + LINE_SEPARATOR);
512 queue_->Dump(dumper);
513 }
514
DumpRunnerInfo(std::string & runnerInfo)515 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
516 {
517 if (!IsRunning()) {
518 runnerInfo = " Event runner is not running" + LINE_SEPARATOR;
519 }
520
521 if (queue_ == nullptr) {
522 runnerInfo = " Queue is null" + LINE_SEPARATOR;
523 return;
524 }
525
526 std::string queueInfo;
527 queue_->DumpQueueInfo(queueInfo);
528 runnerInfo += queueInfo;
529 }
530
SetLogger(const std::shared_ptr<Logger> & logger)531 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
532 {
533 innerRunner_->SetLogger(logger);
534 }
535
GetCurrentEventQueue()536 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
537 {
538 auto runner = EventRunner::Current();
539 if (!runner) {
540 return nullptr;
541 }
542 return runner->queue_;
543 }
544
GetThreadId()545 uint64_t EventRunner::GetThreadId()
546 {
547 std::thread::id tid = innerRunner_->GetThreadId();
548 std::stringstream buf;
549 buf << tid;
550 std::string stid = buf.str();
551 return std::stoull(stid);
552 }
553
GetKernelThreadId()554 uint64_t EventRunner::GetKernelThreadId()
555 {
556 return innerRunner_->GetKernelThreadId();
557 }
558
IsCurrentRunnerThread()559 bool EventRunner::IsCurrentRunnerThread()
560 {
561 return std::this_thread::get_id() == innerRunner_->GetThreadId();
562 }
563
GetMainEventRunner()564 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
565 {
566 if (!mainRunner_) {
567 mainRunner_ = Create(false);
568 if (!mainRunner_) {
569 HILOGE("mainRunner_ create fail");
570 }
571 }
572
573 return mainRunner_;
574 }
575 } // namespace AppExecFwk
576 } // namespace OHOS
577