1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef DFSU_THREAD_SAFE_QUEUE_H 17 #define DFSU_THREAD_SAFE_QUEUE_H 18 19 #include <algorithm> 20 #include <deque> 21 22 #include "dfsu_exception.h" 23 24 namespace OHOS { 25 namespace Storage { 26 namespace DistributedFile { 27 /** 28 * @brief A Thread-safe Queue. 29 * 30 * Design choices: 31 * 1) unlimited capacity 32 * 2) throw exception to indicate failues 33 * 3) blocking pop interface 34 * 3) hatlable 35 * 36 * @tparam T Any type. Aggregate data type is prefered 37 * 38 */ 39 template<typename T> 40 class DfsuThreadSafeQueue { 41 public: Push(std::unique_ptr<T> pt)42 void Push(std::unique_ptr<T> pt) 43 { 44 if (!pt) { 45 ThrowException(ERR_UTILS_ACTOR_INVALID_CMD, "Push an empty cmd"); 46 } 47 std::unique_lock<std::mutex> lock(mutex_); 48 queue_.emplace_back(std::move(pt)); 49 cv_.notify_one(); 50 } 51 PushFront(std::unique_ptr<T> pt)52 void PushFront(std::unique_ptr<T> pt) 53 { 54 if (!pt) { 55 ThrowException(ERR_UTILS_ACTOR_INVALID_CMD, "Push an empty cmd"); 56 } 57 std::unique_lock<std::mutex> lock(mutex_); 58 queue_.emplace_front(std::move(pt)); 59 cv_.notify_one(); 60 } 61 WaitAndPop()62 std::unique_ptr<T> WaitAndPop() 63 { 64 std::unique_lock<std::mutex> lock(mutex_); 65 cv_.wait(lock, [&] { return !queue_.empty() || halted; }); 66 if (halted && queue_.empty()) { 67 ThrowException(ERR_UTILS_ACTOR_QUEUE_STOP, "Queue was halted"); 68 } 69 70 auto res = std::move(queue_.front()); 71 queue_.pop_front(); 72 return std::move(res); 73 } 74 ForEach(std::function<void (const std::unique_ptr<T> &)> executor)75 void ForEach(std::function<void(const std::unique_ptr<T> &)> executor) 76 { 77 std::unique_lock<std::mutex> lock(mutex_); 78 std::for_each(queue_.begin(), queue_.end(), executor); 79 } 80 Halt()81 void Halt() 82 { 83 halted = true; 84 cv_.notify_all(); 85 } 86 87 private: 88 std::deque<std::unique_ptr<T>> queue_; 89 std::mutex mutex_; 90 std::condition_variable cv_; 91 92 bool halted {false}; 93 }; 94 } // namespace DistributedFile 95 } // namespace Storage 96 } // namespace OHOS 97 #endif // DFSU_THREAD_SAFE_QUEUE_H