1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <memory>
17 #include <vector>
18 #include <climits>
19
20 #include "ffrt_inner.h"
21
22 #include "internal_inc/osal.h"
23 #include "sync/io_poller.h"
24 #include "qos.h"
25 #include "sched/task_scheduler.h"
26 #include "task_attr_private.h"
27 #include "internal_inc/config.h"
28 #include "eu/osattr_manager.h"
29
30 #include "eu/worker_thread.h"
31 #include "dfx/log/ffrt_log_api.h"
32 #include "queue/serial_task.h"
33 #include "eu/func_manager.h"
34 #include "eu/sexecute_unit.h"
35 #include "util/ffrt_facade.h"
36 #ifdef FFRT_IO_TASK_SCHEDULER
37 #include "core/task_io.h"
38 #include "sync/poller.h"
39 #include "util/spmc_queue.h"
40 #endif
41 #include "tm/task_factory.h"
42
43 namespace ffrt {
submit_impl(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)44 inline void submit_impl(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
45 const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
46 {
47 FFRTFacade::GetDMInstance().onSubmit(has_handle, handle, f, ins, outs, attr);
48 }
49
50 API_ATTRIBUTE((visibility("default")))
sync_io(int fd)51 void sync_io(int fd)
52 {
53 ffrt_wait_fd(fd);
54 }
55
56 API_ATTRIBUTE((visibility("default")))
set_trace_tag(const char * name)57 void set_trace_tag(const char* name)
58 {
59 CPUEUTask* curTask = ffrt::ExecuteCtx::Cur()->task;
60 if (curTask != nullptr) {
61 curTask->SetTraceTag(name);
62 }
63 }
64
65 API_ATTRIBUTE((visibility("default")))
clear_trace_tag()66 void clear_trace_tag()
67 {
68 CPUEUTask* curTask = ffrt::ExecuteCtx::Cur()->task;
69 if (curTask != nullptr) {
70 curTask->ClearTraceTag();
71 }
72 }
73
create_delay_deps(ffrt_task_handle_t & handle,const ffrt_deps_t * in_deps,const ffrt_deps_t * out_deps,task_attr_private * p)74 void create_delay_deps(
75 ffrt_task_handle_t &handle, const ffrt_deps_t *in_deps, const ffrt_deps_t *out_deps, task_attr_private *p)
76 {
77 // setting dependences is not supportted for delayed task
78 if (unlikely(((in_deps != nullptr) && (in_deps->len != 0)) || ((out_deps != nullptr) && (out_deps->len != 0)))) {
79 FFRT_LOGE("delayed task not support dependence, in_deps/out_deps ignored.");
80 }
81
82 // delay task
83 uint64_t delayUs = p->delay_;
84 std::function<void()> &&func = [delayUs]() {
85 this_task::sleep_for(std::chrono::microseconds(delayUs));
86 FFRT_LOGI("submit task delay time [%d us] has ended.", delayUs);
87 };
88 ffrt_function_header_t *delay_func = create_function_wrapper(std::move(func));
89 submit_impl(true, handle, delay_func, nullptr, nullptr, reinterpret_cast<task_attr_private *>(p));
90 }
91 } // namespace ffrt
92
93 #ifdef __cplusplus
94 extern "C" {
95 #endif
96 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_init(ffrt_task_attr_t * attr)97 int ffrt_task_attr_init(ffrt_task_attr_t *attr)
98 {
99 if (unlikely(!attr)) {
100 FFRT_LOGE("attr should be a valid address");
101 return -1;
102 }
103 static_assert(sizeof(ffrt::task_attr_private) <= ffrt_task_attr_storage_size,
104 "size must be less than ffrt_task_attr_storage_size");
105
106 new (attr)ffrt::task_attr_private();
107 return 0;
108 }
109
110 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_destroy(ffrt_task_attr_t * attr)111 void ffrt_task_attr_destroy(ffrt_task_attr_t *attr)
112 {
113 if (unlikely(!attr)) {
114 FFRT_LOGE("attr should be a valid address");
115 return;
116 }
117 auto p = reinterpret_cast<ffrt::task_attr_private *>(attr);
118 p->~task_attr_private();
119 }
120
121 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_set_name(ffrt_task_attr_t * attr,const char * name)122 void ffrt_task_attr_set_name(ffrt_task_attr_t *attr, const char *name)
123 {
124 if (unlikely(!attr || !name)) {
125 FFRT_LOGE("attr or name not valid");
126 return;
127 }
128 (reinterpret_cast<ffrt::task_attr_private *>(attr))->name_ = name;
129 }
130
131 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_get_name(const ffrt_task_attr_t * attr)132 const char *ffrt_task_attr_get_name(const ffrt_task_attr_t *attr)
133 {
134 if (unlikely(!attr)) {
135 FFRT_LOGE("attr should be a valid address");
136 return nullptr;
137 }
138 ffrt_task_attr_t *p = const_cast<ffrt_task_attr_t *>(attr);
139 return (reinterpret_cast<ffrt::task_attr_private *>(p))->name_.c_str();
140 }
141
142 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_set_qos(ffrt_task_attr_t * attr,ffrt_qos_t qos)143 void ffrt_task_attr_set_qos(ffrt_task_attr_t *attr, ffrt_qos_t qos)
144 {
145 if (unlikely(!attr)) {
146 FFRT_LOGE("attr should be a valid address");
147 return;
148 }
149 (reinterpret_cast<ffrt::task_attr_private *>(attr))->qos_map = ffrt::QoSMap(qos);
150 }
151
152 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_get_qos(const ffrt_task_attr_t * attr)153 ffrt_qos_t ffrt_task_attr_get_qos(const ffrt_task_attr_t *attr)
154 {
155 if (unlikely(!attr)) {
156 FFRT_LOGE("attr should be a valid address");
157 return static_cast<int>(ffrt_qos_default);
158 }
159 ffrt_task_attr_t *p = const_cast<ffrt_task_attr_t *>(attr);
160 return (reinterpret_cast<ffrt::task_attr_private *>(p))->qos_map.m_qos;
161 }
162
163 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_set_delay(ffrt_task_attr_t * attr,uint64_t delay_us)164 void ffrt_task_attr_set_delay(ffrt_task_attr_t *attr, uint64_t delay_us)
165 {
166 if (unlikely(!attr)) {
167 FFRT_LOGE("attr should be a valid address");
168 return;
169 }
170 (reinterpret_cast<ffrt::task_attr_private *>(attr))->delay_ = delay_us;
171 }
172
173 API_ATTRIBUTE((visibility("default")))
ffrt_task_attr_get_delay(const ffrt_task_attr_t * attr)174 uint64_t ffrt_task_attr_get_delay(const ffrt_task_attr_t *attr)
175 {
176 if (unlikely(!attr)) {
177 FFRT_LOGE("attr should be a valid address");
178 return 0;
179 }
180 ffrt_task_attr_t *p = const_cast<ffrt_task_attr_t *>(attr);
181 return (reinterpret_cast<ffrt::task_attr_private *>(p))->delay_;
182 }
183
184 // submit
185 API_ATTRIBUTE((visibility("default")))
ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_t kind)186 void *ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_t kind)
187 {
188 if (kind == ffrt_function_kind_general) {
189 return ffrt::TaskFactory::Alloc()->func_storage;
190 }
191 return ffrt::SimpleAllocator<ffrt::SerialTask>::allocMem()->func_storage;
192 }
193
194 API_ATTRIBUTE((visibility("default")))
ffrt_submit_base(ffrt_function_header_t * f,const ffrt_deps_t * in_deps,const ffrt_deps_t * out_deps,const ffrt_task_attr_t * attr)195 void ffrt_submit_base(ffrt_function_header_t *f, const ffrt_deps_t *in_deps, const ffrt_deps_t *out_deps,
196 const ffrt_task_attr_t *attr)
197 {
198 if (unlikely(!f)) {
199 FFRT_LOGE("function handler should not be empty");
200 return;
201 }
202 ffrt_task_handle_t handle;
203 ffrt::task_attr_private *p = reinterpret_cast<ffrt::task_attr_private *>(const_cast<ffrt_task_attr_t *>(attr));
204 if (likely(attr == nullptr || ffrt_task_attr_get_delay(attr) == 0)) {
205 ffrt::submit_impl(false, handle, f, in_deps, out_deps, p);
206 return;
207 }
208
209 // task after delay
210 ffrt_task_handle_t delay_handle;
211 ffrt::create_delay_deps(delay_handle, in_deps, out_deps, p);
212 std::vector<ffrt_dependence_t> deps = {{ffrt_dependence_task, delay_handle}};
213 ffrt_deps_t delay_deps {static_cast<uint32_t>(deps.size()), deps.data()};
214 ffrt::submit_impl(false, handle, f, &delay_deps, nullptr, p);
215 ffrt_task_handle_destroy(delay_handle);
216 }
217
218 API_ATTRIBUTE((visibility("default")))
ffrt_submit_h_base(ffrt_function_header_t * f,const ffrt_deps_t * in_deps,const ffrt_deps_t * out_deps,const ffrt_task_attr_t * attr)219 ffrt_task_handle_t ffrt_submit_h_base(ffrt_function_header_t *f, const ffrt_deps_t *in_deps,
220 const ffrt_deps_t *out_deps, const ffrt_task_attr_t *attr)
221 {
222 if (unlikely(!f)) {
223 FFRT_LOGE("function handler should not be empty");
224 return nullptr;
225 }
226 ffrt_task_handle_t handle = nullptr;
227 ffrt::task_attr_private *p = reinterpret_cast<ffrt::task_attr_private *>(const_cast<ffrt_task_attr_t *>(attr));
228 if (likely(attr == nullptr || ffrt_task_attr_get_delay(attr) == 0)) {
229 ffrt::submit_impl(true, handle, f, in_deps, out_deps, p);
230 return handle;
231 }
232
233 // task after delay
234 ffrt_task_handle_t delay_handle = nullptr;
235 ffrt::create_delay_deps(delay_handle, in_deps, out_deps, p);
236 std::vector<ffrt_dependence_t> deps = {{ffrt_dependence_task, delay_handle}};
237 ffrt_deps_t delay_deps {static_cast<uint32_t>(deps.size()), deps.data()};
238 ffrt::submit_impl(true, handle, f, &delay_deps, nullptr, p);
239 ffrt_task_handle_destroy(delay_handle);
240 return handle;
241 }
242
243 API_ATTRIBUTE((visibility("default")))
ffrt_task_handle_destroy(ffrt_task_handle_t handle)244 void ffrt_task_handle_destroy(ffrt_task_handle_t handle)
245 {
246 if (!handle) {
247 FFRT_LOGE("input task handle is invalid");
248 return;
249 }
250 static_cast<ffrt::CPUEUTask*>(handle)->DecDeleteRef();
251 }
252
253 // wait
254 API_ATTRIBUTE((visibility("default")))
ffrt_wait_deps(const ffrt_deps_t * deps)255 void ffrt_wait_deps(const ffrt_deps_t *deps)
256 {
257 if (unlikely(!deps)) {
258 FFRT_LOGE("deps should not be empty");
259 return;
260 }
261 std::vector<ffrt_dependence_t> v(deps->len);
262 for (uint64_t i = 0; i < deps->len; ++i) {
263 v[i] = deps->items[i];
264 }
265 ffrt_deps_t d = { deps->len, v.data() };
266 ffrt::FFRTFacade::GetDMInstance().onWait(&d);
267 }
268
269 API_ATTRIBUTE((visibility("default")))
ffrt_wait()270 void ffrt_wait()
271 {
272 ffrt::FFRTFacade::GetDMInstance().onWait();
273 }
274
275 API_ATTRIBUTE((visibility("default")))
ffrt_set_cgroup_attr(ffrt_qos_t qos,ffrt_os_sched_attr * attr)276 int ffrt_set_cgroup_attr(ffrt_qos_t qos, ffrt_os_sched_attr *attr)
277 {
278 if (unlikely(!attr)) {
279 FFRT_LOGE("attr should not be empty");
280 return -1;
281 }
282 ffrt::QoS _qos = ffrt::QoS(ffrt::QoSMap(qos).m_qos);
283 return ffrt::OSAttrManager::Instance()->UpdateSchedAttr(_qos, attr);
284 }
285
286 API_ATTRIBUTE((visibility("default")))
ffrt_set_cpu_worker_max_num(ffrt_qos_t qos,uint32_t num)287 int ffrt_set_cpu_worker_max_num(ffrt_qos_t qos, uint32_t num)
288 {
289 ffrt::QoS _qos = ffrt::QoS(ffrt::QoSMap(qos).m_qos);
290 if (((qos != ffrt::qos_default) && (_qos() == ffrt::qos_default)) || (qos <= ffrt::qos_inherit)) {
291 FFRT_LOGE("qos[%d] is invalid.", qos);
292 return -1;
293 }
294 ffrt::CPUMonitor *monitor = ffrt::FFRTFacade::GetEUInstance().GetCPUMonitor();
295 return monitor->SetWorkerMaxNum(_qos, num);
296 }
297
298 API_ATTRIBUTE((visibility("default")))
ffrt_set_worker_stack_size(ffrt_qos_t qos,size_t stack_size)299 ffrt_error_t ffrt_set_worker_stack_size(ffrt_qos_t qos, size_t stack_size)
300 {
301 if (qos < ffrt::QoS::Min() || qos >= ffrt::QoS::Max() || stack_size < PTHREAD_STACK_MIN) {
302 FFRT_LOGE("qos [%d] or stack size [%d] is invalid.", qos, stack_size);
303 return ffrt_error_inval;
304 }
305
306 ffrt::WorkerGroupCtl* groupCtl = ffrt::ExecuteUnit::Instance().GetGroupCtl();
307 if (!groupCtl[qos].threads.empty()) {
308 FFRT_LOGE("Stack size can be set only when there is no worker.");
309 return ffrt_error;
310 }
311
312 size_t pageSize = getpagesize();
313 groupCtl[qos].workerStackSize = (stack_size - 1 + pageSize) & -pageSize;
314 return ffrt_success;
315 }
316
317 API_ATTRIBUTE((visibility("default")))
ffrt_this_task_update_qos(ffrt_qos_t qos)318 int ffrt_this_task_update_qos(ffrt_qos_t qos)
319 {
320 ffrt::QoS _qos = ffrt::QoS(ffrt::QoSMap(qos).m_qos);
321 auto curTask = ffrt::ExecuteCtx::Cur()->task;
322 if (curTask == nullptr) {
323 FFRT_LOGW("task is nullptr");
324 return 1;
325 }
326
327 if (_qos() == curTask->qos) {
328 FFRT_LOGW("the target qos is euqal to current qos, no need update");
329 return 0;
330 }
331
332 curTask->SetQos(_qos);
333 ffrt_yield();
334
335 return 0;
336 }
337
338 API_ATTRIBUTE((visibility("default")))
ffrt_this_task_get_id()339 uint64_t ffrt_this_task_get_id()
340 {
341 auto curTask = ffrt::ExecuteCtx::Cur()->task;
342 if (curTask == nullptr) {
343 return 0;
344 }
345
346 return curTask->gid;
347 }
348
349 API_ATTRIBUTE((visibility("default")))
ffrt_skip(ffrt_task_handle_t handle)350 int ffrt_skip(ffrt_task_handle_t handle)
351 {
352 if (!handle) {
353 FFRT_LOGE("input ffrt task handle is invalid.");
354 return -1;
355 }
356 ffrt::CPUEUTask *task = static_cast<ffrt::CPUEUTask*>(handle);
357 auto exp = ffrt::SkipStatus::SUBMITTED;
358 if (__atomic_compare_exchange_n(&task->skipped, &exp, ffrt::SkipStatus::SKIPPED, 0, __ATOMIC_ACQUIRE,
359 __ATOMIC_RELAXED)) {
360 return 0;
361 }
362 FFRT_LOGE("skip task [%lu] faild", task->gid);
363 return 1;
364 }
365
366 #ifdef FFRT_IO_TASK_SCHEDULER
367 API_ATTRIBUTE((visibility("default")))
ffrt_poller_register(int fd,uint32_t events,void * data,ffrt_poller_cb cb)368 int ffrt_poller_register(int fd, uint32_t events, void* data, ffrt_poller_cb cb)
369 {
370 ffrt::QoS qos = ffrt::ExecuteCtx::Cur()->qos;
371 int ret = ffrt::PollerProxy::Instance()->GetPoller(qos).AddFdEvent(events, fd, data, cb);
372 if (ret == 0) {
373 ffrt::ExecuteUnit::Instance().NotifyLocalTaskAdded(qos);
374 }
375 return ret;
376 }
377
378 API_ATTRIBUTE((visibility("default")))
ffrt_poller_deregister(int fd)379 int ffrt_poller_deregister(int fd)
380 {
381 ffrt::QoS qos = ffrt::ExecuteCtx::Cur()->qos;
382 return ffrt::PollerProxy::Instance()->GetPoller(qos).DelFdEvent(fd);
383 }
384
385 API_ATTRIBUTE((visibility("default")))
ffrt_poller_wakeup()386 void ffrt_poller_wakeup()
387 {
388 ffrt::QoS qos = ffrt::ExecuteCtx::Cur()->qos;
389 ffrt::PollerProxy::Instance()->GetPoller(qos).WakeUp();
390 }
391
392 API_ATTRIBUTE((visibility("default")))
ffrt_timer_start(uint64_t timeout,void * data,ffrt_timer_cb cb)393 int ffrt_timer_start(uint64_t timeout, void* data, ffrt_timer_cb cb)
394 {
395 ffrt::QoS qos = ffrt::ExecuteCtx::Cur()->qos;
396 int handle = ffrt::PollerProxy::Instance()->GetPoller(qos).RegisterTimer(timeout, data, cb);
397 if (handle >= 0) {
398 ffrt::ExecuteUnit::Instance().NotifyLocalTaskAdded(qos);
399 }
400 return handle;
401 }
402
403 API_ATTRIBUTE((visibility("default")))
ffrt_timer_stop(int handle)404 void ffrt_timer_stop(int handle)
405 {
406 ffrt::QoS qos = ffrt::ExecuteCtx::Cur()->qos;
407 ffrt::PollerProxy::Instance()->GetPoller(qos).DeregisterTimer(handle);
408 }
409
410 API_ATTRIBUTE((visibility("default")))
ffrt_timer_query(int handle)411 ffrt_timer_query_t ffrt_timer_query(int handle)
412 {
413 ffrt::QoS qos = ffrt::ExecuteCtx::Cur()->qos;
414 return ffrt::PollerProxy::Instance()->GetPoller(qos).GetTimerStatus(handle);
415 }
416 #endif
417
418 API_ATTRIBUTE((visibility("default")))
ffrt_executor_task_submit(ffrt_executor_task_t * task,const ffrt_task_attr_t * attr)419 void ffrt_executor_task_submit(ffrt_executor_task_t *task, const ffrt_task_attr_t *attr)
420 {
421 if (task == nullptr) {
422 FFRT_LOGE("function handler should not be empty");
423 return;
424 }
425 ffrt::task_attr_private *p = reinterpret_cast<ffrt::task_attr_private *>(const_cast<ffrt_task_attr_t *>(attr));
426 if (likely(attr == nullptr || ffrt_task_attr_get_delay(attr) == 0)) {
427 ffrt::FFRTFacade::GetDMInstance().onSubmitUV(task, p);
428 return;
429 }
430 FFRT_LOGE("uv function not supports delay");
431 }
432
433 API_ATTRIBUTE((visibility("default")))
ffrt_executor_task_register_func(ffrt_executor_task_func func,ffrt_executor_task_type_t type)434 void ffrt_executor_task_register_func(ffrt_executor_task_func func, ffrt_executor_task_type_t type)
435 {
436 ffrt::FuncManager* func_mg = ffrt::FuncManager::Instance();
437 func_mg->insert(type, func);
438 }
439
440 API_ATTRIBUTE((visibility("default")))
ffrt_executor_task_cancel(ffrt_executor_task_t * task,const ffrt_qos_t qos)441 int ffrt_executor_task_cancel(ffrt_executor_task_t *task, const ffrt_qos_t qos)
442 {
443 if (task == nullptr) {
444 FFRT_LOGE("function handler should not be empty");
445 return 0;
446 }
447 ffrt::QoS _qos = ffrt::QoS(qos);
448
449 ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList *>(&task->wq);
450 ffrt::FFRTScheduler* sch = ffrt::FFRTScheduler::Instance();
451 return static_cast<int>(sch->RemoveNode(node, _qos));
452 }
453 #ifdef __cplusplus
454 }
455 #endif