1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_runner.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <sstream>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24
25 #if !defined(__APPLE__) && !defined(_WIN32)
26 #include <sys/prctl.h>
27 #endif
28
29 #include "event_handler.h"
30 #include "event_handler_utils.h"
31 #include "event_inner_runner.h"
32 #include "singleton.h"
33 #include "thread_local_data.h"
34
35 DEFINE_HILOG_LABEL("EventRunner");
36
37 namespace OHOS {
38 namespace AppExecFwk {
39 namespace {
40 // Invoke system call to set name of current thread.
SystemCallSetThreadName(const std::string & name)41 inline void SystemCallSetThreadName(const std::string &name)
42 {
43 #if !defined(__APPLE__) && !defined(_WIN32)
44 if (prctl(PR_SET_NAME, name.c_str()) < 0) {
45 char errmsg[MAX_ERRORMSG_LEN] = {0};
46 GetLastErr(errmsg, MAX_ERRORMSG_LEN);
47 HILOGE("SystemCallSetThreadName: Failed to set thread name, %{public}s", errmsg);
48 }
49 #endif
50 }
51
52 // Help to calculate hash code of object.
53 template<typename T>
CalculateHashCode(const T & obj)54 inline size_t CalculateHashCode(const T &obj)
55 {
56 std::hash<T> calculateHashCode;
57 return calculateHashCode(obj);
58 }
59
60 // Thread collector is used to reclaim thread that needs to finish running.
61 class ThreadCollector : public DelayedRefSingleton<ThreadCollector> {
62 DECLARE_DELAYED_REF_SINGLETON(ThreadCollector);
63
64 public:
65 DISALLOW_COPY_AND_MOVE(ThreadCollector);
66
67 using ExitFunction = std::function<void()>;
68
ReclaimCurrentThread()69 void ReclaimCurrentThread()
70 {
71 // Get id of current thread.
72 auto threadId = std::this_thread::get_id();
73 HILOGD("Reclaim: Thread id: %{public}zu", CalculateHashCode(threadId));
74
75 {
76 // Add thread id to list and notify to reclaim.
77 std::lock_guard<std::mutex> lock(collectorLock_);
78 if (destroying_) {
79 HILOGI("Reclaim: Thread collector is destroying");
80 return;
81 }
82
83 reclaims_.emplace_back(threadId);
84 if (isWaiting_) {
85 condition_.notify_one();
86 }
87 }
88
89 if (threadLock_.try_lock()) {
90 if ((!thread_) && (needCreateThread_)) {
91 // Start daemon thread to collect finished threads, if not exist.
92 thread_ = std::make_unique<std::thread>(&ThreadCollector::Run, this);
93 }
94 threadLock_.unlock();
95 }
96 }
97
Deposit(std::unique_ptr<std::thread> & thread,const ExitFunction & threadExit)98 bool Deposit(std::unique_ptr<std::thread> &thread, const ExitFunction &threadExit)
99 {
100 if ((!thread) || (!thread->joinable()) || (!threadExit)) {
101 auto threadState = thread ? (thread->joinable() ? "active" : "finished") : "null";
102 HILOGE("Deposit(%{public}s, %{public}s): Invalid parameter", threadState, threadExit ? "valid" : "null");
103 return false;
104 }
105
106 auto threadId = thread->get_id();
107 HILOGD("Deposit: New thread id: %{public}zu", CalculateHashCode(threadId));
108 // Save these information into map.
109 std::lock_guard<std::mutex> lock(collectorLock_);
110 if (destroying_) {
111 HILOGW("Deposit: Collector thread is destroying");
112 return false;
113 }
114 // Save thread object and its exit callback.
115 depositMap_.emplace(threadId,
116 ThreadExitInfo {
117 .thread = std::move(thread),
118 .threadExit = threadExit,
119 });
120 return true;
121 }
122
123 private:
124 DEFINE_HILOG_LABEL("ThreadCollector");
125
126 struct ThreadExitInfo {
127 std::unique_ptr<std::thread> thread;
128 ExitFunction threadExit;
129 };
130
ReclaimAll()131 inline void ReclaimAll()
132 {
133 std::unique_lock<std::mutex> lock(collectorLock_);
134 // All thread deposited need to stop one by one.
135 while (!depositMap_.empty()) {
136 DoReclaimLocked(lock, depositMap_.begin());
137 }
138 }
139
Stop()140 void Stop()
141 {
142 {
143 // Stop the collect thread, while destruction of collector.
144 std::lock_guard<std::mutex> lock(collectorLock_);
145 destroying_ = true;
146 if (isWaiting_) {
147 condition_.notify_all();
148 }
149 }
150
151 {
152 std::lock_guard<std::mutex> lock(threadLock_);
153 if ((thread_) && (thread_->joinable())) {
154 // Wait utils collect thread finished, if exists.
155 thread_->join();
156 }
157 needCreateThread_ = false;
158 }
159
160 ReclaimAll();
161 }
162
DoReclaimLocked(std::unique_lock<std::mutex> & lock,std::unordered_map<std::thread::id,ThreadExitInfo>::iterator it,bool needCallExit=true)163 void DoReclaimLocked(std::unique_lock<std::mutex> &lock,
164 std::unordered_map<std::thread::id, ThreadExitInfo>::iterator it, bool needCallExit = true)
165 {
166 if (it == depositMap_.end()) {
167 return;
168 }
169
170 // Remove thread information from map.
171 auto threadId = it->first;
172 auto exitInfo = std::move(it->second);
173 (void)depositMap_.erase(it);
174
175 // Unlock, because stopping thread maybe spend lot of time, it should be out of the lock.
176 lock.unlock();
177
178 size_t hashThreadId = CalculateHashCode(threadId);
179 HILOGD("DoReclaim: Thread id: %{public}zu", hashThreadId);
180 if (needCallExit) {
181 // Call exit callback to stop loop in thread.
182 exitInfo.threadExit();
183 }
184 // Wait until thread finished.
185 exitInfo.thread->join();
186 HILOGD("DoReclaim: Done, thread id: %{public}zu", hashThreadId);
187
188 // Lock again.
189 lock.lock();
190 }
191
Run()192 void Run()
193 {
194 HILOGD("Run: Collector thread is started");
195
196 std::unique_lock<std::mutex> lock(collectorLock_);
197 while (!destroying_) {
198 // Reclaim threads in list one by one.
199 while (!reclaims_.empty()) {
200 auto threadId = reclaims_.back();
201 reclaims_.pop_back();
202 DoReclaimLocked(lock, depositMap_.find(threadId), false);
203 }
204
205 // Maybe stop running while doing reclaim, so check before waiting.
206 if (destroying_) {
207 break;
208 }
209
210 isWaiting_ = true;
211 condition_.wait(lock);
212 isWaiting_ = false;
213 }
214
215 HILOGD("Run: Collector thread is stopped");
216 }
217
218 std::mutex collectorLock_;
219 std::condition_variable condition_;
220 bool isWaiting_ {false};
221 bool destroying_ {false};
222 std::vector<std::thread::id> reclaims_;
223 std::unordered_map<std::thread::id, ThreadExitInfo> depositMap_;
224
225 std::mutex threadLock_;
226 // Thread for collector
227 std::unique_ptr<std::thread> thread_;
228 bool needCreateThread_ {true};
229
230 // Avatar of thread collector, used to stop collector at the specified opportunity.
231 class Avatar {
232 public:
233 DISALLOW_COPY_AND_MOVE(Avatar);
234
235 Avatar() = default;
~Avatar()236 ~Avatar()
237 {
238 if (avatarEnabled_) {
239 GetInstance().avatarDestructed_ = true;
240 GetInstance().Stop();
241 }
242 }
243
Disable()244 static inline void Disable()
245 {
246 avatarEnabled_ = false;
247 }
248 };
249
250 // Mark whether avatar is destructed.
251 volatile bool avatarDestructed_ {false};
252 // Mark whether avatar is enabled.
253 static volatile bool avatarEnabled_;
254 static Avatar avatar_;
255 };
256
ThreadCollector()257 ThreadCollector::ThreadCollector()
258 : collectorLock_(), condition_(), reclaims_(), depositMap_(), threadLock_(), thread_(nullptr)
259 {
260 // Thread collector is created, so enable avatar.
261 avatarEnabled_ = true;
262 }
263
~ThreadCollector()264 ThreadCollector::~ThreadCollector()
265 {
266 // If avatar is not destructed, stop collector by itself.
267 if (!avatarDestructed_) {
268 avatar_.Disable();
269 Stop();
270 }
271 }
272
273 class EventRunnerImpl final : public EventInnerRunner {
274 public:
EventRunnerImpl(const std::shared_ptr<EventRunner> & runner)275 explicit EventRunnerImpl(const std::shared_ptr<EventRunner> &runner) : EventInnerRunner(runner)
276 {
277 queue_ = std::make_shared<EventQueue>();
278 }
279
280 ~EventRunnerImpl() final = default;
281 DISALLOW_COPY_AND_MOVE(EventRunnerImpl);
282
ThreadMain(const std::weak_ptr<EventRunnerImpl> & wp)283 static void ThreadMain(const std::weak_ptr<EventRunnerImpl> &wp)
284 {
285 std::shared_ptr<EventRunnerImpl> inner = wp.lock();
286 if (inner) {
287 HILOGD("ThreadMain: Start running for thread '%{public}s'", inner->threadName_.c_str());
288
289 // Call system call to modify thread name.
290 SystemCallSetThreadName(inner->threadName_);
291
292 // Enter event loop.
293 inner->Run();
294
295 HILOGD("ThreadMain: Stopped running for thread '%{public}s'", inner->threadName_.c_str());
296 } else {
297 HILOGW("ThreadMain: EventRunner has been released just after its creation");
298 }
299
300 // Reclaim current thread.
301 ThreadCollector::GetInstance().ReclaimCurrentThread();
302 }
303
Run()304 void Run() final
305 {
306 // Prepare to start event loop.
307 queue_->Prepare();
308
309 // Make sure instance of 'EventRunner' exists.
310 if (owner_.expired()) {
311 return;
312 }
313
314 threadId_ = std::this_thread::get_id();
315
316 // Save old event runner.
317 std::weak_ptr<EventRunner> oldRunner = currentEventRunner;
318 // Set current event runner into thread local data.
319 currentEventRunner = owner_;
320
321 // Start event looper.
322 for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {
323 std::shared_ptr<EventHandler> handler = event->GetOwner();
324 // Make sure owner of the event exists.
325 if (handler) {
326 std::shared_ptr<Logger> logging = logger_;
327 if (logging != nullptr) {
328 if (!event->HasTask()) {
329 logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
330 } else {
331 logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
332 }
333 }
334 handler->DistributeEvent(event);
335 }
336 // Release event manually, otherwise event will be released until next event coming.
337 event.reset();
338 }
339
340 // Restore current event runner.
341 currentEventRunner = oldRunner;
342 }
343
Stop()344 void Stop() final
345 {
346 queue_->Finish();
347 }
348
Attach(std::unique_ptr<std::thread> & thread)349 inline bool Attach(std::unique_ptr<std::thread> &thread)
350 {
351 auto exitThread = [queue = queue_]() { queue->Finish(); };
352
353 return ThreadCollector::GetInstance().Deposit(thread, exitThread);
354 }
355
SetThreadName(const std::string & threadName)356 inline void SetThreadName(const std::string &threadName)
357 {
358 static std::atomic<uint32_t> idGenerator(1);
359
360 if (threadName.empty()) {
361 // Generate a default name
362 threadName_ = "EventRunner#";
363 threadName_ += std::to_string(idGenerator++);
364 } else {
365 threadName_ = threadName;
366 }
367 }
368
369 private:
370 DEFINE_HILOG_LABEL("EventRunnerImpl");
371 };
372 } // unnamed namespace
373
EventInnerRunner(const std::shared_ptr<EventRunner> & runner)374 EventInnerRunner::EventInnerRunner(const std::shared_ptr<EventRunner> &runner)
375 : queue_(nullptr), owner_(runner), logger_(nullptr), threadName_(""), threadId_()
376 {}
377
GetCurrentEventRunner()378 std::shared_ptr<EventRunner> EventInnerRunner::GetCurrentEventRunner()
379 {
380 const std::weak_ptr<EventRunner> &wp = currentEventRunner;
381 return wp.lock();
382 }
383
384 ThreadLocalData<std::weak_ptr<EventRunner>> EventInnerRunner::currentEventRunner;
385
386 namespace {
387 volatile bool ThreadCollector::avatarEnabled_ = false;
388
389 /*
390 * All event runners are relied on 'currentEventRunner', so make sure destruction of 'currentEventRunner'
391 * should after all event runners finished. All event runners finished in destruction of 'ThreadCollector::Avatar',
392 * so instance of 'ThreadCollector::Avatar' MUST defined after 'currentEventRunner'.
393 */
394 ThreadCollector::Avatar ThreadCollector::avatar_;
395 } // unnamed namespace
396
397 std::shared_ptr<EventRunner> EventRunner::mainRunner_;
398
Create(bool inNewThread)399 std::shared_ptr<EventRunner> EventRunner::Create(bool inNewThread)
400 {
401 if (inNewThread) {
402 return Create(std::string());
403 }
404
405 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
406 std::shared_ptr<EventRunner> sp(new EventRunner(false));
407 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
408 sp->innerRunner_ = innerRunner;
409 sp->queue_ = innerRunner->GetEventQueue();
410
411 return sp;
412 }
413
Create(const std::string & threadName)414 std::shared_ptr<EventRunner> EventRunner::Create(const std::string &threadName)
415 {
416 // Constructor of 'EventRunner' is private, could not use 'std::make_shared' to construct it.
417 std::shared_ptr<EventRunner> sp(new EventRunner(true));
418 auto innerRunner = std::make_shared<EventRunnerImpl>(sp);
419 sp->innerRunner_ = innerRunner;
420 sp->queue_ = innerRunner->GetEventQueue();
421
422 // Start new thread
423 innerRunner->SetThreadName(threadName);
424 auto thread =
425 std::make_unique<std::thread>(EventRunnerImpl::ThreadMain, std::weak_ptr<EventRunnerImpl>(innerRunner));
426 if (!innerRunner->Attach(thread)) {
427 HILOGW("Create: Failed to attach thread, maybe process is exiting");
428 innerRunner->Stop();
429 thread->join();
430 }
431
432 return sp;
433 }
434
Current()435 std::shared_ptr<EventRunner> EventRunner::Current()
436 {
437 auto runner = EventInnerRunner::GetCurrentEventRunner();
438 if (runner) {
439 return runner;
440 }
441 return nullptr;
442 }
443
EventRunner(bool deposit)444 EventRunner::EventRunner(bool deposit) : deposit_(deposit)
445 {}
446
GetRunnerThreadName() const447 std::string EventRunner::GetRunnerThreadName() const
448 {
449 return innerRunner_->GetThreadName();
450 }
451
~EventRunner()452 EventRunner::~EventRunner()
453 {
454 if (deposit_) {
455 innerRunner_->Stop();
456 }
457 }
458
Run()459 ErrCode EventRunner::Run()
460 {
461 if (deposit_) {
462 HILOGE("Run: Do not call, if event runner is deposited");
463 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
464 }
465
466 // Avoid more than one thread to start this runner.
467 if (running_.exchange(true)) {
468 HILOGW("Run: Already running");
469 return EVENT_HANDLER_ERR_RUNNER_ALREADY;
470 }
471
472 // Entry event loop.
473 innerRunner_->Run();
474
475 running_.store(false);
476 return ERR_OK;
477 }
478
Stop()479 ErrCode EventRunner::Stop()
480 {
481 if (deposit_) {
482 HILOGE("Stop: Do not call, if event runner is deposited");
483 return EVENT_HANDLER_ERR_RUNNER_NO_PERMIT;
484 }
485
486 if (running_.load()) {
487 innerRunner_->Stop();
488 } else {
489 HILOGW("Stop: Already stopped");
490 }
491
492 return ERR_OK;
493 }
494
Dump(Dumper & dumper)495 void EventRunner::Dump(Dumper &dumper)
496 {
497 if (!IsRunning()) {
498 dumper.Dump(dumper.GetTag() + " Event runner is not running" + LINE_SEPARATOR);
499 return;
500 }
501
502 if (queue_ == nullptr) {
503 dumper.Dump(dumper.GetTag() + " Queue is nullLINE_SEPARATOR" + LINE_SEPARATOR);
504 return;
505 }
506
507 dumper.Dump(dumper.GetTag() + " Event runner (" + "Thread name = " + innerRunner_->GetThreadName() +
508 ", Thread ID = " + std::to_string(GetThreadId()) + ") is running" + LINE_SEPARATOR);
509
510 queue_->Dump(dumper);
511 }
512
DumpRunnerInfo(std::string & runnerInfo)513 void EventRunner::DumpRunnerInfo(std::string& runnerInfo)
514 {
515 if (!IsRunning()) {
516 runnerInfo = " Event runner is not running" + LINE_SEPARATOR;
517 }
518
519 if (queue_ == nullptr) {
520 runnerInfo = " Queue is null" + LINE_SEPARATOR;
521 return;
522 }
523
524 std::string queueInfo;
525 queue_->DumpQueueInfo(queueInfo);
526 runnerInfo += queueInfo;
527 }
528
SetLogger(const std::shared_ptr<Logger> & logger)529 void EventRunner::SetLogger(const std::shared_ptr<Logger> &logger)
530 {
531 innerRunner_->SetLogger(logger);
532 }
533
GetCurrentEventQueue()534 std::shared_ptr<EventQueue> EventRunner::GetCurrentEventQueue()
535 {
536 auto runner = EventRunner::Current();
537 if (!runner) {
538 return nullptr;
539 }
540 return runner->queue_;
541 }
542
GetThreadId()543 uint64_t EventRunner::GetThreadId()
544 {
545 std::thread::id tid = innerRunner_->GetThreadId();
546 std::stringstream buf;
547 buf << tid;
548 std::string stid = buf.str();
549 return std::stoull(stid);
550 }
551
IsCurrentRunnerThread()552 bool EventRunner::IsCurrentRunnerThread()
553 {
554 return std::this_thread::get_id() == innerRunner_->GetThreadId();
555 }
556
GetMainEventRunner()557 std::shared_ptr<EventRunner> EventRunner::GetMainEventRunner()
558 {
559 if (!mainRunner_) {
560 mainRunner_ = Create(false);
561 if (!mainRunner_) {
562 HILOGE("mainRunner_ create fail");
563 }
564 }
565
566 return mainRunner_;
567 }
568 } // namespace AppExecFwk
569 } // namespace OHOS
570