• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #ifndef FFRT_CONCURRENT_QUEUE_H
16 #define FFRT_CONCURRENT_QUEUE_H
17 
18 #include "queue/base_queue.h"
19 
20 namespace ffrt {
21 class ConcurrentQueue : public BaseQueue {
22 public:
23     explicit ConcurrentQueue(const int maxConcurrency = 1)
maxConcurrency_(maxConcurrency)24         : maxConcurrency_(maxConcurrency)
25     {
26         dequeFunc_ = QueueStrategy<QueueTask>::DequeSingleByPriority;
27         headTaskVec_.resize(maxConcurrency_);
28     }
29     ~ConcurrentQueue() override;
30 
31     int Push(QueueTask* task) override;
32     QueueTask* Pull() override;
33     int Remove() override;
34     int Remove(const char* name) override;
35     int Remove(const QueueTask* task) override;
36     void Stop() override;
37     int WaitAll() override;
38     std::vector<QueueTask*> GetHeadTask() override;
39 
GetActiveStatus()40     bool GetActiveStatus() override
41     {
42         return concurrency_.load();
43     }
44 
GetQueueType()45     int GetQueueType() const override
46     {
47         return ffrt_queue_concurrent;
48     }
49 
GetMapSize()50     inline uint64_t GetMapSize() override
51     {
52         std::lock_guard lock(mutex_);
53         return whenMap_.size() + waitingMap_.size();
54     }
55 
56     bool SetLoop(Loop* loop);
57     bool HasTask(const char* name) override;
58 
ClearLoop()59     inline bool ClearLoop()
60     {
61         if (loop_ == nullptr) {
62             return false;
63         }
64 
65         loop_ = nullptr;
66         return true;
67     }
68 
IsOnLoop()69     bool IsOnLoop() override
70     {
71         return isOnLoop_.load();
72     }
73 
74 private:
75     int PushDelayTaskToTimer(QueueTask* task);
76     int PushAndCalConcurrency(QueueTask* task, ffrt_queue_priority_t taskPriority, std::unique_lock<ffrt::mutex>& lock,
77         bool needUnlock);
78     void Stop(std::multimap<uint64_t, QueueTask*>& whenMap);
79 
80     Loop* loop_ { nullptr };
81     std::atomic_bool isOnLoop_ { false };
82 
83     int maxConcurrency_ {1};
84     std::vector<QueueTask*> headTaskVec_;
85     std::atomic_int concurrency_ {0};
86 
87     bool waitingAll_ = false;
88     std::multimap<uint64_t, QueueTask*> waitingMap_;
89     std::multimap<uint64_t, QueueTask*> whenMapVec_[4];
90     std::vector<std::pair<uint64_t, QueueTask*>> allWhenmapTask;
91 };
92 
93 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr);
94 } // namespace ffrt
95 #endif // FFRT_CONCURRENT_QUEUE_H
96