• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "serial_looper.h"
#include <ctime>
#include <sstream>
#include "cpp/task.h"
#include "dfx/log/ffrt_log_api.h"
#include "ihandler.h"
#include "sync/sync.h"
#include "util/slab.h"
#include "queue_monitor.h"
23 
namespace {
// Longest queue name (in characters) that the SerialLooper constructor accepts.
constexpr uint32_t STRING_SIZE_MAX{128};
} // namespace
27 
28 namespace ffrt {
// File-local atomic counter that hands out queue ids: every SerialLooper takes the current value and bumps it.
static std::atomic_uint32_t q_gid{0};
SerialLooper(const char * name,int qos,uint64_t timeout,ffrt_function_header_t * timeoutCb)30 SerialLooper::SerialLooper(const char* name, int qos, uint64_t timeout, ffrt_function_header_t* timeoutCb)
31     : qid_(q_gid++), timeout_(timeout), timeoutCb_(timeoutCb)
32 {
33     if (name != nullptr && (std::string(name).size() <= STRING_SIZE_MAX)) {
34         name_ += name;
35     }
36 
37     if (timeout > 0 && timeoutCb_ != nullptr) {
38         GetSerialTaskByFuncStorageOffset(timeoutCb)->IncDeleteRef();
39     }
40 
41     queue_ = std::make_shared<SerialQueue>(qid_, name_);
42     FFRT_COND_DO_ERR((queue_ == nullptr), return, "failed to construct serial queue, qid=%u", qid_);
43     // using nested submission is to submit looper task on worker.
44     // when ffrt::wait() is used in the current thread, the looper task is not in the waiting list.
45     submit([this, qos] { handle = submit_h([this] { Run(); }, {}, {}, task_attr().name(name_.c_str()).qos(qos)); },
46         {}, { &handle });
47     QueueMonitor::GetInstance().RegisterQueueId(qid_);
48     ffrt::wait({&handle});
49     FFRT_COND_DO_ERR((handle == nullptr), return, "failed to construct serial looper, qid=%u", qid_);
50     FFRT_LOGI("create serial looper [%s] succ, qid=%u", name_.c_str(), qid_);
51 }
52 
~SerialLooper()53 SerialLooper::~SerialLooper()
54 {
55     Quit();
56 }
57 
Quit()58 void SerialLooper::Quit()
59 {
60     FFRT_LOGI("quit serial looper [%s] enter, qid=%u", name_.c_str(), qid_);
61     isExit_.store(true);
62     queue_->Quit();
63     QueueMonitor::GetInstance().ResetQueueInfo(qid_);
64     // wait for the task being executed to complete.
65     wait({handle});
66 
67     if (timeout_ > 0) {
68         // wait for all delayedWorker to complete.
69         while (delayedCbCnt_.load() > 0) {
70             this_task::sleep_for(std::chrono::microseconds(timeout_));
71         }
72 
73         if (timeoutCb_ != nullptr) {
74             GetSerialTaskByFuncStorageOffset(timeoutCb_)->DecDeleteRef();
75         }
76     }
77     FFRT_LOGI("quit serial looper [%s] leave, qid=%u", name_.c_str(), qid_);
78 }
79 
Run()80 void SerialLooper::Run()
81 {
82     FFRT_LOGI("run serial looper [%s] enter, qid=%u", name_.c_str(), qid_);
83     while (!isExit_.load()) {
84         ITask* task = queue_->Next();
85         if (task) {
86             FFRT_LOGI("pick task gid=%llu, qid=%u [%s] remains [%u]", task->gid, qid_, name_.c_str(),
87                 queue_->GetMapSize());
88             SetTimeoutMonitor(task);
89             FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr");
90             QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid);
91             task->handler_->DispatchTask(task);
92             QueueMonitor::GetInstance().ResetQueueInfo(qid_);
93         }
94     }
95     FFRT_LOGI("run serial looper [%s] leave, qid=%u", name_.c_str(), qid_);
96 }
97 
SetTimeoutMonitor(ITask * task)98 void SerialLooper::SetTimeoutMonitor(ITask* task)
99 {
100     if (timeout_ <= 0) {
101         return;
102     }
103 
104     task->IncDeleteRef();
105     WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::allocMem()) WaitUntilEntry();
106     // set dealyedworker callback
107     we->cb = ([this, task](WaitEntry* we) {
108         if (!task->isFinished_) {
109             RunTimeOutCallback(task);
110         }
111         delayedCbCnt_.fetch_sub(1);
112         task->DecDeleteRef();
113         SimpleAllocator<WaitUntilEntry>::freeMem(static_cast<WaitUntilEntry*>(we));
114     });
115 
116     // set dealyedworker wakeup time
117     std::chrono::microseconds timeout(timeout_);
118     auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
119     we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
120 
121     if (!DelayedWakeup(we->tp, we, we->cb)) {
122         task->DecDeleteRef();
123         SimpleAllocator<WaitUntilEntry>::freeMem(we);
124         FFRT_LOGW("failed to set watchdog for task gid=%llu in %s qid=%u with timeout [%llu us] ", task->gid,
125             name_.c_str(), qid_, timeout_);
126         return;
127     }
128 
129     delayedCbCnt_.fetch_add(1);
130     FFRT_LOGD("set watchdog of task gid=%llu of %s qid=%u succ", task->gid, name_.c_str(), qid_);
131 }
132 
RunTimeOutCallback(ITask * task)133 void SerialLooper::RunTimeOutCallback(ITask* task)
134 {
135     std::stringstream ss;
136     ss << "serial queue [" << name_ << "] qid=" << qid_ << ", serial task gid=" << task->gid <<
137         " execution time exceeds " << timeout_ << " us";
138     std::string msg = ss.str();
139     std::string eventName = "SERIAL_TASK_TIMEOUT";
140 
141 #ifdef FFRT_SEND_EVENT
142     time_t cur_time = time(nullptr);
143     std::string sendMsg = std::string((ctime(&cur_time) == nullptr) ? "" : ctime(&cur_time)) + "\n" + msg + "\n";
144     HiSysEventWrite(OHOS::HiviewDFX::HiSysEvent::Domain::FFRT, eventName,
145         OHOS::HiviewDFX::HiSysEvent::EventType::FAULT, "PID", getpid(), "TGID", getgid(), "UID", getuid(),
146         "MODULE_NAME", "ffrt", "PROCESS_NAME", "ffrt", "MSG", sendMsg);
147 #endif
148 
149     FFRT_LOGE("[%s], %s", eventName.c_str(), msg.c_str());
150     if (timeoutCb_ != nullptr) {
151         timeoutCb_->exec(timeoutCb_);
152     }
153 }
154 } // namespace ffrt
155