1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef FFRT_CONCURRENT_QUEUE_H 16 #define FFRT_CONCURRENT_QUEUE_H 17 18 #include "queue/base_queue.h" 19 20 namespace ffrt { 21 class ConcurrentQueue : public BaseQueue { 22 public: 23 explicit ConcurrentQueue(const int maxConcurrency = 1) maxConcurrency_(maxConcurrency)24 : maxConcurrency_(maxConcurrency) 25 { 26 dequeFunc_ = QueueStrategy<QueueTask>::DequeSingleByPriority; 27 headTaskVec_.resize(maxConcurrency_); 28 } 29 ~ConcurrentQueue() override; 30 31 int Push(QueueTask* task) override; 32 QueueTask* Pull() override; 33 int Remove() override; 34 int Remove(const char* name) override; 35 int Remove(const QueueTask* task) override; 36 void Stop() override; 37 int WaitAll() override; 38 std::vector<QueueTask*> GetHeadTask() override; 39 GetActiveStatus()40 bool GetActiveStatus() override 41 { 42 return concurrency_.load(); 43 } 44 GetQueueType()45 int GetQueueType() const override 46 { 47 return ffrt_queue_concurrent; 48 } 49 GetMapSize()50 inline uint64_t GetMapSize() override 51 { 52 std::lock_guard lock(mutex_); 53 return whenMap_.size() + waitingMap_.size(); 54 } 55 56 bool SetLoop(Loop* loop); 57 bool HasTask(const char* name) override; 58 ClearLoop()59 inline bool ClearLoop() 60 { 61 if (loop_ == nullptr) { 62 return false; 63 } 64 65 loop_ = nullptr; 66 return true; 67 } 68 IsOnLoop()69 bool IsOnLoop() override 70 { 71 return isOnLoop_.load(); 72 } 73 74 private: 75 int PushDelayTaskToTimer(QueueTask* task); 76 int PushAndCalConcurrency(QueueTask* task, ffrt_queue_priority_t taskPriority, std::unique_lock<ffrt::mutex>& lock, 77 bool needUnlock); 78 void Stop(std::multimap<uint64_t, QueueTask*>& whenMap); 79 80 Loop* loop_ { nullptr }; 81 std::atomic_bool isOnLoop_ { false }; 82 83 int maxConcurrency_ {1}; 84 std::vector<QueueTask*> headTaskVec_; 85 std::atomic_int concurrency_ {0}; 86 87 bool waitingAll_ = false; 88 std::multimap<uint64_t, QueueTask*> waitingMap_; 89 std::multimap<uint64_t, QueueTask*> whenMapVec_[4]; 90 std::vector<std::pair<uint64_t, QueueTask*>> allWhenmapTask; 91 }; 92 93 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr); 94 } // namespace ffrt 95 #endif // FFRT_CONCURRENT_QUEUE_H 96