1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "serial_queue.h"
17 #include <chrono>
18 #include "dfx/log/ffrt_log_api.h"
19
20 namespace ffrt {
~SerialQueue()21 SerialQueue::~SerialQueue()
22 {
23 Quit();
24 }
25
Quit()26 void SerialQueue::Quit()
27 {
28 std::unique_lock lock(mutex_);
29 FFRT_LOGD("quit [%s] enter", name_.c_str());
30 if (isExit_) {
31 return;
32 }
33 isExit_ = true;
34 cond_.notify_all();
35
36 for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
37 for (auto itList = it->second.begin(); itList != it->second.end(); itList++) {
38 if (*itList != nullptr) {
39 (*itList)->Notify();
40 (*itList)->DecDeleteRef();
41 }
42 }
43 }
44 whenMap_.clear();
45 FFRT_LOGD("quit [%s] leave", name_.c_str());
46 }
47
PushTask(ITask * task,uint64_t upTime)48 int SerialQueue::PushTask(ITask* task, uint64_t upTime)
49 {
50 FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr");
51 FFRT_LOGI("push task gid=%llu to qid=%u [%s]", task->gid, qid_, name_.c_str());
52 {
53 std::unique_lock lock(mutex_);
54 whenMap_[upTime].emplace_back(task);
55 if (upTime == whenMap_.begin()->first) {
56 cond_.notify_all();
57 }
58 }
59 return 0;
60 }
61
RemoveTask(const ITask * task)62 int SerialQueue::RemoveTask(const ITask* task)
63 {
64 FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr");
65 FFRT_LOGI("cancel task gid=%llu of qid=%u [%s]", task->gid, qid_, name_.c_str());
66 {
67 std::unique_lock lock(mutex_);
68 for (auto it = whenMap_.begin(); it != whenMap_.end();) {
69 for (auto itList = it->second.begin(); itList != it->second.end();) {
70 if ((*itList) != task) {
71 itList++;
72 continue;
73 }
74 it->second.erase(itList++);
75 // a task can be submitted only once through the C interface
76 return 0;
77 }
78
79 if (it->second.empty()) {
80 whenMap_.erase(it++);
81 } else {
82 it++;
83 }
84 }
85 }
86 FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str());
87 return 1;
88 }
89
Next()90 ITask* SerialQueue::Next()
91 {
92 std::unique_lock lock(mutex_);
93 while (whenMap_.empty() && !isExit_) {
94 FFRT_LOGD("[%s] is empty, begin to wait", name_.c_str());
95 cond_.wait(lock);
96 FFRT_LOGD("[%s] is notified, end to wait", name_.c_str());
97 }
98
99 if (isExit_) {
100 FFRT_LOGD("[%s] is exit", name_.c_str());
101 return nullptr;
102 }
103
104 auto nowUs = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
105 uint64_t now = static_cast<uint64_t>(nowUs.time_since_epoch().count());
106 auto it = whenMap_.begin();
107 if (now >= it->first) {
108 if (it->second.empty()) {
109 (void)whenMap_.erase(it);
110 return nullptr;
111 }
112 auto nextTask = *it->second.begin();
113 it->second.pop_front();
114 if (it->second.empty()) {
115 (void)whenMap_.erase(it);
116 }
117 mapSize_.store(whenMap_.size());
118 return nextTask;
119 } else {
120 uint64_t diff = it->first - now;
121 FFRT_LOGD("[%s] begin to wait for [%llu us] to get next task", name_.c_str(), diff);
122 (void)cond_.wait_for(lock, std::chrono::microseconds(diff));
123 FFRT_LOGD("[%s] end to wait for [%llu us]", name_.c_str(), diff);
124 }
125
126 return nullptr;
127 }
128 } // namespace ffrt
129