1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "serial_queue.h"
17 #include <chrono>
18 #include "dfx/log/ffrt_log_api.h"
19
20 namespace ffrt {
~SerialQueue()21 SerialQueue::~SerialQueue()
22 {
23 Quit();
24 }
25
Quit()26 void SerialQueue::Quit()
27 {
28 std::unique_lock lock(mutex_);
29 FFRT_LOGD("quit [%s] enter", name_.c_str());
30 if (isExit_) {
31 return;
32 }
33 isExit_ = true;
34 cond_.notify_all();
35
36 for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
37 for (auto itList = it->second.begin(); itList != it->second.end(); itList++) {
38 if (*itList != nullptr) {
39 (*itList)->Notify();
40 (*itList)->DecDeleteRef();
41 }
42 }
43 }
44 whenMap_.clear();
45 FFRT_LOGD("quit [%s] leave", name_.c_str());
46 }
47
PushTask(ITask * task,uint64_t upTime)48 int SerialQueue::PushTask(ITask* task, uint64_t upTime)
49 {
50 std::unique_lock lock(mutex_);
51 FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr");
52 whenMap_[upTime].emplace_back(task);
53 if (upTime == whenMap_.begin()->first) {
54 cond_.notify_all();
55 }
56 FFRT_LOGD("push serial task gid=%llu into [%s] succ", task->gid, name_.c_str());
57 return 0;
58 }
59
RemoveTask(const ITask * task)60 int SerialQueue::RemoveTask(const ITask* task)
61 {
62 std::unique_lock lock(mutex_);
63 FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr");
64 FFRT_LOGD("remove serial task gid=%llu of [%s] enter", task->gid, name_.c_str());
65 for (auto it = whenMap_.begin(); it != whenMap_.end();) {
66 for (auto itList = it->second.begin(); itList != it->second.end();) {
67 if ((*itList) != task) {
68 itList++;
69 continue;
70 }
71 it->second.erase(itList++);
72 // a task can be submitted only once through the C interface
73 return 0;
74 }
75
76 if (it->second.empty()) {
77 whenMap_.erase(it++);
78 } else {
79 it++;
80 }
81 }
82 FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str());
83 return 1;
84 }
85
Next()86 ITask* SerialQueue::Next()
87 {
88 std::unique_lock lock(mutex_);
89 while (whenMap_.empty() && !isExit_) {
90 FFRT_LOGD("[%s] is empty, begin to wait", name_.c_str());
91 cond_.wait(lock);
92 FFRT_LOGD("[%s] is notified, end to wait", name_.c_str());
93 }
94
95 if (isExit_) {
96 FFRT_LOGD("[%s] is exit", name_.c_str());
97 return nullptr;
98 }
99
100 auto nowUs = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
101 uint64_t now = static_cast<uint64_t>(nowUs.time_since_epoch().count());
102 auto it = whenMap_.begin();
103 if (now >= it->first) {
104 if (it->second.empty()) {
105 (void)whenMap_.erase(it);
106 return nullptr;
107 }
108 auto nextTask = *it->second.begin();
109 it->second.pop_front();
110 if (it->second.empty()) {
111 (void)whenMap_.erase(it);
112 }
113 FFRT_LOGD("get next serial task gid=%llu, %s contains [%u] other timestamps", nextTask->gid, name_.c_str(),
114 whenMap_.size());
115 return nextTask;
116 } else {
117 uint64_t diff = it->first - now;
118 FFRT_LOGD("[%s] begin to wait for [%llu us] to get next task", name_.c_str(), diff);
119 (void)cond_.wait_for(lock, std::chrono::microseconds(diff));
120 FFRT_LOGD("[%s] end to wait for [%llu us]", name_.c_str(), diff);
121 }
122
123 return nullptr;
124 }
125 } // namespace ffrt
126