1 /* 2 * Copyright (c) 2022-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H 17 #define COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H 18 19 #include <array> 20 #include <atomic> 21 #include <condition_variable> 22 #include <map> 23 #include <memory> 24 #include <mutex> 25 #include <set> 26 #include <thread> 27 28 #include "ffrt_inner.h" 29 30 #ifndef CROSS_PLATFORM 31 #include "netnative_log_wrapper.h" 32 #endif 33 34 namespace OHOS::NetManagerStandard { 35 template <typename T, size_t ARRAY_SIZE, size_t DELAYED_COUNT> class DelayedQueue { 36 public: DelayedQueue()37 DelayedQueue() : index_(0), needRun_(true) 38 { 39 pthread_ = ffrt::thread([this]() { 40 #ifndef CROSS_PLATFORM 41 size_t allCounter = 0; 42 #endif 43 while (needRun_) { 44 { 45 std::lock_guard<ffrt::mutex> guard(mutex_); 46 #ifndef CROSS_PLATFORM 47 size_t counter = 0; 48 for (auto &temp : elems_) { 49 counter += temp.size(); 50 } 51 if (allCounter != counter) { 52 NETNATIVE_LOGI("dns:%{public}zu", counter); 53 allCounter = counter; 54 } 55 #endif 56 for (const auto &elem : elems_[index_]) { 57 if (elem) { 58 elem->Execute(); 59 } 60 indexMap_.erase(elem); 61 } 62 elems_[index_].clear(); 63 } 64 if (!needRun_) { 65 break; 66 } 67 std::unique_lock<ffrt::mutex> needRunLock(needRunMutex_); 68 needRunCondition_.wait_for(needRunLock, std::chrono::seconds(1), [this] { return !needRun_; }); 69 std::lock_guard<ffrt::mutex> guard(mutex_); 70 index_ = (index_ + 1) % (ARRAY_SIZE + DELAYED_COUNT); 71 } 72 }); 73 } 74 ~DelayedQueue()75 ~DelayedQueue() 76 { 77 // set needRun_ = false, and notify the thread to wake 78 needRun_ = false; 79 needRunCondition_.notify_all(); 80 if (pthread_.joinable()) { 81 pthread_.join(); 82 } 83 } 84 Put(const std::shared_ptr<T> & elem)85 void Put(const std::shared_ptr<T> &elem) 86 { 87 std::lock_guard<ffrt::mutex> guard(mutex_); 88 if (indexMap_.find(elem) != indexMap_.end()) { 89 int oldIndex = indexMap_[elem]; 90 if (oldIndex >= 0 && oldIndex < static_cast<int>(elems_.size()) && 91 (elems_[oldIndex].find(elem) != elems_[oldIndex].end())) { 92 elems_[oldIndex].erase(elem); 93 } 94 } 95 int index = (index_ + DELAYED_COUNT) % (ARRAY_SIZE + DELAYED_COUNT); 96 elems_[index].insert(elem); 97 indexMap_[elem] = index; 98 } 99 100 private: 101 ffrt::thread pthread_; 102 int index_; 103 ffrt::mutex mutex_; 104 std::atomic_bool needRun_; 105 ffrt::condition_variable needRunCondition_; 106 ffrt::mutex needRunMutex_; 107 std::array<std::set<std::shared_ptr<T>, std::owner_less<std::shared_ptr<T>>>, ARRAY_SIZE + DELAYED_COUNT> elems_; 108 std::map<std::shared_ptr<T>, int, std::owner_less<std::shared_ptr<T>>> indexMap_; 109 }; 110 } // namespace OHOS::NetManagerStandard 111 112 #endif // COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H 113