• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "serial_queue.h"
17 #include <chrono>
18 #include "dfx/log/ffrt_log_api.h"
19 
20 namespace ffrt {
~SerialQueue()21 SerialQueue::~SerialQueue()
22 {
23     Quit();
24 }
25 
Quit()26 void SerialQueue::Quit()
27 {
28     std::unique_lock lock(mutex_);
29     FFRT_LOGD("quit [%s] enter", name_.c_str());
30     if (isExit_) {
31         return;
32     }
33     isExit_ = true;
34     cond_.notify_all();
35 
36     for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
37         for (auto itList = it->second.begin(); itList != it->second.end(); itList++) {
38             if (*itList != nullptr) {
39                 (*itList)->Notify();
40                 (*itList)->DecDeleteRef();
41             }
42         }
43     }
44     whenMap_.clear();
45     FFRT_LOGD("quit [%s] leave", name_.c_str());
46 }
47 
PushTask(ITask * task,uint64_t upTime)48 int SerialQueue::PushTask(ITask* task, uint64_t upTime)
49 {
50     std::unique_lock lock(mutex_);
51     FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr");
52     whenMap_[upTime].emplace_back(task);
53     if (upTime == whenMap_.begin()->first) {
54         cond_.notify_all();
55     }
56     FFRT_LOGD("push serial task gid=%llu into [%s] succ", task->gid, name_.c_str());
57     return 0;
58 }
59 
RemoveTask(const ITask * task)60 int SerialQueue::RemoveTask(const ITask* task)
61 {
62     std::unique_lock lock(mutex_);
63     FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr");
64     FFRT_LOGD("remove serial task gid=%llu of [%s] enter", task->gid, name_.c_str());
65     for (auto it = whenMap_.begin(); it != whenMap_.end();) {
66         for (auto itList = it->second.begin(); itList != it->second.end();) {
67             if ((*itList) != task) {
68                 itList++;
69                 continue;
70             }
71             it->second.erase(itList++);
72             // a task can be submitted only once through the C interface
73             return 0;
74         }
75 
76         if (it->second.empty()) {
77             whenMap_.erase(it++);
78         } else {
79             it++;
80         }
81     }
82     FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str());
83     return 1;
84 }
85 
Next()86 ITask* SerialQueue::Next()
87 {
88     std::unique_lock lock(mutex_);
89     while (whenMap_.empty() && !isExit_) {
90         FFRT_LOGD("[%s] is empty, begin to wait", name_.c_str());
91         cond_.wait(lock);
92         FFRT_LOGD("[%s] is notified, end to wait", name_.c_str());
93     }
94 
95     if (isExit_) {
96         FFRT_LOGD("[%s] is exit", name_.c_str());
97         return nullptr;
98     }
99 
100     auto nowUs = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
101     uint64_t now = static_cast<uint64_t>(nowUs.time_since_epoch().count());
102     auto it = whenMap_.begin();
103     if (now >= it->first) {
104         if (it->second.empty()) {
105             (void)whenMap_.erase(it);
106             return nullptr;
107         }
108         auto nextTask = *it->second.begin();
109         it->second.pop_front();
110         if (it->second.empty()) {
111             (void)whenMap_.erase(it);
112         }
113         FFRT_LOGD("get next serial task gid=%llu, %s contains [%u] other timestamps", nextTask->gid, name_.c_str(),
114             whenMap_.size());
115         return nextTask;
116     } else {
117         uint64_t diff = it->first - now;
118         FFRT_LOGD("[%s] begin to wait for [%llu us] to get next task", name_.c_str(), diff);
119         (void)cond_.wait_for(lock, std::chrono::microseconds(diff));
120         FFRT_LOGD("[%s] end to wait for [%llu us]", name_.c_str(), diff);
121     }
122 
123     return nullptr;
124 }
125 } // namespace ffrt
126