1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef OHOS_APPEXECFWK_DELAY_QUEUE_H 16 #define OHOS_APPEXECFWK_DELAY_QUEUE_H 17 18 #include <atomic> 19 #include <condition_variable> 20 #include <deque> 21 #include <functional> 22 #include <mutex> 23 #include <queue> 24 25 #include "app_log_wrapper.h" 26 27 namespace OHOS { 28 namespace AppExecFwk { 29 struct DelayTaskWrapper { DelayTaskWrapperDelayTaskWrapper30 DelayTaskWrapper(long delayMs, std::function<void()> runnable) : runnable_(std::move(runnable)) 31 { 32 auto now = std::chrono::high_resolution_clock::now(); 33 startTime_ = now + std::chrono::milliseconds(delayMs); 34 } 35 DelayTaskWrapper() = delete; ~DelayTaskWrapperDelayTaskWrapper36 ~DelayTaskWrapper(){}; 37 38 std::chrono::high_resolution_clock::time_point startTime_; 39 std::function<void()> runnable_; 40 }; 41 42 struct CompareTaskDelayMs { operatorCompareTaskDelayMs43 bool operator()( 44 const std::shared_ptr<DelayTaskWrapper> &other1, const std::shared_ptr<DelayTaskWrapper> &other2) const 45 { 46 return other1->startTime_ > other2->startTime_; 47 } 48 }; 49 50 class DelayQueue { 51 public: DelayQueue()52 DelayQueue() : stopFlag_(false){}; ~DelayQueue()53 ~DelayQueue(){}; 54 Offer(const std::shared_ptr<DelayTaskWrapper> & task)55 bool Offer(const std::shared_ptr<DelayTaskWrapper> &task) 56 { 57 std::unique_lock<std::mutex> lock(mutex_); 58 taskQueue_.push(task); 59 emptyWait_.notify_all(); 60 return true; 61 } 62 Take()63 std::shared_ptr<DelayTaskWrapper> Take() 64 { 65 while (true) { 66 std::unique_lock<std::mutex> lock(mutex_); 67 while (taskQueue_.empty() && !stopFlag_) { 68 APP_LOGI("DelayQueue::taskQueue_ is empty"); 69 emptyWait_.wait(lock); 70 } 71 72 if (taskQueue_.empty() && stopFlag_) { 73 APP_LOGI("DelayQueue::taskQueue is empty and stopFlag is true"); 74 return nullptr; 75 } 76 77 std::shared_ptr<DelayTaskWrapper> front = taskQueue_.top(); 78 auto now = std::chrono::high_resolution_clock::now(); 79 while (now < front->startTime_) { 80 emptyWait_.wait_until(lock, front->startTime_); 81 82 now = std::chrono::high_resolution_clock::now(); 83 front = taskQueue_.top(); 84 } 85 std::shared_ptr<DelayTaskWrapper> check = taskQueue_.top(); 86 if (check->startTime_ == front->startTime_) { 87 taskQueue_.pop(); 88 return front; 89 } 90 } 91 } 92 Size()93 size_t Size() 94 { 95 std::unique_lock<std::mutex> lock(mutex_); 96 return taskQueue_.size(); 97 } 98 Empty()99 bool Empty() 100 { 101 std::unique_lock<std::mutex> lock(mutex_); 102 return taskQueue_.empty(); 103 } 104 Stop()105 void Stop() 106 { 107 std::unique_lock<std::mutex> lock(mutex_); 108 stopFlag_.store(true); 109 emptyWait_.notify_all(); 110 } 111 112 DelayQueue(const DelayQueue &) = delete; 113 DelayQueue &operator=(const DelayQueue &) = delete; 114 115 private: 116 std::mutex mutex_; 117 std::condition_variable emptyWait_; 118 std::priority_queue<std::shared_ptr<DelayTaskWrapper>, std::vector<std::shared_ptr<DelayTaskWrapper>>, 119 CompareTaskDelayMs> 120 taskQueue_; 121 std::atomic<bool> stopFlag_; 122 }; 123 } // namespace AppExecFwk 124 } // namespace OHOS 125 126 #endif 127