1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "c/queue_ext.h"
16 #include "cpp/queue.h"
17 #include "core/task_wrapper.h"
18 #include "util/event_handler_adapter.h"
19 #include "dm/dependence_manager.h"
20 #include "tm/queue_task.h"
21 #include "queue/queue_handler.h"
22 #include "util/common_const.h"
23
24 constexpr uint64_t MAX_TIMEOUT_US_COUNT = 1000000ULL * 100 * 60 * 60 * 24 * 365; // 100 year
25
26 using namespace std;
27 using namespace ffrt;
28
29 namespace {
ResetTimeoutCb(ffrt::queue_attr_private * p)30 inline void ResetTimeoutCb(ffrt::queue_attr_private* p)
31 {
32 if (p->timeoutCb_ == nullptr) {
33 return;
34 }
35 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(p->timeoutCb_);
36 cbTask->DecDeleteRef();
37 p->timeoutCb_ = nullptr;
38 }
39
ffrt_queue_submit_base(ffrt_queue_t queue,ffrt_function_header_t * f,bool withHandle,bool insertHead,const ffrt_task_attr_t * attr)40 inline QueueTask* ffrt_queue_submit_base(ffrt_queue_t queue, ffrt_function_header_t* f, bool withHandle,
41 bool insertHead, const ffrt_task_attr_t* attr)
42 {
43 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return nullptr, "input invalid, queue == nullptr");
44 FFRT_COND_DO_ERR(unlikely(f == nullptr), return nullptr, "input invalid, function header == nullptr");
45 QueueHandler* handler = static_cast<QueueHandler*>(queue);
46 ffrt::task_attr_private *p = reinterpret_cast<ffrt::task_attr_private *>(const_cast<ffrt_task_attr_t *>(attr));
47 QueueTask* task = GetQueueTaskByFuncStorageOffset(f);
48 new (task)ffrt::QueueTask(handler, p, insertHead);
49 if (withHandle) {
50 task->IncDeleteRef();
51 }
52
53 handler->Submit(task);
54 return task;
55 }
56
57 constexpr uint64_t MIN_TRAFFIC_INTERVAL_US = 1000000;
58 constexpr uint64_t MAX_TRAFFIC_INTERVAL_US = 600000000;
59 constexpr uint64_t DEFAULT_TRAFFIC_INTERVAL_US = 6000000;
60 } // namespace
61
62 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_init(ffrt_queue_attr_t * attr)63 int ffrt_queue_attr_init(ffrt_queue_attr_t* attr)
64 {
65 FFRT_COND_DO_ERR((attr == nullptr), return -1, "input invalid, attr == nullptr");
66 static_assert(sizeof(ffrt::queue_attr_private) <= ffrt_queue_attr_storage_size,
67 "size must be less than ffrt_queue_attr_storage_size");
68
69 new (attr) ffrt::queue_attr_private();
70 return 0;
71 }
72
73 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_destroy(ffrt_queue_attr_t * attr)74 void ffrt_queue_attr_destroy(ffrt_queue_attr_t* attr)
75 {
76 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
77 auto p = reinterpret_cast<ffrt::queue_attr_private*>(attr);
78 ResetTimeoutCb(p);
79 p->~queue_attr_private();
80 }
81
82 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_qos(ffrt_queue_attr_t * attr,ffrt_qos_t qos)83 void ffrt_queue_attr_set_qos(ffrt_queue_attr_t* attr, ffrt_qos_t qos)
84 {
85 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
86 FFRT_COND_DO_ERR((ffrt::GetFuncQosMap() == nullptr), return, "input invalid, FuncQosMap has not regist");
87
88 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->qos_ = ffrt::GetFuncQosMap()(qos);
89 }
90
91 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_qos(const ffrt_queue_attr_t * attr)92 ffrt_qos_t ffrt_queue_attr_get_qos(const ffrt_queue_attr_t* attr)
93 {
94 FFRT_COND_DO_ERR((attr == nullptr), return ffrt_qos_default, "input invalid, attr == nullptr");
95 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
96 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->qos_;
97 }
98
99 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_timeout(ffrt_queue_attr_t * attr,uint64_t timeout_us)100 void ffrt_queue_attr_set_timeout(ffrt_queue_attr_t* attr, uint64_t timeout_us)
101 {
102 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
103 if (timeout_us < ONE_THOUSAND) {
104 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->timeout_ = ONE_THOUSAND;
105 return;
106 }
107
108 if (timeout_us > MAX_TIMEOUT_US_COUNT) {
109 FFRT_LOGW("timeout_us exceeds maximum allowed value %llu us. Clamping to %llu us.", timeout_us,
110 MAX_TIMEOUT_US_COUNT);
111 timeout_us = MAX_TIMEOUT_US_COUNT;
112 }
113
114 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->timeout_ = timeout_us;
115 }
116
117 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_timeout(const ffrt_queue_attr_t * attr)118 uint64_t ffrt_queue_attr_get_timeout(const ffrt_queue_attr_t* attr)
119 {
120 FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
121 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
122 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->timeout_;
123 }
124
125 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_callback(ffrt_queue_attr_t * attr,ffrt_function_header_t * f)126 void ffrt_queue_attr_set_callback(ffrt_queue_attr_t* attr, ffrt_function_header_t* f)
127 {
128 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
129 FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, f == nullptr");
130 ffrt::queue_attr_private* p = reinterpret_cast<ffrt::queue_attr_private*>(attr);
131 ResetTimeoutCb(p);
132 p->timeoutCb_ = f;
133 // the memory of timeoutCb are managed in the form of QueueTask
134 QueueTask* task = GetQueueTaskByFuncStorageOffset(f);
135 new (task)ffrt::QueueTask(nullptr);
136 }
137
138 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_callback(const ffrt_queue_attr_t * attr)139 ffrt_function_header_t* ffrt_queue_attr_get_callback(const ffrt_queue_attr_t* attr)
140 {
141 FFRT_COND_DO_ERR((attr == nullptr), return nullptr, "input invalid, attr == nullptr");
142 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
143 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->timeoutCb_;
144 }
145
146 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_max_concurrency(ffrt_queue_attr_t * attr,const int max_concurrency)147 void ffrt_queue_attr_set_max_concurrency(ffrt_queue_attr_t* attr, const int max_concurrency)
148 {
149 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
150
151 FFRT_COND_DO_ERR((max_concurrency <= 0), return,
152 "max concurrency should be a valid value");
153
154 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->maxConcurrency_ = max_concurrency;
155 }
156
157 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_max_concurrency(const ffrt_queue_attr_t * attr)158 int ffrt_queue_attr_get_max_concurrency(const ffrt_queue_attr_t* attr)
159 {
160 FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
161 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
162 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->maxConcurrency_;
163 }
164
165 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_set_thread_mode(ffrt_queue_attr_t * attr,bool mode)166 void ffrt_queue_attr_set_thread_mode(ffrt_queue_attr_t* attr, bool mode)
167 {
168 FFRT_COND_DO_ERR((attr == nullptr), return, "input invalid, attr == nullptr");
169
170 (reinterpret_cast<ffrt::queue_attr_private*>(attr))->threadMode_ = mode;
171 }
172
173 API_ATTRIBUTE((visibility("default")))
ffrt_queue_attr_get_thread_mode(const ffrt_queue_attr_t * attr)174 bool ffrt_queue_attr_get_thread_mode(const ffrt_queue_attr_t* attr)
175 {
176 FFRT_COND_DO_ERR((attr == nullptr), return 0, "input invalid, attr == nullptr");
177 ffrt_queue_attr_t* p = const_cast<ffrt_queue_attr_t*>(attr);
178 return (reinterpret_cast<ffrt::queue_attr_private*>(p))->threadMode_;
179 }
180
181 API_ATTRIBUTE((visibility("default")))
ffrt_queue_create(ffrt_queue_type_t type,const char * name,const ffrt_queue_attr_t * attr)182 ffrt_queue_t ffrt_queue_create(ffrt_queue_type_t type, const char* name, const ffrt_queue_attr_t* attr)
183 {
184 bool invalidType = (type == ffrt_queue_max) || (type < ffrt_queue_serial) ||
185 (type >= static_cast<ffrt_queue_type_t>(ffrt_queue_inner_max));
186 FFRT_COND_DO_ERR(invalidType, return nullptr, "input invalid, type unsupport");
187 QueueHandler* handler = new (std::nothrow) QueueHandler(name, attr, type);
188 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct QueueHandler");
189 return static_cast<ffrt_queue_t>(handler);
190 }
191
192 API_ATTRIBUTE((visibility("default")))
ffrt_queue_destroy(ffrt_queue_t queue)193 void ffrt_queue_destroy(ffrt_queue_t queue)
194 {
195 FFRT_COND_DO_ERR((queue == nullptr), return, "input invalid, queue is nullptr");
196 QueueHandler* handler = static_cast<QueueHandler*>(queue);
197 delete handler;
198 }
199
200 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)201 void ffrt_queue_submit(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
202 {
203 FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, function is nullptr");
204 QueueTask* task = ffrt_queue_submit_base(queue, f, false, false, attr);
205 FFRT_COND_DO_ERR((task == nullptr), return, "failed to submit serial task");
206 }
207
208 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_f(ffrt_queue_t queue,ffrt_function_t func,void * arg,const ffrt_task_attr_t * attr)209 void ffrt_queue_submit_f(ffrt_queue_t queue, ffrt_function_t func, void* arg, const ffrt_task_attr_t* attr)
210 {
211 ffrt_function_header_t* f = ffrt_create_function_wrapper(func, nullptr, arg, ffrt_function_kind_queue);
212 ffrt_queue_submit(queue, f, attr);
213 }
214
215 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_head(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)216 void ffrt_queue_submit_head(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
217 {
218 FFRT_COND_DO_ERR((f == nullptr), return, "input invalid, function is nullptr");
219 QueueTask* task = ffrt_queue_submit_base(queue, f, false, true, attr);
220 FFRT_COND_DO_ERR((task == nullptr), return, "failed to submit serial task");
221 }
222
223 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_h(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)224 ffrt_task_handle_t ffrt_queue_submit_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
225 {
226 FFRT_COND_DO_ERR((f == nullptr), return nullptr, "input invalid, function is nullptr");
227 QueueTask* task = ffrt_queue_submit_base(queue, f, true, false, attr);
228 FFRT_COND_DO_ERR((task == nullptr), return nullptr, "failed to submit serial task");
229 return static_cast<ffrt_task_handle_t>(task);
230 }
231
232 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_h_f(ffrt_queue_t queue,ffrt_function_t func,void * arg,const ffrt_task_attr_t * attr)233 ffrt_task_handle_t ffrt_queue_submit_h_f(ffrt_queue_t queue, ffrt_function_t func, void* arg,
234 const ffrt_task_attr_t* attr)
235 {
236 ffrt_function_header_t* f = ffrt_create_function_wrapper(func, nullptr, arg, ffrt_function_kind_queue);
237 return ffrt_queue_submit_h(queue, f, attr);
238 }
239
240 API_ATTRIBUTE((visibility("default")))
ffrt_queue_submit_head_h(ffrt_queue_t queue,ffrt_function_header_t * f,const ffrt_task_attr_t * attr)241 ffrt_task_handle_t ffrt_queue_submit_head_h(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr)
242 {
243 FFRT_COND_DO_ERR((f == nullptr), return nullptr, "input invalid, function is nullptr");
244 QueueTask* task = ffrt_queue_submit_base(queue, f, true, true, attr);
245 FFRT_COND_DO_ERR((task == nullptr), return nullptr, "failed to submit serial task");
246 return static_cast<ffrt_task_handle_t>(task);
247 }
248
249 API_ATTRIBUTE((visibility("default")))
ffrt_queue_wait(ffrt_task_handle_t handle)250 void ffrt_queue_wait(ffrt_task_handle_t handle)
251 {
252 FFRT_COND_DO_ERR((handle == nullptr), return, "input invalid, task_handle is nullptr");
253 QueueTask* task = static_cast<QueueTask*>(handle);
254 task->Wait();
255 }
256
257 API_ATTRIBUTE((visibility("default")))
ffrt_queue_get_task_cnt(ffrt_queue_t queue)258 uint64_t ffrt_queue_get_task_cnt(ffrt_queue_t queue)
259 {
260 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return 0, "input invalid, queue == nullptr");
261 QueueHandler* handler = static_cast<QueueHandler*>(queue);
262 return handler->GetTaskCnt();
263 }
264
265 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel(ffrt_task_handle_t handle)266 int ffrt_queue_cancel(ffrt_task_handle_t handle)
267 {
268 FFRT_COND_DO_ERR((handle == nullptr), return -1, "input invalid, handle is nullptr");
269 QueueTask* task = reinterpret_cast<QueueTask*>(static_cast<CPUEUTask*>(handle));
270 QueueHandler* handler = task->GetHandler();
271 FFRT_COND_DO_ERR((handler == nullptr), return -1, "task handler is nullptr");
272 int ret = handler->Cancel(task);
273 return ret;
274 }
275
276 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_all(ffrt_queue_t queue)277 void ffrt_queue_cancel_all(ffrt_queue_t queue)
278 {
279 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
280 QueueHandler* handler = static_cast<QueueHandler*>(queue);
281 handler->Cancel();
282 }
283
284 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_and_wait(ffrt_queue_t queue)285 void ffrt_queue_cancel_and_wait(ffrt_queue_t queue)
286 {
287 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
288 QueueHandler* handler = static_cast<QueueHandler*>(queue);
289 handler->CancelAndWait();
290 }
291
292 API_ATTRIBUTE((visibility("default")))
ffrt_queue_cancel_by_name(ffrt_queue_t queue,const char * name)293 int ffrt_queue_cancel_by_name(ffrt_queue_t queue, const char* name)
294 {
295 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return -1, "input invalid, queue is nullptr");
296 FFRT_COND_DO_ERR(unlikely(name == nullptr), return -1, "input invalid, name is nullptr");
297 QueueHandler* handler = static_cast<QueueHandler*>(queue);
298 return handler->Cancel(name);
299 }
300
301 API_ATTRIBUTE((visibility("default")))
ffrt_queue_has_task(ffrt_queue_t queue,const char * name)302 bool ffrt_queue_has_task(ffrt_queue_t queue, const char* name)
303 {
304 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return false, "input invalid, queue is nullptr");
305 FFRT_COND_DO_ERR(unlikely(name == nullptr), return false, "input invalid, name is nullptr");
306 QueueHandler* handler = static_cast<QueueHandler*>(queue);
307 return handler->HasTask(name);
308 }
309
310 API_ATTRIBUTE((visibility("default")))
ffrt_queue_is_idle(ffrt_queue_t queue)311 bool ffrt_queue_is_idle(ffrt_queue_t queue)
312 {
313 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return false, "input invalid, queue is nullptr");
314 QueueHandler* handler = static_cast<QueueHandler*>(queue);
315 return handler->IsIdle();
316 }
317
318 API_ATTRIBUTE((visibility("default")))
ffrt_queue_set_eventhandler(ffrt_queue_t queue,void * eventhandler)319 void ffrt_queue_set_eventhandler(ffrt_queue_t queue, void* eventhandler)
320 {
321 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return, "input invalid, queue is nullptr");
322 QueueHandler* handler = static_cast<QueueHandler*>(queue);
323 handler->SetEventHandler(eventhandler);
324 }
325
326 API_ATTRIBUTE((visibility("default")))
ffrt_get_current_queue_eventhandler(void)327 void* ffrt_get_current_queue_eventhandler(void)
328 {
329 TaskBase* curTask = ffrt::ExecuteCtx::Cur()->task;
330 if (curTask == nullptr || curTask->type != ffrt_queue_task) {
331 FFRT_LOGD("Current task is nullptr or is not a serial task.");
332 return nullptr;
333 }
334
335 QueueHandler* handler = static_cast<QueueTask*>(curTask)->GetHandler();
336 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "task handler is nullptr");
337 return handler->GetEventHandler();
338 }
339
340 API_ATTRIBUTE((visibility("default")))
ffrt_concurrent_queue_wait_all(ffrt_queue_t queue)341 int ffrt_concurrent_queue_wait_all(ffrt_queue_t queue)
342 {
343 FFRT_COND_DO_ERR(unlikely(queue == nullptr), return -1, "input invalid, queue is nullptr");
344 return static_cast<QueueHandler*>(queue)->WaitAll();
345 }
346
347 API_ATTRIBUTE((visibility("default")))
ffrt_get_main_queue(void)348 ffrt_queue_t ffrt_get_main_queue(void)
349 {
350 FFRT_COND_DO_ERR((EventHandlerAdapter::Instance()->GetMainEventHandler == nullptr),
351 return nullptr, "failed to load GetMainEventHandler Func.");
352 static QueueHandler handler = QueueHandler("main_queue", nullptr, ffrt_queue_eventhandler_interactive);
353 if (!handler.GetEventHandler()) {
354 void* mainHandler = EventHandlerAdapter::Instance()->GetMainEventHandler();
355 FFRT_COND_DO_ERR((mainHandler == nullptr), return nullptr, "failed to get main queue.");
356 handler.SetEventHandler(mainHandler);
357 }
358 return static_cast<ffrt_queue_t>(&handler);
359 }
360
361 API_ATTRIBUTE((visibility("default")))
ffrt_get_current_queue(void)362 ffrt_queue_t ffrt_get_current_queue(void)
363 {
364 FFRT_COND_DO_ERR((EventHandlerAdapter::Instance()->GetCurrentEventHandler == nullptr),
365 return nullptr, "failed to load GetCurrentEventHandler Func.");
366 void* workerHandler = EventHandlerAdapter::Instance()->GetCurrentEventHandler();
367 FFRT_COND_DO_ERR((workerHandler == nullptr), return nullptr, "failed to get ArkTs worker queue.");
368 QueueHandler *handler = new (std::nothrow) QueueHandler(
369 "current_queue", nullptr, ffrt_queue_eventhandler_interactive);
370 FFRT_COND_DO_ERR((handler == nullptr), return nullptr, "failed to construct WorkerThreadQueueHandler");
371 handler->SetEventHandler(workerHandler);
372 return static_cast<ffrt_queue_t>(handler);
373 }
374
375 API_ATTRIBUTE((visibility("default")))
ffrt_queue_dump(ffrt_queue_t queue,const char * tag,char * buf,uint32_t len,bool history_info)376 int ffrt_queue_dump(ffrt_queue_t queue, const char* tag, char* buf, uint32_t len, bool history_info)
377 {
378 FFRT_COND_DO_ERR((queue == nullptr), return -1, "input invalid, queue is nullptr");
379 FFRT_COND_DO_ERR((tag == nullptr || buf == nullptr), return -1, "input invalid, tag or buf is nullptr");
380 QueueHandler* handler = static_cast<QueueHandler*>(queue);
381 return handler->Dump(tag, buf, len, history_info);
382 }
383
384 API_ATTRIBUTE((visibility("default")))
ffrt_queue_size_dump(ffrt_queue_t queue,ffrt_inner_queue_priority_t priority)385 int ffrt_queue_size_dump(ffrt_queue_t queue, ffrt_inner_queue_priority_t priority)
386 {
387 if (priority > ffrt_inner_queue_priority_idle || priority < ffrt_inner_queue_priority_vip) {
388 FFRT_LOGE("priority:%d is not valid", priority);
389 return -1;
390 }
391 FFRT_COND_DO_ERR((queue == nullptr), return -1, "input invalid, queue is nullptr");
392 QueueHandler* handler = static_cast<QueueHandler*>(queue);
393 return handler->DumpSize(priority);
394 }