• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef FFRT_QUEUE_HANDLER_H
16 #define FFRT_QUEUE_HANDLER_H
17 
18 #include <atomic>
19 #include <memory>
20 #include <string>
21 #include <shared_mutex>
22 #include <unordered_map>
23 #include "c/queue.h"
24 #include "c/queue_ext.h"
25 #include "cpp/task.h"
26 #include "base_queue.h"
27 #include "sched/execute_ctx.h"
28 #include "traffic_record.h"
29 #include "tm/task_base.h"
30 
31 namespace ffrt {
32 class QueueTask;
33 class SerialQueue;
34 class Loop;
35 
36 class QueueHandler {
37 public:
38     QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type = ffrt_queue_serial);
39     ~QueueHandler();
40 
41     void Cancel();
42     void CancelAndWait();
43     int Cancel(const char* name);
44     int Cancel(QueueTask* task);
45     void Dispatch(QueueTask* inTask);
46     void Submit(QueueTask* task);
47     void TransferTask(QueueTask* task);
48     void TransferInitTask();
49 
50     std::string GetDfxInfo(int index) const;
51     std::pair<std::vector<uint64_t>, uint64_t> EvaluateTaskTimeout(uint64_t timeoutThreshold, uint64_t timeoutUs,
52         std::stringstream& ss);
53 
54     bool SetLoop(Loop* loop);
55     bool ClearLoop();
56 
IsOnLoop()57     inline bool IsOnLoop() const
58     {
59         return queue_->IsOnLoop();
60     }
61 
62     QueueTask* PickUpTask();
63 
IsValidForLoop()64     inline bool IsValidForLoop()
65     {
66         return !isUsed_.load() && (queue_->GetQueueType() == ffrt_queue_concurrent
67                || queue_->GetQueueType() == ffrt_queue_eventhandler_interactive);
68     }
69 
GetName()70     inline std::string GetName()
71     {
72         return name_;
73     }
74 
GetQueueId()75     inline uint32_t GetQueueId()
76     {
77         FFRT_COND_DO_ERR((queue_ == nullptr), return 0, "queue construct failed");
78         return queue_->GetQueueId();
79     }
80 
GetExecTaskId()81     inline uint32_t GetExecTaskId() const
82     {
83         return execTaskId_.load();
84     }
85 
HasTask(const char * name)86     inline bool HasTask(const char* name)
87     {
88         FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
89         return queue_->HasTask(name);
90     }
91 
GetTaskCnt()92     inline uint64_t GetTaskCnt()
93     {
94         FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
95         return queue_->GetMapSize();
96     }
97 
WaitAll()98     inline int WaitAll()
99     {
100         FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
101         return queue_->WaitAll();
102     }
103 
GetQueueDueCount()104     inline uint64_t GetQueueDueCount()
105     {
106         FFRT_COND_DO_ERR((queue_ == nullptr), return 0, "[queueId=%u] constructed failed", GetQueueId());
107         return queue_->GetDueTaskCount();
108     }
109 
CheckDelayStatus()110     inline bool CheckDelayStatus()
111     {
112         return queue_->DelayStatus();
113     }
114 
115     bool IsIdle();
116     void SetEventHandler(void* eventHandler);
117     void* GetEventHandler();
118 
119     int Dump(const char* tag, char* buf, uint32_t len, bool historyInfo = true);
120     int DumpSize(ffrt_inner_queue_priority_t priority);
121 
GetQueue()122     inline const std::unique_ptr<BaseQueue>& GetQueue()
123     {
124         return queue_;
125     }
126 
GetType()127     inline int GetType()
128     {
129         return queue_->GetQueueType();
130     }
131 
GetMode()132     inline bool GetMode()
133     {
134         return threadMode_;
135     }
136 
137 private:
138     void Deliver();
139     void SetTimeoutMonitor(QueueTask* task);
140     void RemoveTimeoutMonitor(QueueTask* task);
141     void RunTimeOutCallback(QueueTask* task);
142 
143     void ReportTimeout(const std::vector<std::pair<uint64_t, std::string>>& timeoutTaskInfo);
144     bool ControlTimeoutFreq(uint64_t timeoutCnt);
145     void CheckSchedDeadline();
146     bool CheckExecutingTask();
147     void SendSchedTimer(TimePoint delay);
148     void AddSchedDeadline(QueueTask* task);
149     void RemoveSchedDeadline(QueueTask* task);
150     void ReportTaskTimeout(uint64_t timeoutUs, std::stringstream& ss, int index);
151     uint64_t CheckTimeSchedule(uint64_t time, uint64_t timeoutUs);
152 
153     void SetCurTask(QueueTask* task);
154     void UpdateCurTask(QueueTask* task);
155 
156     // queue info
157     std::string name_;
158     int qos_ = qos_default;
159     std::unique_ptr<BaseQueue> queue_ = nullptr;
160     std::atomic_bool isUsed_ = false;
161     std::atomic_uint64_t execTaskId_ = 0;
162     int maxConcurrency_ = 1;
163     std::vector<QueueTask*> curTaskVec_;
164     uint64_t desWaitCnt_ = 0;
165 
166     // for timeout watchdog
167     uint64_t timeout_ = 0;
168     std::vector<TimeoutTask> timeoutTaskVec_;
169     std::atomic_int delayedCbCnt_ = {0};
170     ffrt_function_header_t* timeoutCb_ = nullptr;
171     TrafficRecord trafficRecord_;
172     uint64_t trafficRecordInterval_ = DEFAULT_TRAFFIC_INTERVAL;
173 
174     ffrt::mutex mutex_;
175     bool initSchedTimer_ = false;
176     WaitUntilEntry* we_ = nullptr;
177     std::unordered_map<QueueTask*, uint64_t> schedDeadline_;
178     std::atomic_int deliverCnt_ = {0};
179     bool threadMode_ = false;
180 };
181 } // namespace ffrt
182 
183 #endif // FFRT_QUEUE_HANDLER_H
184