1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H 17 #define COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H 18 19 #include <array> 20 #include <atomic> 21 #include <condition_variable> 22 #include <mutex> 23 #include <thread> 24 #include <set> 25 #include <map> 26 27 namespace OHOS::NetManagerStandard { 28 template <typename T, size_t ARRAY_SIZE, size_t DELAYED_COUNT> class DelayedQueue { 29 public: DelayedQueue()30 DelayedQueue() : index_(0), isRun_(false), needRun_(true) 31 { 32 std::thread([this] { 33 isRun_ = true; 34 35 while (needRun_) { 36 { 37 // deal with elems in elems[index_] 38 std::lock_guard<std::mutex> guard(mutex_); 39 for (const auto &elem : elems_[index_]) { 40 elem.Execute(); 41 } 42 elems_[index_].clear(); 43 } 44 45 // wait for one second or until needRun_ is false 46 std::unique_lock<std::mutex> needRunLock(needRunMutex_); 47 needRunCondition_.wait_for(needRunLock, std::chrono::seconds(1), [this] { return !needRun_; }); 48 49 std::lock_guard<std::mutex> guard(mutex_); 50 index_ = (index_ + 1) % (ARRAY_SIZE + DELAYED_COUNT); 51 } 52 53 isRun_ = false; 54 isRunCondition_.notify_all(); 55 }).detach(); 56 } 57 ~DelayedQueue()58 ~DelayedQueue() 59 { 60 // set needRun_ = false, and notify the thread to wake 61 needRun_ = false; 62 needRunCondition_.notify_all(); 63 64 // wait until isRun is false(isRun is false, means that the thread is end) 65 std::unique_lock<std::mutex> isRunLock(isRunMutex_); 66 isRunCondition_.wait(isRunLock, [this] { return !isRun_; }); 67 } 68 Put(const T & elem)69 void Put(const T &elem) 70 { 71 std::lock_guard<std::mutex> guard(mutex_); 72 if (indexMap_.find(elem) != indexMap_.end()) { 73 int oldIndex = indexMap_[elem]; 74 if (oldIndex >= 0 && oldIndex < static_cast<int>(elems_.size()) && 75 (elems_[oldIndex].find(elem) != elems_[oldIndex].end())) { 76 elems_[oldIndex].erase(elem); 77 } 78 } 79 int index = (index_ + DELAYED_COUNT) % (ARRAY_SIZE + DELAYED_COUNT); 80 elems_[index].insert(elem); 81 indexMap_[elem] = index; 82 } 83 84 private: 85 int index_; 86 std::mutex mutex_; 87 std::atomic_bool isRun_; 88 std::atomic_bool needRun_; 89 std::condition_variable isRunCondition_; 90 std::condition_variable needRunCondition_; 91 std::mutex isRunMutex_; 92 std::mutex needRunMutex_; 93 std::array<std::set<T>, ARRAY_SIZE + DELAYED_COUNT> elems_; 94 std::map<T, int> indexMap_; 95 }; 96 } // namespace OHOS::NetManagerStandard 97 98 #endif // COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H 99