• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "serial_queue.h"
17 #include <chrono>
18 #include "dfx/log/ffrt_log_api.h"
19 
20 namespace ffrt {
~SerialQueue()21 SerialQueue::~SerialQueue()
22 {
23     Quit();
24 }
25 
Quit()26 void SerialQueue::Quit()
27 {
28     std::unique_lock lock(mutex_);
29     FFRT_LOGD("quit [%s] enter", name_.c_str());
30     if (isExit_) {
31         return;
32     }
33     isExit_ = true;
34     cond_.notify_all();
35 
36     for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
37         for (auto itList = it->second.begin(); itList != it->second.end(); itList++) {
38             if (*itList != nullptr) {
39                 (*itList)->Notify();
40                 (*itList)->DecDeleteRef();
41             }
42         }
43     }
44     whenMap_.clear();
45     FFRT_LOGD("quit [%s] leave", name_.c_str());
46 }
47 
PushTask(ITask * task,uint64_t upTime)48 int SerialQueue::PushTask(ITask* task, uint64_t upTime)
49 {
50     FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr");
51     FFRT_LOGI("push task gid=%llu to qid=%u [%s]", task->gid, qid_, name_.c_str());
52     {
53         std::unique_lock lock(mutex_);
54         whenMap_[upTime].emplace_back(task);
55         if (upTime == whenMap_.begin()->first) {
56             cond_.notify_all();
57         }
58     }
59     return 0;
60 }
61 
RemoveTask(const ITask * task)62 int SerialQueue::RemoveTask(const ITask* task)
63 {
64     FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr");
65     FFRT_LOGI("cancel task gid=%llu of qid=%u [%s]", task->gid, qid_, name_.c_str());
66     {
67         std::unique_lock lock(mutex_);
68         for (auto it = whenMap_.begin(); it != whenMap_.end();) {
69             for (auto itList = it->second.begin(); itList != it->second.end();) {
70                 if ((*itList) != task) {
71                     itList++;
72                     continue;
73                 }
74                 it->second.erase(itList++);
75                 // a task can be submitted only once through the C interface
76                 return 0;
77             }
78 
79             if (it->second.empty()) {
80                 whenMap_.erase(it++);
81             } else {
82                 it++;
83             }
84         }
85     }
86     FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str());
87     return 1;
88 }
89 
Next()90 ITask* SerialQueue::Next()
91 {
92     std::unique_lock lock(mutex_);
93     while (whenMap_.empty() && !isExit_) {
94         FFRT_LOGD("[%s] is empty, begin to wait", name_.c_str());
95         cond_.wait(lock);
96         FFRT_LOGD("[%s] is notified, end to wait", name_.c_str());
97     }
98 
99     if (isExit_) {
100         FFRT_LOGD("[%s] is exit", name_.c_str());
101         return nullptr;
102     }
103 
104     auto nowUs = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
105     uint64_t now = static_cast<uint64_t>(nowUs.time_since_epoch().count());
106     auto it = whenMap_.begin();
107     if (now >= it->first) {
108         if (it->second.empty()) {
109             (void)whenMap_.erase(it);
110             return nullptr;
111         }
112         auto nextTask = *it->second.begin();
113         it->second.pop_front();
114         if (it->second.empty()) {
115             (void)whenMap_.erase(it);
116         }
117         mapSize_.store(whenMap_.size());
118         return nextTask;
119     } else {
120         uint64_t diff = it->first - now;
121         FFRT_LOGD("[%s] begin to wait for [%llu us] to get next task", name_.c_str(), diff);
122         (void)cond_.wait_for(lock, std::chrono::microseconds(diff));
123         FFRT_LOGD("[%s] end to wait for [%llu us]", name_.c_str(), diff);
124     }
125 
126     return nullptr;
127 }
128 } // namespace ffrt
129