1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
#include "serial_looper.h"
#include <ctime>
#include <sstream>
#include "cpp/task.h"
#include "dfx/log/ffrt_log_api.h"
#include "ihandler.h"
#include "sync/sync.h"
#include "util/slab.h"
#include "queue_monitor.h"
23
namespace {
// Longest looper name (in characters) the constructor will accept; longer names are ignored.
constexpr uint32_t STRING_SIZE_MAX{128};
} // namespace
27
28 namespace ffrt {
// File-local counter; each SerialLooper constructor post-increments it to obtain its queue id (qid_).
static std::atomic_uint32_t q_gid{0};
SerialLooper(const char * name,int qos,uint64_t timeout,ffrt_function_header_t * timeoutCb)30 SerialLooper::SerialLooper(const char* name, int qos, uint64_t timeout, ffrt_function_header_t* timeoutCb)
31 : qid_(q_gid++), timeout_(timeout), timeoutCb_(timeoutCb)
32 {
33 if (name != nullptr && (std::string(name).size() <= STRING_SIZE_MAX)) {
34 name_ += name;
35 }
36
37 if (timeout > 0 && timeoutCb_ != nullptr) {
38 GetSerialTaskByFuncStorageOffset(timeoutCb)->IncDeleteRef();
39 }
40
41 queue_ = std::make_shared<SerialQueue>(qid_, name_);
42 FFRT_COND_DO_ERR((queue_ == nullptr), return, "failed to construct serial queue, qid=%u", qid_);
43 // using nested submission is to submit looper task on worker.
44 // when ffrt::wait() is used in the current thread, the looper task is not in the waiting list.
45 submit([this, qos] { handle = submit_h([this] { Run(); }, {}, {}, task_attr().name(name_.c_str()).qos(qos)); },
46 {}, { &handle });
47 QueueMonitor::GetInstance().RegisterQueueId(qid_);
48 ffrt::wait({&handle});
49 FFRT_COND_DO_ERR((handle == nullptr), return, "failed to construct serial looper, qid=%u", qid_);
50 FFRT_LOGI("create serial looper [%s] succ, qid=%u", name_.c_str(), qid_);
51 }
52
~SerialLooper()53 SerialLooper::~SerialLooper()
54 {
55 Quit();
56 }
57
Quit()58 void SerialLooper::Quit()
59 {
60 FFRT_LOGI("quit serial looper [%s] enter, qid=%u", name_.c_str(), qid_);
61 isExit_.store(true);
62 queue_->Quit();
63 QueueMonitor::GetInstance().ResetQueueInfo(qid_);
64 // wait for the task being executed to complete.
65 wait({handle});
66
67 if (timeout_ > 0) {
68 // wait for all delayedWorker to complete.
69 while (delayedCbCnt_.load() > 0) {
70 this_task::sleep_for(std::chrono::microseconds(timeout_));
71 }
72
73 if (timeoutCb_ != nullptr) {
74 GetSerialTaskByFuncStorageOffset(timeoutCb_)->DecDeleteRef();
75 }
76 }
77 FFRT_LOGI("quit serial looper [%s] leave, qid=%u", name_.c_str(), qid_);
78 }
79
Run()80 void SerialLooper::Run()
81 {
82 FFRT_LOGI("run serial looper [%s] enter, qid=%u", name_.c_str(), qid_);
83 while (!isExit_.load()) {
84 ITask* task = queue_->Next();
85 if (task) {
86 FFRT_LOGI("pick task gid=%llu, qid=%u [%s] remains [%u]", task->gid, qid_, name_.c_str(),
87 queue_->GetMapSize());
88 SetTimeoutMonitor(task);
89 FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr");
90 QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid);
91 task->handler_->DispatchTask(task);
92 QueueMonitor::GetInstance().ResetQueueInfo(qid_);
93 }
94 }
95 FFRT_LOGI("run serial looper [%s] leave, qid=%u", name_.c_str(), qid_);
96 }
97
SetTimeoutMonitor(ITask * task)98 void SerialLooper::SetTimeoutMonitor(ITask* task)
99 {
100 if (timeout_ <= 0) {
101 return;
102 }
103
104 task->IncDeleteRef();
105 WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::allocMem()) WaitUntilEntry();
106 // set dealyedworker callback
107 we->cb = ([this, task](WaitEntry* we) {
108 if (!task->isFinished_) {
109 RunTimeOutCallback(task);
110 }
111 delayedCbCnt_.fetch_sub(1);
112 task->DecDeleteRef();
113 SimpleAllocator<WaitUntilEntry>::freeMem(static_cast<WaitUntilEntry*>(we));
114 });
115
116 // set dealyedworker wakeup time
117 std::chrono::microseconds timeout(timeout_);
118 auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
119 we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
120
121 if (!DelayedWakeup(we->tp, we, we->cb)) {
122 task->DecDeleteRef();
123 SimpleAllocator<WaitUntilEntry>::freeMem(we);
124 FFRT_LOGW("failed to set watchdog for task gid=%llu in %s qid=%u with timeout [%llu us] ", task->gid,
125 name_.c_str(), qid_, timeout_);
126 return;
127 }
128
129 delayedCbCnt_.fetch_add(1);
130 FFRT_LOGD("set watchdog of task gid=%llu of %s qid=%u succ", task->gid, name_.c_str(), qid_);
131 }
132
RunTimeOutCallback(ITask * task)133 void SerialLooper::RunTimeOutCallback(ITask* task)
134 {
135 std::stringstream ss;
136 ss << "serial queue [" << name_ << "] qid=" << qid_ << ", serial task gid=" << task->gid <<
137 " execution time exceeds " << timeout_ << " us";
138 std::string msg = ss.str();
139 std::string eventName = "SERIAL_TASK_TIMEOUT";
140
141 #ifdef FFRT_SEND_EVENT
142 time_t cur_time = time(nullptr);
143 std::string sendMsg = std::string((ctime(&cur_time) == nullptr) ? "" : ctime(&cur_time)) + "\n" + msg + "\n";
144 HiSysEventWrite(OHOS::HiviewDFX::HiSysEvent::Domain::FFRT, eventName,
145 OHOS::HiviewDFX::HiSysEvent::EventType::FAULT, "PID", getpid(), "TGID", getgid(), "UID", getuid(),
146 "MODULE_NAME", "ffrt", "PROCESS_NAME", "ffrt", "MSG", sendMsg);
147 #endif
148
149 FFRT_LOGE("[%s], %s", eventName.c_str(), msg.c_str());
150 if (timeoutCb_ != nullptr) {
151 timeoutCb_->exec(timeoutCb_);
152 }
153 }
154 } // namespace ffrt
155