• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef OHOS_APPEXECFWK_DELAY_QUEUE_H
16 #define OHOS_APPEXECFWK_DELAY_QUEUE_H
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <deque>
21 #include <functional>
22 #include <mutex>
23 #include <queue>
24 
25 #include "app_log_wrapper.h"
26 
27 namespace OHOS {
28 namespace AppExecFwk {
29 struct DelayTaskWrapper {
DelayTaskWrapperDelayTaskWrapper30     DelayTaskWrapper(long delayMs, std::function<void()> runnable) : runnable_(std::move(runnable))
31     {
32         auto now = std::chrono::high_resolution_clock::now();
33         startTime_ = now + std::chrono::milliseconds(delayMs);
34     }
35     DelayTaskWrapper() = delete;
~DelayTaskWrapperDelayTaskWrapper36     ~DelayTaskWrapper(){};
37 
38     std::chrono::high_resolution_clock::time_point startTime_;
39     std::function<void()> runnable_;
40 };
41 
42 struct CompareTaskDelayMs {
operatorCompareTaskDelayMs43     bool operator()(
44         const std::shared_ptr<DelayTaskWrapper> &other1, const std::shared_ptr<DelayTaskWrapper> &other2) const
45     {
46         return other1->startTime_ > other2->startTime_;
47     }
48 };
49 
50 class DelayQueue {
51 public:
DelayQueue()52     DelayQueue() : stopFlag_(false){};
~DelayQueue()53     ~DelayQueue(){};
54 
Offer(const std::shared_ptr<DelayTaskWrapper> & task)55     bool Offer(const std::shared_ptr<DelayTaskWrapper> &task)
56     {
57         std::unique_lock<std::mutex> lock(mutex_);
58         taskQueue_.push(task);
59         emptyWait_.notify_all();
60         return true;
61     }
62 
Take()63     std::shared_ptr<DelayTaskWrapper> Take()
64     {
65         while (true) {
66             std::unique_lock<std::mutex> lock(mutex_);
67             while (taskQueue_.empty() && !stopFlag_) {
68                 APP_LOGI("DelayQueue::taskQueue_ is empty");
69                 emptyWait_.wait(lock);
70             }
71 
72             if (taskQueue_.empty() && stopFlag_) {
73                 APP_LOGI("DelayQueue::taskQueue is empty and stopFlag is true");
74                 return nullptr;
75             }
76 
77             std::shared_ptr<DelayTaskWrapper> front = taskQueue_.top();
78             auto now = std::chrono::high_resolution_clock::now();
79             while (now < front->startTime_) {
80                 emptyWait_.wait_until(lock, front->startTime_);
81 
82                 now = std::chrono::high_resolution_clock::now();
83                 front = taskQueue_.top();
84             }
85             std::shared_ptr<DelayTaskWrapper> check = taskQueue_.top();
86             if (check->startTime_ == front->startTime_) {
87                 taskQueue_.pop();
88                 return front;
89             }
90         }
91     }
92 
Size()93     size_t Size()
94     {
95         std::unique_lock<std::mutex> lock(mutex_);
96         return taskQueue_.size();
97     }
98 
Empty()99     bool Empty()
100     {
101         std::unique_lock<std::mutex> lock(mutex_);
102         return taskQueue_.empty();
103     }
104 
Stop()105     void Stop()
106     {
107         std::unique_lock<std::mutex> lock(mutex_);
108         stopFlag_.store(true);
109         emptyWait_.notify_all();
110     }
111 
112     DelayQueue(const DelayQueue &) = delete;
113     DelayQueue &operator=(const DelayQueue &) = delete;
114 
115 private:
116     std::mutex mutex_;
117     std::condition_variable emptyWait_;
118     std::priority_queue<std::shared_ptr<DelayTaskWrapper>, std::vector<std::shared_ptr<DelayTaskWrapper>>,
119         CompareTaskDelayMs>
120         taskQueue_;
121     std::atomic<bool> stopFlag_;
122 };
123 }  // namespace AppExecFwk
124 }  // namespace OHOS
125 
126 #endif
127