• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "c/queue_ext.h"
16 #include "cpp/queue.h"
17 #include "core/task_wrapper.h"
18 #include "util/event_handler_adapter.h"
19 #include "dm/dependence_manager.h"
20 #include "tm/queue_task.h"
21 #include "queue/queue_handler.h"
22 #include "util/common_const.h"
23 
24 constexpr uint64_t MAX_TIMEOUT_US_COUNT = 1000000ULL * 100 * 60 * 60 * 24 * 365; // 100 year
25 
26 using namespace std;
27 using namespace ffrt;
28 
29 namespace {
ResetTimeoutCb(ffrt::queue_attr_private * p)30 inline void ResetTimeoutCb(ffrt::queue_attr_private* p)
31 {
32     if (p->timeoutCb_ == nullptr) {
33         return;
34     }
35     QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(p->timeoutCb_);
36     cbTask->DecDeleteRef();
37     p->timeoutCb_ = nullptr;
38 }
39 
ffrt_queue_submit_base(ffrt_queue_t queue,ffrt_function_header_t * f,bool withHandle,bool insertHead,const ffrt_task_attr_t * attr)40 inline QueueTask* ffrt_queue_submit_base(ffrt_queue_t queue, ffrt_function_header_t* f, bool withHandle,
41     bool insertHead, const ffrt_task_attr_t* attr)
42 {
43     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return nullptr, "input invalid, queue == nullptr");
44     FFRT_COND_DO_ERR(unlikely(f == nullptr), return nullptr, "input invalid, function header == nullptr");
45     QueueHandler* handler = static_cast<QueueHandler*>(queue);
46     ffrt::task_attr_private *p = reinterpret_cast<ffrt::task_attr_private *>(const_cast<ffrt_task_attr_t *>(attr));
47     QueueTask* task = GetQueueTaskByFuncStorageOffset(f);
48     new (task)ffrt::QueueTask(handler, p, insertHead);
49     if (withHandle) {
50         task->IncDeleteRef();
51     }
52 
53     handler->Submit(task);
54     return task;
55 }
56 
57 constexpr uint64_t MIN_TRAFFIC_INTERVAL_US = 1000000;
58 constexpr uint64_t MAX_TRAFFIC_INTERVAL_US = 600000000;
59 constexpr uint64_t DEFAULT_TRAFFIC_INTERVAL_US = 6000000;
60 } // namespace
61 
62 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_init(ffrt_queue_attr_t * attr)63 int ffrt_queue_attr_init(ffrt_queue_attr_t* attr)
64 {
65     FFRT_COND_DO_ERR((attr == nullptr), return -1, "input invalid, attr == nullptr");
66     static_assert(sizeof(ffrt::queue_attr_private) <= ffrt_queue_attr_storage_size,
67         "size must be less than ffrt_queue_attr_storage_size");
68 
69     new (attr) ffrt::queue_attr_private();
70     return 0;
71 }
72 
73 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_destroy(ffrt_queue_attr_t * attr)74 void ffrt_queue_attr_destroy(ffrt_queue_attr_t* attr)
75 {
76     FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
77     auto p = reinterpret_cast<ffrt::queue_attr_private*>(attr);
78     ResetTimeoutCb(p);
79     p->~queue_attr_private();
80 }
81 
82 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_qos(ffrt_queue_attr_t * attr,ffrt_qos_t qos)83 void ffrt_queue_attr_set_qos(ffrt_queue_attr_t* attr, ffrt_qos_t qos)
84 {
85     FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
86     FFRT_COND_DO_ERR((ffrt::GetFuncQosMap() == nullptr), return, "input invalid, FuncQosMap has not regist");
87 
88     (reinterpret_cast<ffrt::queue_attr_private*>(attr))->qos_ = ffrt::GetFuncQosMap()(qos);
89 }
90 
91 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_qos(const ffrt_queue_attr_t * attr)92 ffrt_qos_t ffrt_queue_attr_get_qos(const ffrt_queue_attr_t* attr)
93 {
94     FFRT_COND_DO_ERR((attr == nullptr), return ffrt_qos_default, "input invalid, attr == nullptr");
95     ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
96     return (reinterpret_cast<ffrt::queue_attr_private*>(p))->qos_;
97 }
98 
99 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_timeout(ffrt_queue_attr_t * attr,uint64_t timeout_us)100 void ffrt_queue_attr_set_timeout(ffrt_queue_attr_t* attr, uint64_t timeout_us)
101 {
102     FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
103     if (timeout_us < ONE_THOUSAND) {
104         (reinterpret_cast<ffrt::queue_attr_private*>(attr))->timeout_ = ONE_THOUSAND;
105         return;
106     }
107 
108     if (timeout_us > MAX_TIMEOUT_US_COUNT) {
109         FFRT_LOGW("timeout_us exceeds maximum allowed value %llu us. Clamping to %llu us.", timeout_us,
110             MAX_TIMEOUT_US_COUNT);
111         timeout_us = MAX_TIMEOUT_US_COUNT;
112     }
113 
114     (reinterpret_cast<ffrt::queue_attr_private*>(attr))->timeout_ = timeout_us;
115 }
116 
117 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_timeout(const ffrt_queue_attr_t * attr)118 uint64_t ffrt_queue_attr_get_timeout(const ffrt_queue_attr_t* attr)
119 {
120     FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
121     ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
122     return (reinterpret_cast<ffrt::queue_attr_private*>(p))->timeout_;
123 }
124 
125 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_callback(ffrt_queue_attr_t * attr,ffrt_function_header_t * f)126 void ffrt_queue_attr_set_callback(ffrt_queue_attr_t* attr, ffrt_function_header_t* f)
127 {
128     FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
129     FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, f == nullptr");
130     ffrt::queue_attr_private* p = reinterpret_cast<ffrt::queue_attr_private*>(attr);
131     ResetTimeoutCb(p);
132     p->timeoutCb_ = f;
133     // the memory of timeoutCb are managed in the form of QueueTask
134     QueueTask* task = GetQueueTaskByFuncStorageOffset(f);
135     new (task)ffrt::QueueTask(nullptr);
136 }
137 
138 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_callback(const ffrt_queue_attr_t * attr)139 ffrt_function_header_t* ffrt_queue_attr_get_callback(const ffrt_queue_attr_t* attr)
140 {
141     FFRT_COND_DO_ERR((attr == nullptr), return nullptr, "input invalid, attr == nullptr");
142     ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
143     return (reinterpret_cast<ffrt::queue_attr_private*>(p))->timeoutCb_;
144 }
145 
146 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_max_concurrency(ffrt_queue_attr_t * attr,const int max_concurrency)147 void ffrt_queue_attr_set_max_concurrency(ffrt_queue_attr_t* attr, const int max_concurrency)
148 {
149     FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
150 
151     FFRT_COND_DO_ERR((max_concurrency <= 0), return,
152         "max concurrency should be a valid value");
153 
154     (reinterpret_cast<ffrt::queue_attr_private*>(attr))->maxConcurrency_ = max_concurrency;
155 }
156 
157 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_max_concurrency(const ffrt_queue_attr_t * attr)158 int ffrt_queue_attr_get_max_concurrency(const ffrt_queue_attr_t* attr)
159 {
160     FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
161     ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
162     return (reinterpret_cast<ffrt::queue_attr_private*>(p))->maxConcurrency_;
163 }
164 
165 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_thread_mode(ffrt_queue_attr_t * attr,bool mode)166 void ffrt_queue_attr_set_thread_mode(ffrt_queue_attr_t* attr, bool mode)
167 {
168     FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
169 
170     (reinterpret_cast<ffrt::queue_attr_private*>(attr))->threadMode_ = mode;
171 }
172 
173 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_thread_mode(const ffrt_queue_attr_t * attr)174 bool ffrt_queue_attr_get_thread_mode(const ffrt_queue_attr_t* attr)
175 {
176     FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
177     ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
178     return (reinterpret_cast<ffrt::queue_attr_private*>(p))->threadMode_;
179 }
180 
181 API_ATTRIBUTE((visibility("default")))
ffrt_queue_create(ffrt_queue_type_t type,const char * name,const ffrt_queue_attr_t * attr)182 ffrt_queue_t ffrt_queue_create(ffrt_queue_type_t type, const char* name, const ffrt_queue_attr_t* attr)
183 {
184     bool invalidType = (type == ffrt_queue_max) || (type < ffrt_queue_serial) ||
185         (type >= static_cast<ffrt_queue_type_t>(ffrt_queue_inner_max));
186     FFRT_COND_DO_ERR(invalidType, return nullptr, "input invalid, type unsupport");
187     QueueHandler* handler = new (std::nothrow) QueueHandler(name, attr, type);
188     FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct QueueHandler");
189     return static_cast<ffrt_queue_t>(handler);
190 }
191 
192 API_ATTRIBUTE((visibility("default")))
ffrt_queue_destroy(ffrt_queue_t queue)193 void ffrt_queue_destroy(ffrt_queue_t queue)
194 {
195     FFRT_COND_DO_ERR((queue == nullptr), return, "input invalid, queue is nullptr");
196     QueueHandler* handler = static_cast<QueueHandler*>(queue);
197     delete handler;
198 }
199 
200 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)201 void ffrt_queue_submit(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
202 {
203     FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, function is nullptr");
204     QueueTask* task = ffrt_queue_submit_base(queue, f, false, false, attr);
205     FFRT_COND_DO_ERR((task == nullptr), return, "failed to submit serial task");
206 }
207 
208 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_f(ffrt_queue_t queue,ffrt_function_t func,void * arg,const ffrt_task_attr_t * attr)209 void ffrt_queue_submit_f(ffrt_queue_t queue, ffrt_function_t func, void* arg, const ffrt_task_attr_t* attr)
210 {
211     ffrt_function_header_t* f = ffrt_create_function_wrapper(func, nullptr, arg, ffrt_function_kind_queue);
212     ffrt_queue_submit(queue, f, attr);
213 }
214 
215 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_head(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)216 void ffrt_queue_submit_head(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
217 {
218     FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, function is nullptr");
219     QueueTask* task = ffrt_queue_submit_base(queue, f, false, true, attr);
220     FFRT_COND_DO_ERR((task == nullptr), return, "failed to submit serial task");
221 }
222 
223 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_h(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)224 ffrt_task_handle_t ffrt_queue_submit_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
225 {
226     FFRT_COND_DO_ERR((f == nullptr), return nullptr, "input invalid, function is nullptr");
227     QueueTask* task = ffrt_queue_submit_base(queue, f, true, false, attr);
228     FFRT_COND_DO_ERR((task == nullptr), return nullptr, "failed to submit serial task");
229     return static_cast<ffrt_task_handle_t>(task);
230 }
231 
232 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_h_f(ffrt_queue_t queue,ffrt_function_t func,void * arg,const ffrt_task_attr_t * attr)233 ffrt_task_handle_t ffrt_queue_submit_h_f(ffrt_queue_t queue, ffrt_function_t func, void* arg,
234     const ffrt_task_attr_t* attr)
235 {
236     ffrt_function_header_t* f = ffrt_create_function_wrapper(func, nullptr, arg, ffrt_function_kind_queue);
237     return ffrt_queue_submit_h(queue, f, attr);
238 }
239 
240 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_head_h(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)241 ffrt_task_handle_t ffrt_queue_submit_head_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
242 {
243     FFRT_COND_DO_ERR((f == nullptr), return nullptr, "input invalid, function is nullptr");
244     QueueTask* task = ffrt_queue_submit_base(queue, f, true, true, attr);
245     FFRT_COND_DO_ERR((task == nullptr), return nullptr, "failed to submit serial task");
246     return static_cast<ffrt_task_handle_t>(task);
247 }
248 
249 API_ATTRIBUTE((visibility("default")))
ffrt_queue_wait(ffrt_task_handle_t handle)250 void ffrt_queue_wait(ffrt_task_handle_t handle)
251 {
252     FFRT_COND_DO_ERR((handle == nullptr), return, "input invalid, task_handle is nullptr");
253     QueueTask* task = static_cast<QueueTask*>(handle);
254     task->Wait();
255 }
256 
257 API_ATTRIBUTE((visibility("default")))
ffrt_queue_get_task_cnt(ffrt_queue_t queue)258 uint64_t ffrt_queue_get_task_cnt(ffrt_queue_t queue)
259 {
260     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return 0, "input invalid, queue == nullptr");
261     QueueHandler* handler = static_cast<QueueHandler*>(queue);
262     return handler->GetTaskCnt();
263 }
264 
265 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel(ffrt_task_handle_t handle)266 int ffrt_queue_cancel(ffrt_task_handle_t handle)
267 {
268     FFRT_COND_DO_ERR((handle == nullptr), return -1, "input invalid, handle is nullptr");
269     QueueTask* task = reinterpret_cast<QueueTask*>(static_cast<CPUEUTask*>(handle));
270     QueueHandler* handler = task->GetHandler();
271     FFRT_COND_DO_ERR((handler == nullptr), return -1, "task handler is nullptr");
272     int ret = handler->Cancel(task);
273     return ret;
274 }
275 
276 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_all(ffrt_queue_t queue)277 void ffrt_queue_cancel_all(ffrt_queue_t queue)
278 {
279     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
280     QueueHandler* handler = static_cast<QueueHandler*>(queue);
281     handler->Cancel();
282 }
283 
284 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_and_wait(ffrt_queue_t queue)285 void ffrt_queue_cancel_and_wait(ffrt_queue_t queue)
286 {
287     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
288     QueueHandler* handler = static_cast<QueueHandler*>(queue);
289     handler->CancelAndWait();
290 }
291 
292 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_by_name(ffrt_queue_t queue,const char * name)293 int ffrt_queue_cancel_by_name(ffrt_queue_t queue, const char* name)
294 {
295     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return -1, "input invalid, queue is nullptr");
296     FFRT_COND_DO_ERR(unlikely(name == nullptr), return -1, "input invalid, name is nullptr");
297     QueueHandler* handler = static_cast<QueueHandler*>(queue);
298     return handler->Cancel(name);
299 }
300 
301 API_ATTRIBUTE((visibility("default")))
ffrt_queue_has_task(ffrt_queue_t queue,const char * name)302 bool ffrt_queue_has_task(ffrt_queue_t queue, const char* name)
303 {
304     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return false, "input invalid, queue is nullptr");
305     FFRT_COND_DO_ERR(unlikely(name == nullptr), return false, "input invalid, name is nullptr");
306     QueueHandler* handler = static_cast<QueueHandler*>(queue);
307     return handler->HasTask(name);
308 }
309 
310 API_ATTRIBUTE((visibility("default")))
ffrt_queue_is_idle(ffrt_queue_t queue)311 bool ffrt_queue_is_idle(ffrt_queue_t queue)
312 {
313     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return false, "input invalid, queue is nullptr");
314     QueueHandler* handler = static_cast<QueueHandler*>(queue);
315     return handler->IsIdle();
316 }
317 
318 API_ATTRIBUTE((visibility("default")))
ffrt_queue_set_eventhandler(ffrt_queue_t queue,void * eventhandler)319 void ffrt_queue_set_eventhandler(ffrt_queue_t queue, void* eventhandler)
320 {
321     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
322     QueueHandler* handler = static_cast<QueueHandler*>(queue);
323     handler->SetEventHandler(eventhandler);
324 }
325 
326 API_ATTRIBUTE((visibility("default")))
ffrt_get_current_queue_eventhandler(void)327 void* ffrt_get_current_queue_eventhandler(void)
328 {
329     TaskBase* curTask = ffrt::ExecuteCtx::Cur()->task;
330     if (curTask == nullptr || curTask->type != ffrt_queue_task) {
331         FFRT_LOGD("Current task is nullptr or is not a serial task.");
332         return nullptr;
333     }
334 
335     QueueHandler* handler = static_cast<QueueTask*>(curTask)->GetHandler();
336     FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "task handler is nullptr");
337     return handler->GetEventHandler();
338 }
339 
340 API_ATTRIBUTE((visibility("default")))
ffrt_concurrent_queue_wait_all(ffrt_queue_t queue)341 int ffrt_concurrent_queue_wait_all(ffrt_queue_t queue)
342 {
343     FFRT_COND_DO_ERR(unlikely(queue == nullptr), return -1, "input invalid, queue is nullptr");
344     return static_cast<QueueHandler*>(queue)->WaitAll();
345 }
346 
347 API_ATTRIBUTE((visibility("default")))
ffrt_get_main_queue(void)348 ffrt_queue_t ffrt_get_main_queue(void)
349 {
350     FFRT_COND_DO_ERR((EventHandlerAdapter::Instance()->GetMainEventHandler == nullptr),
351         return nullptr, "failed to load GetMainEventHandler Func.");
352     static QueueHandler handler = QueueHandler("main_queue", nullptr, ffrt_queue_eventhandler_interactive);
353     if (!handler.GetEventHandler()) {
354         void* mainHandler = EventHandlerAdapter::Instance()->GetMainEventHandler();
355         FFRT_COND_DO_ERR((mainHandler == nullptr), return nullptr, "failed to get main queue.");
356         handler.SetEventHandler(mainHandler);
357     }
358     return static_cast<ffrt_queue_t>(&handler);
359 }
360 
361 API_ATTRIBUTE((visibility("default")))
ffrt_get_current_queue(void)362 ffrt_queue_t ffrt_get_current_queue(void)
363 {
364     FFRT_COND_DO_ERR((EventHandlerAdapter::Instance()->GetCurrentEventHandler == nullptr),
365         return nullptr, "failed to load GetCurrentEventHandler Func.");
366     void* workerHandler = EventHandlerAdapter::Instance()->GetCurrentEventHandler();
367     FFRT_COND_DO_ERR((workerHandler == nullptr), return nullptr, "failed to get ArkTs worker queue.");
368     QueueHandler *handler = new (std::nothrow) QueueHandler(
369         "current_queue", nullptr, ffrt_queue_eventhandler_interactive);
370     FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct WorkerThreadQueueHandler");
371     handler->SetEventHandler(workerHandler);
372     return static_cast<ffrt_queue_t>(handler);
373 }
374 
375 API_ATTRIBUTE((visibility("default")))
ffrt_queue_dump(ffrt_queue_t queue,const char * tag,char * buf,uint32_t len,bool history_info)376 int ffrt_queue_dump(ffrt_queue_t queue, const char* tag, char* buf, uint32_t len, bool history_info)
377 {
378     FFRT_COND_DO_ERR((queue == nullptr), return -1, "input invalid, queue is nullptr");
379     FFRT_COND_DO_ERR((tag == nullptr || buf == nullptr), return -1, "input invalid, tag or buf is nullptr");
380     QueueHandler* handler = static_cast<QueueHandler*>(queue);
381     return handler->Dump(tag, buf, len, history_info);
382 }
383 
384 API_ATTRIBUTE((visibility("default")))
ffrt_queue_size_dump(ffrt_queue_t queue,ffrt_inner_queue_priority_t priority)385 int ffrt_queue_size_dump(ffrt_queue_t queue, ffrt_inner_queue_priority_t priority)
386 {
387     if (priority > ffrt_inner_queue_priority_idle || priority < ffrt_inner_queue_priority_vip) {
388         FFRT_LOGE("priority:%d is not valid", priority);
389         return -1;
390     }
391     FFRT_COND_DO_ERR((queue == nullptr), return -1, "input invalid, queue is nullptr");
392     QueueHandler* handler = static_cast<QueueHandler*>(queue);
393     return handler->DumpSize(priority);
394 }