/* * Copyright (c) 2021-2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef OHOS_APPEXECFWK_DELAY_QUEUE_H #define OHOS_APPEXECFWK_DELAY_QUEUE_H #include #include #include #include #include #include #include "hilog_wrapper.h" namespace OHOS { namespace AppExecFwk { struct DelayTaskWrapper { DelayTaskWrapper(long delayMs, std::function runnable) : runnable_(std::move(runnable)) { auto now = std::chrono::high_resolution_clock::now(); startTime_ = now + std::chrono::milliseconds(delayMs); } DelayTaskWrapper() = delete; ~DelayTaskWrapper(){}; std::chrono::high_resolution_clock::time_point startTime_; std::function runnable_; }; struct CompareTaskDelayMs { bool operator()( const std::shared_ptr &other1, const std::shared_ptr &other2) const { return other1->startTime_ > other2->startTime_; } }; class DelayQueue { public: DelayQueue() : stopFlag_(false){}; ~DelayQueue(){}; bool Offer(const std::shared_ptr &task) { std::unique_lock lock(mutex_); taskQueue_.push(task); emptyWait_.notify_all(); return true; } std::shared_ptr Take() { while (true) { std::unique_lock lock(mutex_); while (taskQueue_.empty() && !stopFlag_) { HILOG_INFO("DelayQueue::taskQueue_ is empty"); emptyWait_.wait(lock); } if (taskQueue_.empty() && stopFlag_) { HILOG_INFO("DelayQueue::taskQueue is empty and stopFlag is true"); return nullptr; } std::shared_ptr front = taskQueue_.top(); auto now = std::chrono::high_resolution_clock::now(); while (now < front->startTime_) { emptyWait_.wait_until(lock, front->startTime_); now = std::chrono::high_resolution_clock::now(); front = taskQueue_.top(); } std::shared_ptr check = taskQueue_.top(); if (check->startTime_ == front->startTime_) { taskQueue_.pop(); return front; } } } size_t Size() { std::unique_lock lock(mutex_); return taskQueue_.size(); } bool Empty() { std::unique_lock lock(mutex_); return taskQueue_.empty(); } void Stop() { std::unique_lock lock(mutex_); stopFlag_.store(true); emptyWait_.notify_all(); } DelayQueue(const DelayQueue &) = delete; DelayQueue &operator=(const DelayQueue &) = delete; private: std::mutex mutex_; std::condition_variable emptyWait_; std::priority_queue, std::vector>, CompareTaskDelayMs> taskQueue_; std::atomic stopFlag_; }; } // namespace AppExecFwk } // namespace OHOS #endif