• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_PRIORITY_QUEUE_H
17 #define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_PRIORITY_QUEUE_H
18 #include <map>
19 #include <memory>
20 #include <mutex>
21 #include <queue>
22 #include <set>
23 #include <shared_mutex>
24 #include <functional>
25 namespace OHOS {
26 template<typename _Tsk, typename _Tme, typename _Tid>
27 class PriorityQueue {
28 public:
29     struct PQMatrix {
30         _Tsk task_;
31         _Tid id_;
32         bool removed = false;
PQMatrixPQMatrix33         PQMatrix(_Tsk task, _Tid id) : task_(task), id_(id) {}
34     };
35     using TskIndex = typename std::map<_Tme, PQMatrix>::iterator;
36     using TskUpdater = typename std::function<std::pair<bool, _Tme>(_Tsk &element)>;
37 
38     PriorityQueue(const _Tsk &task, TskUpdater updater = nullptr)
INVALID_TSK(std::move (task))39         : INVALID_TSK(std::move(task)), updater_(std::move(updater))
40     {
41         if (!updater_) {
42             updater_ = [](_Tsk &) { return std::pair{false, _Tme()};};
43         }
44     }
Pop()45     _Tsk Pop()
46     {
47         std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
48         while (!tasks_.empty()) {
49             auto waitTme = tasks_.begin()->first;
50             if (waitTme > std::chrono::steady_clock::now()) {
51                 popCv_.wait_until(lock, waitTme);
52                 continue;
53             }
54             auto temp = tasks_.begin();
55             auto id = temp->second.id_;
56             running_.emplace(id, temp->second);
57             auto res = std::move(temp->second.task_);
58             tasks_.erase(temp);
59             indexes_.erase(id);
60             return res;
61         }
62         return INVALID_TSK;
63     }
64 
Push(_Tsk tsk,_Tid id,_Tme tme)65     bool Push(_Tsk tsk, _Tid id, _Tme tme)
66     {
67         std::unique_lock<std::mutex> lock(pqMtx_);
68         if (!tsk.Valid()) {
69             return false;
70         }
71         auto temp = tasks_.emplace(tme, PQMatrix(std::move(tsk), id));
72         indexes_.emplace(id, temp);
73         popCv_.notify_all();
74         return true;
75     }
76 
Size()77     size_t Size()
78     {
79         std::lock_guard<std::mutex> lock(pqMtx_);
80         return tasks_.size();
81     }
82 
Find(_Tid id)83     _Tsk Find(_Tid id)
84     {
85         std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
86         if (indexes_.find(id) != indexes_.end()) {
87             return indexes_[id]->second.task_;
88         }
89         return INVALID_TSK;
90     }
91 
Update(_Tid id,TskUpdater updater)92     bool Update(_Tid id, TskUpdater updater)
93     {
94         std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
95         auto index = indexes_.find(id);
96         if (index != indexes_.end()) {
97             auto [repeat, time] = updater(index->second->second.task_);
98             auto matrix = std::move(index->second->second);
99             tasks_.erase(index->second);
100             index->second = tasks_.emplace(time, std::move(matrix));
101             popCv_.notify_all();
102             return true;
103         }
104 
105         auto running = running_.find(id);
106         if (running != running_.end()) {
107             auto [repeat, time] = updater((*running).second.task_);
108             return repeat;
109         }
110 
111         return false;
112     }
113 
Remove(_Tid id,bool wait)114     bool Remove(_Tid id, bool wait)
115     {
116         std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
117         auto it = running_.find(id);
118         if (it != running_.end()) {
119             it->second.removed = true;
120         }
121         removeCv_.wait(lock, [this, id, wait] {
122             return !wait || running_.find(id) == running_.end();
123         });
124         auto index = indexes_.find(id);
125         if (index == indexes_.end()) {
126             return false;
127         }
128         tasks_.erase(index->second);
129         indexes_.erase(index);
130         popCv_.notify_all();
131         return true;
132     }
133 
Clean()134     void Clean()
135     {
136         std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
137         indexes_.clear();
138         tasks_.clear();
139         popCv_.notify_all();
140     }
141 
Finish(_Tid id)142     void Finish(_Tid id)
143     {
144         std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
145         auto it = running_.find(id);
146         if (it == running_.end()) {
147             return;
148         }
149         if (!it->second.removed) {
150             auto [repeat, time] = updater_(it->second.task_);
151             if (repeat) {
152                 indexes_.emplace(id, tasks_.emplace(time, std::move(it->second)));
153             }
154         }
155         running_.erase(it);
156         removeCv_.notify_all();
157     }
158 
159 private:
160     const _Tsk INVALID_TSK;
161     std::mutex pqMtx_;
162     std::condition_variable popCv_;
163     std::condition_variable removeCv_;
164     std::multimap<_Tme, PQMatrix> tasks_;
165     std::map<_Tid, PQMatrix> running_;
166     std::map<_Tid, TskIndex> indexes_;
167     TskUpdater updater_;
168 };
169 } // namespace OHOS
170 #endif //OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_PRIORITY_QUEUE_H
171