/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_PRIORITY_QUEUE_H
#define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_PRIORITY_QUEUE_H
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <shared_mutex>
#include <utility>
namespace OHOS {
/**
 * Time-ordered task queue whose entries can also be looked up by id.
 *
 * Every public member function takes the internal mutex for its whole body.
 *
 * @tparam _Tsk Task type; must be copyable and provide `Valid()` (used by Push).
 * @tparam _Tme Due-time type; it is compared against `std::chrono::steady_clock::now()`
 *              and passed to `std::condition_variable::wait_until`.
 * @tparam _Tid Task id type; used as a `std::map` key.
 */
template<typename _Tsk, typename _Tme, typename _Tid>
class PriorityQueue {
public:
    /** A task together with its id and a flag set when it is removed while running. */
    struct PQMatrix {
        _Tsk task_;
        _Tid id_;
        bool removed = false;
        PQMatrix(_Tsk task, _Tid id) : task_(task), id_(id) {}
    };
    using TskIndex = typename std::map<_Tme, PQMatrix>::iterator;
    using TskUpdater = typename std::function<std::pair<bool, _Tme>(_Tsk &element)>;

    /**
     * @param task    Value returned by Pop()/Find() when there is nothing to return.
     * @param updater Called by Finish() on the finished task; returns {repeat, next due time}.
     *                When empty, a default that never repeats is installed.
     */
    PriorityQueue(const _Tsk &task, TskUpdater updater = nullptr)
        : INVALID_TSK(task), updater_(std::move(updater))
    {
        if (!updater_) {
            updater_ = [](_Tsk &) { return std::pair{false, _Tme()}; };
        }
    }

    /**
     * Takes the earliest task once its due time has been reached.
     *
     * While the queue is non-empty and the earliest task is not yet due, the call
     * waits on the condition variable until that due time (or an earlier notification)
     * and then re-evaluates the head. The popped entry is recorded as running until
     * Finish() is called for its id.
     *
     * @return The popped task, or the invalid task if the queue is (or becomes) empty.
     */
    _Tsk Pop()
    {
        std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
        while (!tasks_.empty()) {
            // Copy the due time: the wait below releases the lock, so the head may change.
            const auto waitTme = tasks_.begin()->first;
            if (waitTme > std::chrono::steady_clock::now()) {
                popCv_.wait_until(lock, waitTme);
                continue;
            }
            auto head = tasks_.begin();
            auto id = head->second.id_;
            // Keep a copy in running_ so Finish() can hand the task to the updater.
            running_.emplace(id, head->second);
            auto res = std::move(head->second.task_);
            tasks_.erase(head);
            indexes_.erase(id);
            return res;
        }
        return INVALID_TSK;
    }

    /**
     * Queues a task under the given id and due time and wakes any waiting Pop().
     *
     * @return false (nothing queued) if `tsk.Valid()` is false, true otherwise.
     */
    bool Push(_Tsk tsk, _Tid id, _Tme tme)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        if (!tsk.Valid()) {
            return false;
        }
        auto pos = tasks_.emplace(tme, PQMatrix(std::move(tsk), id));
        indexes_.emplace(id, pos);
        popCv_.notify_all();
        return true;
    }

    /** @return Number of queued tasks; running tasks are not counted. */
    size_t Size()
    {
        std::lock_guard<std::mutex> lock(pqMtx_);
        return tasks_.size();
    }

    /** @return A copy of the queued task with this id, or the invalid task if it is not queued. */
    _Tsk Find(_Tid id)
    {
        std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
        auto index = indexes_.find(id);
        if (index != indexes_.end()) {
            return index->second->second.task_;
        }
        return INVALID_TSK;
    }

    /**
     * Applies `updater` to the task with this id.
     *
     * - Queued task: the task is re-queued at the time returned by `updater`
     *   (the returned repeat flag is not consulted) and waiters are woken; returns true.
     * - Running task: `updater` is applied to the running copy; returns its repeat flag.
     * - Unknown id, or an empty `updater`: returns false.
     */
    bool Update(_Tid id, TskUpdater updater)
    {
        std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
        if (!updater) {
            // Calling an empty std::function would throw std::bad_function_call under the lock.
            return false;
        }
        auto index = indexes_.find(id);
        if (index != indexes_.end()) {
            const auto next = updater(index->second->second.task_);
            auto matrix = std::move(index->second->second);
            tasks_.erase(index->second);
            index->second = tasks_.emplace(next.second, std::move(matrix));
            popCv_.notify_all();
            return true;
        }

        auto running = running_.find(id);
        if (running != running_.end()) {
            return updater(running->second.task_).first;
        }

        return false;
    }

    /**
     * Removes the task with this id.
     *
     * A running entry with this id is flagged so that Finish() will not re-queue it.
     * If `wait` is true the call then blocks until no running entry with this id remains.
     *
     * @return true if a queued entry was erased; false if the id was not queued
     *         (including the case where it was only running).
     */
    bool Remove(_Tid id, bool wait)
    {
        std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
        auto it = running_.find(id);
        if (it != running_.end()) {
            it->second.removed = true;
        }
        removeCv_.wait(lock, [this, id, wait] {
            return !wait || running_.find(id) == running_.end();
        });
        auto index = indexes_.find(id);
        if (index == indexes_.end()) {
            return false;
        }
        tasks_.erase(index->second);
        indexes_.erase(index);
        popCv_.notify_all();
        return true;
    }

    /** Drops every queued task and wakes waiting Pop() calls; running entries are left untouched. */
    void Clean()
    {
        std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
        indexes_.clear();
        tasks_.clear();
        popCv_.notify_all();
    }

    /**
     * Marks the running task with this id as finished; does nothing if it is not running.
     *
     * Unless the entry was flagged by Remove(), the constructor-supplied updater is applied
     * and, if it asks for a repeat, the task is queued again at the returned time.
     * Threads blocked in Remove(id, true) are woken afterwards.
     */
    void Finish(_Tid id)
    {
        std::unique_lock<decltype(pqMtx_)> lock(pqMtx_);
        auto it = running_.find(id);
        if (it == running_.end()) {
            return;
        }
        if (!it->second.removed) {
            auto [repeat, time] = updater_(it->second.task_);
            if (repeat) {
                auto pos = tasks_.emplace(time, std::move(it->second));
                indexes_.emplace(id, pos);
                // The re-queued task may now be the earliest one; wake a Pop() that is
                // sleeping until a later due time, as Push() and Update() do.
                popCv_.notify_all();
            }
        }
        running_.erase(it);
        removeCv_.notify_all();
    }

private:
    const _Tsk INVALID_TSK;                  // returned when there is no task to hand out
    std::mutex pqMtx_;                       // guards every container below
    std::condition_variable popCv_;          // notified when the set of queued tasks changes
    std::condition_variable removeCv_;       // notified when a running task finishes
    std::multimap<_Tme, PQMatrix> tasks_;    // queued tasks ordered by due time
    std::map<_Tid, PQMatrix> running_;       // tasks popped but not yet finished
    std::map<_Tid, TskIndex> indexes_;       // id -> position in tasks_
    TskUpdater updater_;                     // reschedule policy applied by Finish()
};
} // namespace OHOS
#endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_PRIORITY_QUEUE_H