1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "serial_looper.h"
16 #include <sstream>
17 #include "cpp/task.h"
18 #include "dfx/log/ffrt_log_api.h"
19 #include "ihandler.h"
20 #include "sync/sync.h"
21 #include "util/slab.h"
22 #include "queue_monitor.h"
23
24 namespace {
25 constexpr uint32_t STRING_SIZE_MAX = 128;
26 }
27
28 namespace ffrt {
29 static std::atomic_uint32_t q_gid(0);
SerialLooper(const char * name,int qos,uint64_t timeout,ffrt_function_header_t * timeoutCb)30 SerialLooper::SerialLooper(const char* name, int qos, uint64_t timeout, ffrt_function_header_t* timeoutCb)
31 : qid_(q_gid++), timeout_(timeout), timeoutCb_(timeoutCb)
32 {
33 if (name != nullptr && (std::string(name).size() <= STRING_SIZE_MAX)) {
34 name_ += name;
35 }
36
37 if (timeout > 0 && timeoutCb_ != nullptr) {
38 GetSerialTaskByFuncStorageOffset(timeoutCb)->IncDeleteRef();
39 }
40
41 queue_ = std::make_shared<SerialQueue>(name_);
42 FFRT_COND_DO_ERR((queue_ == nullptr), return, "failed to construct serial queue, qid=%u", qid_);
43 // using nested submission is to submit looper task on worker.
44 // when ffrt::wait() is used in the current thread, the looper task is not in the waiting list.
45 submit([this, qos] { handle = submit_h([this] { Run(); }, {}, {}, task_attr().name(name_.c_str()).qos(qos)); },
46 {}, { &handle });
47 QueueMonitor::GetInstance().RegisterQueueId(qid_);
48 ffrt::wait({&handle});
49 FFRT_COND_DO_ERR((handle == nullptr), return, "failed to construct serial looper, qid=%u", qid_);
50 FFRT_LOGI("create serial looper [%s] succ, qid=%u", name_.c_str(), qid_);
51 }
52
~SerialLooper()53 SerialLooper::~SerialLooper()
54 {
55 Quit();
56 }
57
Quit()58 void SerialLooper::Quit()
59 {
60 FFRT_LOGI("quit serial looper [%s] enter", name_.c_str());
61 isExit_.store(true);
62 queue_->Quit();
63 QueueMonitor::GetInstance().ResetQueueInfo(qid_);
64 // wait for the task being executed to complete.
65 wait({handle});
66
67 if (timeout_ > 0) {
68 // wait for all delayedWorker to complete.
69 while (delayedCbCnt_.load() > 0) {
70 this_task::sleep_for(std::chrono::microseconds(timeout_));
71 }
72
73 if (timeoutCb_ != nullptr) {
74 GetSerialTaskByFuncStorageOffset(timeoutCb_)->DecDeleteRef();
75 }
76 }
77 FFRT_LOGI("quit serial looper [%s] leave, qid=%u", name_.c_str(), qid_);
78 }
79
Run()80 void SerialLooper::Run()
81 {
82 FFRT_LOGI("run serial looper [%s] enter, qid=%u", name_.c_str(), qid_);
83 while (!isExit_.load()) {
84 ITask* task = queue_->Next();
85 if (task) {
86 SetTimeoutMonitor(task);
87 FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr");
88 QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid);
89 task->handler_->DispatchTask(task);
90 QueueMonitor::GetInstance().ResetQueueInfo(qid_);
91 }
92 }
93 FFRT_LOGI("run serial looper [%s] enter, qid=%u", name_.c_str(), qid_);
94 }
95
SetTimeoutMonitor(ITask * task)96 void SerialLooper::SetTimeoutMonitor(ITask* task)
97 {
98 if (timeout_ <= 0) {
99 return;
100 }
101
102 task->IncDeleteRef();
103 WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::allocMem()) WaitUntilEntry();
104 // set dealyedworker callback
105 we->cb = ([this, task](WaitEntry* we) {
106 if (!task->isFinished_) {
107 RunTimeOutCallback(task);
108 }
109 delayedCbCnt_.fetch_sub(1);
110 task->DecDeleteRef();
111 SimpleAllocator<WaitUntilEntry>::freeMem(static_cast<WaitUntilEntry*>(we));
112 });
113
114 // set dealyedworker wakeup time
115 std::chrono::microseconds timeout(timeout_);
116 auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
117 we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
118
119 if (!DelayedWakeup(we->tp, we, we->cb)) {
120 task->DecDeleteRef();
121 SimpleAllocator<WaitUntilEntry>::freeMem(we);
122 FFRT_LOGW("failed to set watchdog for task gid=%llu in %s qid=%u with timeout [%llu us] ",
123 task->gid, name_.c_str(), qid_, timeout_);
124 return;
125 }
126
127 delayedCbCnt_.fetch_add(1);
128 FFRT_LOGD("set watchdog of task [%p] of %s succ", task, name_.c_str());
129 }
130
RunTimeOutCallback(ITask * task)131 void SerialLooper::RunTimeOutCallback(ITask* task)
132 {
133 std::stringstream ss;
134 ss << "serial queue [" << name_ << "] qid=" << qid_ <<
135 ", serial task gid=" << task->gid << " execution time exceeds "
136 << timeout_ << " us";
137 std::string msg = ss.str();
138 std::string eventName = "SERIAL_TASK_TIMEOUT";
139
140 #ifdef FFRT_SEND_EVENT
141 time_t cur_time = time(nullptr);
142 std::string sendMsg = std::string((ctime(&cur_time) == nullptr) ? "" : ctime(&cur_time)) + "\n" + msg + "\n";
143 HiSysEventWrite(OHOS::HiviewDFX::HiSysEvent::Domain::FFRT, eventName,
144 OHOS::HiviewDFX::HiSysEvent::EventType::FAULT, "PID", getpid(), "TGID", getgid(), "UID", getuid(),
145 "MODULE_NAME", "ffrt", "PROCESS_NAME", "ffrt", "MSG", sendMsg);
146 #endif
147
148 FFRT_LOGE("[%s], %s", eventName.c_str(), msg.c_str());
149 if (timeoutCb_ != nullptr) {
150 timeoutCb_->exec(timeoutCb_);
151 }
152 }
153 } // namespace ffrt
154